def _ssh_setup(container_id, container_ip):
    logger.info('Applying ssh configuration to manager container')
    try:
        known_hosts = path('~/.ssh/known_hosts').expanduser()
        # Known hosts file may not exist
        ssh_keygen('-R', container_ip)
        fingerprint = None
        while not fingerprint:
            fingerprint = ssh_keyscan(
                container_ip).stdout.split('\n')[0].strip()
            time.sleep(0.01)
        if fingerprint and known_hosts.exists():
            current = known_hosts.text()
            prefix = ''
            if not current.endswith('\n'):
                prefix = '\n'
            known_hosts.write_text(
                '{}{}\n'.format(prefix, fingerprint), append=True)
    except sh.ErrorReturnCode:
        pass
    quiet_docker('exec', container_id, 'mkdir', '-p', '/root/.ssh')
    ssh_public_key = ssh_keygen('-y', '-f', configuration.ssh_key_path).strip()
    with tempfile.NamedTemporaryFile() as f:
        f.write(ssh_public_key)
        f.flush()
        quiet_docker.cp(f.name, '{}:/root/.ssh/authorized_keys'.format(
            container_id))
    # due to a bug in docker 17.06, the file keeps ownership and is not
    # chowned to the main container user automatically
    quiet_docker('exec', container_id, 'chown', 'root:root',
                 '/root/.ssh/authorized_keys')

Example source code for the Python class ErrorReturnCode()
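All of the snippets below share one pattern: a command run through the sh module raises sh.ErrorReturnCode when it exits with a non-zero status. A minimal sketch of that pattern (the failing ls call and its path are arbitrary placeholders):

import sh

try:
    sh.ls('/nonexistent-path')  # any command that exits non-zero will do
except sh.ErrorReturnCode as e:
    # The exception carries the full command line, the exit code and
    # the captured output streams (bytes in Python 3).
    print(e.full_cmd)
    print(e.exit_code)
    print(e.stderr.decode('utf-8'))
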
def _get_qstat_job_state(self):
    try:
        self.logger.debug('getting qstat infos')
        ssh_output = self.ssh_host(self.cfg.path_qstat, self.job_id)
        self.logger.debug('qstat output:\n{}'.format(ssh_output))
        if ssh_output != '':
            # slice off the two header lines and the trailing empty line
            jobs_displayed = ssh_output.split('\n')[2:-1]
            for job in jobs_displayed:
                # collapse runs of whitespace into single spaces
                job_info = ' '.join(job.split())
                # split into the qstat columns
                (self.job_id,
                 job_name,
                 job_user,
                 job_time,
                 job_status,
                 job_queue) = job_info.split(' ')
                self.logger.debug(
                    'job {} has status {}'.format(self.job_id, job_status))
                return job_status
        else:
            return None
    except ErrorReturnCode as e:
        self.logger.error('\nError in ssh call:\n{}'.format(e))
        print(e.stderr)
        sys.exit(1)

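The ' '.join(job.split()) idiom above collapses qstat's variable column padding into single spaces so the row can be split into fields. A quick illustration with a made-up qstat row:

# hypothetical qstat row with irregular padding
row = '123456.host   my_job    alice   00:01:02  R   batch'
fields = ' '.join(row.split()).split(' ')
# fields == ['123456.host', 'my_job', 'alice', '00:01:02', 'R', 'batch']
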
def main(argv=None):
    argv = argv or sys.argv[1:]
    if {'-h', '-help', '--help'}.intersection(argv):
        sh.ffmpeg(help=True, _fg=True)
        return 0

    notifier = ProgressNotifier()
    try:
        # run ffmpeg on the normalized argument list, streaming its
        # stderr to the notifier
        sh.ffmpeg(
            argv,
            _in=queue.Queue(),
            _err=notifier,
            _out_bufsize=0,
            _err_bufsize=0,
            #_in_bufsize=0,
            _no_out=True,
            _no_pipe=True,
            _tty_in=True,
            #_fg=True,
            #_bg=True,
        )
    except sh.ErrorReturnCode as err:
        print(notifier.lines[-1])
        return err.exit_code
    else:
        print()
        return 0

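ProgressNotifier is defined elsewhere in that script; sh accepts any callable for _err, so a minimal, hypothetical stand-in only needs to accumulate the stderr stream into lines (ffmpeg ends progress updates with '\r' rather than '\n'):

class ProgressNotifier(object):
    # Hypothetical stand-in: collects ffmpeg's stderr character by
    # character (the _err_bufsize=0 setting above delivers single chars).
    def __init__(self):
        self.lines = ['']

    def __call__(self, char):
        if char in ('\r', '\n'):
            # start a new line on either line ending
            if self.lines[-1]:
                self.lines.append('')
        else:
            self.lines[-1] += char
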
def run(self):
    ''' run command '''
    if self.roles is not None:
        print("Roles:\n{0}".format(yaml.dump(self.roles, default_flow_style=False)))
    if self.excludes is not None:
        print("Excludes:\n{0}".format(yaml.dump(self.excludes, default_flow_style=False)))

    starting_dir = '.'
    if self.roles is not None:
        starting_dir = 'roles'
    molecule_dirs = find_dirs(starting_dir, self.excludes, self.roles, 'molecule')
    print("Found:\n{0}".format(yaml.dump(molecule_dirs, default_flow_style=False)))

    base_dir = os.getcwd()
    errors = ""
    warnings = ""
    for role in molecule_dirs:
        role = os.path.dirname(role)
        print("Testing: {0}".format(role))
        os.chdir(role)
        try:
            print(sh.molecule.test())
        except ErrorReturnCode as e:
            print(e.stdout)
            errors += e.stdout
        os.chdir(base_dir)

    if len(warnings) > 0:
        print("Warnings:\n{0}\n".format(warnings))
    if len(errors) > 0:
        print("Errors:\n{0}\n".format(errors))
        sys.exit(1)

Source: test_cookiecutter_generation.py, from the cookiecutter-django-gulp project by valerymelou
def test_flake8_compliance(cookies):
    """generated project should pass flake8"""
    result = cookies.bake()
    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        pytest.fail(str(e))

def git_clone(git_repo_url, git_repo_dir):
    try:
        logger.info('Fetching from %s to %s', git_repo_url, git_repo_dir)
        git.clone(git_repo_url, git_repo_dir, _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)

def _decompress(path):
    """
    Decompress a .tar.xz file at the given path.

    The decompressed data will be located in the same directory as ``path``.
    """
    logger.info('Decompressing %s', path)
    try:
        tar(extract=True, xz=True, verbose=True, file=path,
            directory=os.path.dirname(path), _fg=True, _out=sys.stdout,
            _err=sys.stderr)
    except ErrorReturnCode as e:
        raise CommandError(e)

def _install_dependencies():
    """
    Install S2E's dependencies.

    Only apt-get is supported for now.
    """
    logger.info('Installing S2E dependencies')

    ubuntu_ver = _get_ubuntu_version()
    if not ubuntu_ver:
        return

    install_packages = CONSTANTS['dependencies']['common'] + \
                       CONSTANTS['dependencies']['ubuntu_%d' % ubuntu_ver] + \
                       CONSTANTS['dependencies']['ida']

    try:
        # Enable 32-bit libraries
        dpkg_add_arch = sudo.bake('dpkg', add_architecture=True, _fg=True)
        dpkg_add_arch('i386')

        # Perform apt-get install
        apt_get = sudo.bake('apt-get', _fg=True)
        apt_get.update()
        apt_get.install(install_packages)
    except ErrorReturnCode as e:
        raise CommandError(e)

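The sudo.bake('dpkg', add_architecture=True, _fg=True) call above pre-loads arguments: bake() returns a new command with those arguments frozen in, so dpkg_add_arch('i386') ends up executing sudo dpkg --add-architecture i386 in the foreground. A small sketch of the keyword-to-option mapping:

import sh

# Keyword arguments become command-line options: single-letter names
# map to short options, longer names to long options, and underscores
# are rewritten as dashes (add_architecture -> --add-architecture).
ls_long = sh.ls.bake(l=True)   # equivalent to: ls -l
print(ls_long('/tmp'))
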
def _get_s2e_sources(env_path):
    """
    Download the S2E manifest repository and initialize all of the S2E
    repositories with repo.
    """
    # Download repo
    repo = _get_repo(env_path)

    s2e_source_path = os.path.join(env_path, 'source', 's2e')

    # Create the S2E source directory and cd to it to run repo
    os.mkdir(s2e_source_path)
    orig_dir = os.getcwd()
    os.chdir(s2e_source_path)

    git_url = CONSTANTS['repos']['url']
    git_s2e_repo = CONSTANTS['repos']['s2e']

    try:
        # Now use repo to initialize all the repositories
        logger.info('Fetching %s from %s', git_s2e_repo, git_url)
        repo.init(u='%s/%s' % (git_url, git_s2e_repo), _out=sys.stdout,
                  _err=sys.stderr, _fg=True)
        repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        # Clean up - remove the half-created S2E environment
        shutil.rmtree(env_path)
        raise CommandError(e)
    finally:
        # Change back to the original directory
        os.chdir(orig_dir)

    # Success!
    logger.success('Fetched %s', git_s2e_repo)

def _get_project_name(archive):
    """
    Get the project name from the archive.

    The project name is the name of the root directory in the archive.
    """
    try:
        contents = tar(exclude='*/*', list=True, file=archive)
        return os.path.dirname(str(contents))
    except ErrorReturnCode as e:
        raise CommandError('Failed to list archive - %s' % e)

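The exclude='*/*' filter makes tar list only the entries at the archive root, typically a single directory; os.path.dirname() then strips the trailing slash and newline. Roughly, with a made-up archive layout:

import os

# tar --exclude='*/*' --list --file project.tar.xz would print the
# root entry only, e.g. 'myproject/\n' (name made up for illustration).
contents = 'myproject/\n'
print(os.path.dirname(contents))   # -> 'myproject'
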
def handle(self, *args, **options):
    # Exit if the makefile doesn't exist
    makefile = self.env_path('source', 's2e', 'Makefile')
    if not os.path.isfile(makefile):
        raise CommandError('No makefile found in %s' %
                           os.path.dirname(makefile))

    # If the build directory doesn't exist, create it
    build_dir = self.env_path('build', 's2e')
    if not os.path.isdir(build_dir):
        os.mkdir(build_dir)

    # Set up some environment variables
    env_vars = os.environ.copy()
    env_vars['S2EPREFIX'] = self.install_path()

    components = options['components']
    self._make = sh.Command('make').bake(directory=build_dir, file=makefile,
                                         _env=env_vars)

    # If the user has specified any components to rebuild, do this before
    # the build
    if components:
        self._rebuild_components(components)

    try:
        # Run make
        if options['debug']:
            logger.info('Building S2E (debug) in %s', build_dir)
            self._make('all-debug', _out=sys.stdout, _err=sys.stderr, _fg=True)
        else:
            logger.info('Building S2E (release) in %s', build_dir)
            self._make('install', _out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as e:
        raise CommandError(e)

    return 'S2E built'

def _invoke_make(self, img_build_dir, rule_names, num_cores, iso_dir=''):
    env = os.environ.copy()
    env['S2E_INSTALL_ROOT'] = self.install_path()
    env['S2E_LINUX_KERNELS_ROOT'] = \
        self.source_path(CONSTANTS['repos']['images']['linux'])
    env['OUTDIR'] = self.image_path()
    if iso_dir:
        env['ISODIR'] = iso_dir

    logger.debug('Invoking makefile with:')
    logger.debug('export S2E_INSTALL_ROOT=%s', env['S2E_INSTALL_ROOT'])
    logger.debug('export S2E_LINUX_KERNELS_ROOT=%s', env['S2E_LINUX_KERNELS_ROOT'])
    logger.debug('export OUTDIR=%s', env['OUTDIR'])
    logger.debug('export ISODIR=%s', env.get('ISODIR', ''))

    if not self._headless:
        env['GRAPHICS'] = ''
    else:
        logger.warn('Image creation will run in headless mode. '
                    'Use --gui to see graphic output for debugging.')

    try:
        make = sh.Command('make').bake(file=os.path.join(img_build_dir,
                                                         'Makefile'),
                                       directory=self.image_path(),
                                       _out=sys.stdout, _err=sys.stderr,
                                       _env=env, _fg=True)

        make_image = make.bake(j=num_cores)
        make_image(rule_names)
    except ErrorReturnCode as e:
        raise CommandError(e)

def run_docker_dev_test(path, coverage=False):
    """
    Method to check that docker runs with dev.yml
    """
    try:
        # build django, power up the stack and run the test
        sh.docker_compose(
            "--file", "{}/dev.yml".format(path), "build", "django"
        )
        sh.docker_compose("--file", "{}/dev.yml".format(path), "build")
        if coverage:
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django",
                "coverage", "run", "manage.py", "test"
            )
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django",
                "coverage", "xml", "-o", "coverage.xml"
            )
            shutil.copyfile(os.path.join(str(path), ".coverage"),
                            os.path.join(PROJECT_DIR, ".coverage"))
            shutil.copyfile(os.path.join(str(path), "coverage.xml"),
                            os.path.join(PROJECT_DIR, "coverage.xml"))
        else:
            sh.docker_compose(
                "--file", "{}/dev.yml".format(path), "run", "django",
                "python", "manage.py", "test"
            )

        # test that the development server is running
        sh.docker_compose("--file", "{}/dev.yml".format(path), "up", "-d")
        time.sleep(10)
        curl = sh.curl("-I", "http://localhost:8000/")
        assert "200 OK" in curl
        assert "Server: Werkzeug" in curl

        # since we are running a lot of tests with different configurations,
        # we need to clean up the environment. Stop all running containers,
        # remove them and remove the postgres_data volume.
        sh.docker_compose("--file", "{}/dev.yml".format(path), "stop")
        sh.docker_compose("--file", "{}/dev.yml".format(path), "rm", "-f")
        sh.docker("volume", "rm", "cookiecuttersaastestproject_postgres_data_dev")
    except sh.ErrorReturnCode as e:
        # in case there are errors it's good to have full output of
        # stdout and stderr.
        pytest.fail("STDOUT: {} \n\n\n STDERR: {}".format(
            e.stdout.decode("utf-8"), e.stderr.decode("utf-8"))
        )

def _run_ansible_lint(organization):
    al_bin = os.path.join(aeriscloud_path, 'venv/bin/ansible-lint')
    env = ansible_env(os.environ.copy())

    if organization:
        environment_files = glob.glob(get_env_path(organization) + '/*.yml')
    else:
        environment_files = glob.glob(organization_path + '/*/*.yml')

    if not environment_files:
        return 0

    args = environment_files + ['-r', os.path.join(ansible_path, 'rules')]

    click.echo('Running ansible-lint ... ', nl=False)
    errors = 0
    try:
        python(al_bin, *args,
               _env=env, _err_to_out=True, _ok_code=[0])
        click.echo('[%s]' % click.style('OK', fg='green'))
    except ErrorReturnCode as e:
        parser = re.compile(
            r'^\[(?P<error_code>[^\]]+)\] (?P<error_message>[^\n]+)\n'
            r'%s(?P<file_name>[^:]+):(?P<line_number>[0-9]+)\n'
            r'Task/Handler: (?P<task_name>[^\n]+)\n\n' % (ansible_path + '/'),
            re.MULTILINE
        )

        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        last_file = None
        pos = 0
        while pos < len(e.stdout):
            match = parser.match(e.stdout, pos)
            if not match:
                click.secho("Error: %s" % e.stdout)
                errors += 1
                break

            error = match.groupdict()
            if error['file_name'] != last_file:
                click.secho('  Errors in file: %s' % error['file_name'],
                            fg='blue', bold=True)
                last_file = error['file_name']

            click.echo('    line %s task %s: %s %s' % (
                click.style(error['line_number'], fg='green'),
                click.style(error['task_name'], fg='green'),
                click.style(error['error_code'], fg='red'),
                click.style(error['error_message'], fg='red'),
            ))
            errors += 1
            pos = match.end()

    return errors

def submit_job(self):
    """Submit the job to qsub, returns job_id."""
    self.logger.info('Submitting job ...')
    job_script_path = self._get_job_script_path()
    arg_list = self._build_qsub_args()
    arg_list.append(job_script_path)
    self.logger.debug('arg_list: {}'.format(arg_list))
    try:
        self.logger.debug(
            '{} {}'.format(self.cfg.path_qsub, arg_list)
        )
        # get the starting time and convert it to milliseconds
        self.time_stamp_jobstart = int(time.time()) * 1000
        self.logger.debug(
            'start time stamp: {}'.format(self.time_stamp_jobstart)
        )
        # submit the job
        ssh_output = self.ssh_host(self.cfg.path_qsub, *arg_list)
        # search for the job id
        for line in ssh_output:
            self.logger.debug('searching for job id in \n{}'.format(line))
            if "hlrs.de" in line:
                self.logger.debug('possible job id found: {}'.format(line))
                self.job_id = str(line)
                if self.cfg.grafana:
                    self.logger.info(
                        'Job performance data at:\n'
                        '{}var-JobId=snapTask-{}-{}&'
                        'from={}&'
                        'to=now'.format(
                            self.cfg.grafana_base_string,
                            self.cfg.user_name,
                            self.job_id.rstrip(),
                            self.time_stamp_jobstart
                        )
                    )
                return
        self.logger.error(
            'no job id found in \n{}\nexiting!'.format(ssh_output)
        )
        sys.exit(1)
    except ErrorReturnCode as e:
        self.logger.error('\nError in ssh call:\n{}'.format(e))
        print(e.stderr)
        sys.exit(1)

def _get_basic_blocks(self):
    """
    Extract basic block information from the target binary using S2E's IDA
    Pro script.

    This extraction is done within a temporary directory so that we don't
    pollute the file system with temporary idbs and other such things.
    """
    logger.info('Generating basic block information from IDA Pro')

    try:
        with TemporaryDirectory() as temp_dir:
            target_path = self._project_desc['target_path']

            # Copy the binary to the temporary directory. Because projects
            # are created with a symlink to the target program, IDA Pro
            # would otherwise generate the idb and bblist files in the
            # symlinked target's directory, which is not what we want
            target_name = os.path.basename(target_path)
            temp_target_path = os.path.join(temp_dir, target_name)
            shutil.copyfile(target_path, temp_target_path)

            # Run the IDA Pro extractBasicBlocks script
            env_vars = os.environ.copy()
            env_vars['TVHEADLESS'] = '1'
            # This is required if s2e-env runs inside screen
            env_vars['TERM'] = 'xterm'

            ida = sh.Command(self._ida_path)
            ida('-A', '-B',
                '-S%s' % self.install_path('bin', 'extractBasicBlocks.py'),
                temp_target_path, _out=os.devnull, _tty_out=False,
                _cwd=temp_dir, _env=env_vars)

            # Check that the basic block list file was correctly generated
            bblist_file = os.path.join(temp_dir, '%s.bblist' % target_name)
            if not os.path.isfile(bblist_file):
                raise CommandError('Failed to generate bblist file for '
                                   '%s' % target_name)

            # Parse the basic block list file.
            #
            # to_basic_block takes a 3-tuple read from the bblist file and
            # converts it to a BasicBlock
            to_basic_block = lambda tup: BasicBlock(int(tup[0], 16),
                                                    int(tup[1], 16),
                                                    tup[2])
            with open(bblist_file, 'r') as f:
                return [to_basic_block(l.rstrip().split(' ')) for l in f]
    except ErrorReturnCode as e:
        raise CommandError(e)
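
BasicBlock is defined elsewhere in s2e-env; a plausible stand-in (field names assumed) is a namedtuple over the three columns of the bblist file, two hexadecimal addresses and a function name:

from collections import namedtuple

# Hypothetical stand-in for the BasicBlock type used above.
BasicBlock = namedtuple('BasicBlock', ['start_addr', 'end_addr', 'function'])

line = '0x400000 0x400010 main'   # made-up bblist line
start, end, func = line.split(' ')
bb = BasicBlock(int(start, 16), int(end, 16), func)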