def _has_commit(version, debug=False):
"""
Determine a version is a local git commit sha or not.
:param version: A string containing the branch/tag/sha to be determined.
:param debug: An optional bool to toggle debug output.
:return: bool
"""
if _has_tag(version, debug) or _has_branch(version, debug):
return False
cmd = sh.git.bake('cat-file', '-e', version)
try:
util.run_command(cmd, debug=debug)
return True
except sh.ErrorReturnCode:
return False
python类ErrorReturnCode()的实例源码
def delete_path(self, path_or_fn, cascade=True):
"""Simple kubectl delete wrapper.
:param path_or_fn: Path or filename of yaml resource descriptions
"""
LOG.info('(-) kubectl delete -f %s', path_or_fn)
try:
self.kubectl.delete(
'-f', path_or_fn,
'--namespace={}'.format(self.config.namespace),
'--context={}'.format(self.config.context),
'--cascade={}'.format('true' if cascade else 'false')
)
except sh.ErrorReturnCode:
return False
return True
def update():
for organization in get_organization_list():
if not os.path.exists(os.path.join(get_env_path(organization),
'.git')):
info("Skip %s" % organization)
continue
info("Updating %s ..." % organization)
os.chdir(get_env_path(organization))
if not git('remote').strip():
info("No remotes are set for this organization, skipping")
continue
try:
vgit('pull')
except ErrorReturnCode:
fatal("Unable to update the organizations.")
run_galaxy_install(organization)
success("All the organizations have been updated.")
def _create_archive(self, archive_path, export_dir):
"""
Create the final archive of all the exported project files.
Args:
archive_path: Path to the ``tar.xz`` archive.
export_dir: Path to the directory containing the files to export.
"""
try:
logger.info('Creating archive %s', archive_path)
create_archive = tar.bake(create=True, xz=True, verbose=True,
file=archive_path, directory=export_dir,
_fg=True, _out=sys.stdout,
_err=sys.stderr)
create_archive(self._project_name)
except ErrorReturnCode as e:
raise CommandError('Failed to archive project - %s' % e)
def _gen_html(self, lcov_info_path):
"""
Generate an LCOV HTML report.
Returns the directory containing the HTML report.
"""
from sh import genhtml, ErrorReturnCode
lcov_html_dir = self.project_path('s2e-last', 'lcov')
try:
genhtml(lcov_info_path, output_directory=lcov_html_dir,
_out=sys.stdout, _err=sys.stderr, _fg=True)
except ErrorReturnCode as e:
raise CommandError(e)
return lcov_html_dir
def _update_s2e_sources(self):
"""
Update all of the S2E repositories with repo.
"""
repo = sh.Command(self.install_path('bin', 'repo'))
# cd into the S2E source directory
orig_dir = os.getcwd()
os.chdir(self.source_path('s2e'))
try:
logger.info('Updating S2E')
repo.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
except ErrorReturnCode as e:
raise CommandError(e)
finally:
# Change back to the original directory
os.chdir(orig_dir)
# Success!
logger.success('Updated S2E')
def test_readme(cookies):
"""The generated README.rst file should pass some sanity checks and validate as a PyPI long description."""
extra_context = {'repo_name': 'helloworld'}
with bake_in_temp_dir(cookies, extra_context=extra_context) as result:
readme_file = result.project.join('README.rst')
readme_lines = [x.strip() for x in readme_file.readlines(cr=False)]
assert 'helloworld' in readme_lines
assert 'The full documentation is at https://helloworld.readthedocs.org.' in readme_lines
setup_path = str(result.project.join('setup.py'))
try:
sh.python(setup_path, 'check', restructuredtext=True, strict=True)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))
def check_quality(result):
"""Run quality tests on the given generated output."""
for dirpath, _dirnames, filenames in os.walk(str(result.project)):
pylintrc = str(result.project.join('pylintrc'))
for filename in filenames:
name = os.path.join(dirpath, filename)
if not name.endswith('.py'):
continue
try:
sh.pylint(name, rcfile=pylintrc)
sh.pylint(name, py3k=True)
sh.pycodestyle(name)
if filename != 'setup.py':
sh.pydocstyle(name)
sh.isort(name, check_only=True)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))
tox_ini = result.project.join('tox.ini')
docs_build_dir = result.project.join('docs/_build')
try:
# Sanity check the generated Makefile
sh.make('help')
# quality check docs
sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))
def test_custom_yaml():
from sh import ErrorReturnCode, chmod, ldap2pg, rm
LDAP2PG_CONFIG = 'my-test-ldap2pg.yml'
rm('-f', LDAP2PG_CONFIG)
with pytest.raises(ErrorReturnCode):
ldap2pg(_env=dict(os.environ, LDAP2PG_CONFIG=LDAP2PG_CONFIG))
yaml = YAML_FMT % os.environ
with open(LDAP2PG_CONFIG, 'w') as fo:
fo.write(yaml)
# Purge env from value set in file. Other are reads from ldaprc.
blacklist = ('LDAPURI', 'LDAPHOST', 'LDAPPORT', 'LDAPPASSWORD')
ldapfree_env = dict(
(k, v)
for k, v in os.environ.items()
if k not in blacklist
)
# Ensure world readable password is denied
with pytest.raises(ErrorReturnCode):
ldap2pg(config=LDAP2PG_CONFIG, _env=ldapfree_env)
# And that fixing file mode do the trick.
chmod('0600', LDAP2PG_CONFIG)
ldap2pg('--config', LDAP2PG_CONFIG, _env=ldapfree_env)
def _has_tag(version, debug=False):
"""
Determine a version is a local git tag name or not.
:param version: A string containing the branch/tag/sha to be determined.
:param debug: An optional bool to toggle debug output.
:return: bool
"""
cmd = sh.git.bake('show-ref', '--verify', '--quiet',
"refs/tags/{}".format(version))
try:
util.run_command(cmd, debug=debug)
return True
except sh.ErrorReturnCode:
return False
def _has_branch(version, debug=False):
"""
Determine a version is a local git branch name or not.
:param version: A string containing the branch/tag/sha to be determined.
:param debug: An optional bool to toggle debug output.
:return: bool
"""
cmd = sh.git.bake('show-ref', '--verify', '--quiet',
"refs/heads/{}".format(version))
try:
util.run_command(cmd, debug=debug)
return True
except sh.ErrorReturnCode:
return False
test_cookiecutter_generation.py 文件源码
项目:cookiecutter-django-reactjs
作者: genomics-geek
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_flake8_compliance(cookies):
"""generated project should pass flake8"""
result = cookies.bake()
try:
sh.flake8(str(result.project))
except sh.ErrorReturnCode as e:
pytest.fail(e)
def _wait_running_commands(self):
for search_index, running_command in self.running_commands:
training_label = self.training_label(search_index)
logging('Waiting {} to finish..'.format(training_label))
try:
running_command.wait()
except sh.ErrorReturnCode as e:
logging('{} returned a non-zero code!'.format(training_label))
except:
traceback.print_exc(file=sys.stderr)
def _find_java_home(self):
if self._java_home:
return self._java_home
# First check if the enviornment variable is set.
java_home = os.environ.get("JAVA_HOME", None)
if java_home:
return java_home
# On OS X, there's a magical command that gives you $JAVA_HOME
if sys.platform == "darwin":
try:
cmd = sh.Command("/usr/libexec/java_home")
return cmd().strip()
except sh.ErrorReturnCode:
pass
# If only one Java is installed in the default Linux JVM folder, use
# that
if sys.platform in {"linux", "linux2"}:
if os.path.isdir(self.DEFAULT_LINUX_JVM):
javas = os.listdir(self.DEFAULT_LINUX_JVM)
if len(javas) == 1:
return javas[0]
# Give up
return None
def describe(self, name):
"""Return a yaml-ish text blob.
Not helpful for automation, very helpful for humans.
"""
try:
return self.kubectl.describe(
self.url_type,
name,
'--context={}'.format(self.config.context),
'--namespace={}'.format(self.config.namespace)
)
except sh.ErrorReturnCode as err:
logging.error("Unexpected response: %s", err)
def delete(self, name):
"""Delete the named resource.
TODO: should be easy to rewrite this as a kube api
delete call instead of going through kubectl.
"""
try:
self.kubectl.delete(
self.url_type,
name,
'--context={}'.format(self.config.context),
'--namespace={}'.format(self.config.namespace)
)
except sh.ErrorReturnCode as err:
logging.error("Unexpected response: %s", err)
def pip_install(ctx, *specifiers):
# type: (click.Context, str) -> None
try:
result = sh.pip.install(*specifiers, _err_to_out=True, _iter=True)
for line in result:
click.echo(line, nl=False)
except sh.ErrorReturnCode:
ctx.abort()
def test_flake8_compliance(cookies):
"""generated project should pass flake8"""
result = cookies.bake()
#try:
# sh.flake8(str(result.project))
#except sh.ErrorReturnCode as e:
# print(e)
# pytest.fail(str(e))
def _get_code(self, nmpi_job, job_desc):
"""
Obtain the code and place it in the working directory.
If the experiment description is the URL of a Git repository, try to clone it.
If it is the URL of a zip or .tar.gz archive, download and unpack it.
Otherwise, the content of "code" is the code: write it to a file.
"""
url_candidate = urlparse(nmpi_job['code'])
logger.debug("Get code: %s %s", url_candidate.netloc, url_candidate.path)
if url_candidate.scheme and url_candidate.path.endswith((".tar.gz", ".zip", ".tgz")):
self._create_working_directory(job_desc.working_directory)
target = os.path.join(job_desc.working_directory, os.path.basename(url_candidate.path))
#urlretrieve(nmpi_job['code'], target) # not working via KIP https proxy
curl(nmpi_job['code'], '-o', target)
logger.info("Retrieved file from {} to local target {}".format(nmpi_job['code'], target))
if url_candidate.path.endswith((".tar.gz", ".tgz")):
tar("xzf", target, directory=job_desc.working_directory)
elif url_candidate.path.endswith(".zip"):
try:
# -o for auto-overwrite
unzip('-o', target, d=job_desc.working_directory)
except:
logger.error("Could not unzip file {}".format(target))
else:
try:
# Check the "code" field for a git url (clone it into the workdir) or a script (create a file into the workdir)
# URL: use git clone
git.clone('--recursive', nmpi_job['code'], job_desc.working_directory)
logger.info("Cloned repository {}".format(nmpi_job['code']))
except (sh.ErrorReturnCode_128, sh.ErrorReturnCode):
# SCRIPT: create file (in the current directory)
logger.info("The code field appears to be a script.")
self._create_working_directory(job_desc.working_directory)
with codecs.open(job_desc.arguments[0], 'w', encoding='utf8') as job_main_script:
job_main_script.write(nmpi_job['code'])
def run(self, resume=1):
"""Execute ansible-playbook using information gathered from config.
Args:
resume (int): Used as list index - 1 from which to resume workflow.
"""
# list index to start working on (for support of --resume)
try:
i = int(resume) - 1
except ValueError: # generally if passed a non-int
i = 0
cmds = self._config.playbook_cmds
kwargs = {
'_out': self._print_stdout,
'_err': self._print_stderr,
'_env': self._config.env
}
for counter, cmd in enumerate(cmds):
# skip execution until we reach our --resume index
# using a list slice doesn't work here since we need to be aware of
# the full list to produce a resume index on failure
if counter < i:
continue
try:
sh.ansible_playbook(*cmd, **kwargs)
except (sh.ErrorReturnCode, sh.ErrorReturnCode_1):
msg = ('An error was encountered during playbook execution. '
'Please resolve manually and then use the following '
'command to resume execution of this script:\n\n')
cmd = self._construct_resume_cli(counter + 1)
print(colorama.Fore.RED + msg + cmd)
sys.exit(1)
def check_project_result(result):
"""
Method to common project baking verification
"""
assert result.exit_code == 0
assert result.exception is None
assert result.project.isdir()
# Check project with flake8
try:
sh.flake8(str(result.project))
except sh.ErrorReturnCode as e:
pytest.fail(e)
def update():
for inventory in os.listdir(inventory_path):
if not os.path.exists(os.path.join(inventory_path, inventory, '.git')):
info("Skip %s" % inventory)
continue
info("Updating %s ..." % inventory)
os.chdir(os.path.join(inventory_path, inventory))
try:
vgit('pull')
except ErrorReturnCode:
fatal("Unable to update the inventories.")
success("All the inventories have been updated.")
def update_inventory(name, dest_path):
if not os.path.exists(os.path.join(dest_path, '.git')):
warning("The %s inventory is not a git repository and has not "
"been updated." % name)
return
click.echo('We will update the %s inventory' % name)
os.chdir(dest_path)
try:
vgit('pull')
except ErrorReturnCode:
fatal("Unable to update the inventory %s." % name)
success("The %s inventory has been updated." % name)
def install(name, path):
"""
Install inventories.
"""
if not name:
update()
return
if not name.isalnum():
fatal("Your inventory name should only contains alphanumeric "
"characters.")
dest_path = os.path.join(inventory_path, name)
if os.path.exists(dest_path):
update_inventory(name, dest_path)
return
if not path:
fatal("You must specify a path to a local directory or an URL to a "
"git repository to install a new inventory.")
if os.path.exists(path) and os.path.isdir(path):
if not os.path.exists(os.path.dirname(dest_path)):
os.mkdir(os.path.dirname(dest_path))
os.symlink(path, dest_path)
else:
click.echo('We will clone %s in %s\n' % (path, dest_path))
try:
vgit('clone', path, dest_path)
except ErrorReturnCode:
fatal("Unable to install the inventory %s." % name)
success("The %s inventory has been installed." % name)
def update_organization(name, dest_path):
click.echo('We will update the %s organization' % name)
os.chdir(dest_path)
if os.path.exists(os.path.join(dest_path, '.git')):
try:
vgit('pull')
except ErrorReturnCode:
fatal("Unable to update the organization %s." % name)
success("The %s organization has been updated." % name)
run_galaxy_install(name)
def _get_git_root():
"""
Retrieve the git directory, or prompt to create one if not found
"""
git_root = None
try:
git_root = str(git('rev-parse', '--show-toplevel')).strip()
except ErrorReturnCode as e:
if e.exit_code != 128:
fatal(e.message, e.exit_code)
if not git_root:
warning('You must be in a git repository directory to '
'initialize a new project.')
if not click.confirm('Do you want to create a new git '
'repository here?', default=True):
fatal('Please run %s' % style('git init', bold=True))
try:
vgit('init')
git_root = os.getcwd()
except ErrorReturnCode as e:
fatal('An error occurred when trying to initialize a '
'new repo.', e.exit_code)
if git_root == aeriscloud_path:
fatal('You cannot init AerisCloud from the AerisCloud directory!')
return git_root
def _run_nosetests():
click.echo('Running unit tests ... ', nl=False)
nose_bin = os.path.join(aeriscloud_path, 'venv/bin/nosetests')
errors = 0
try:
python(nose_bin, '-v', '--with-id', module_path(),
_err_to_out=True, _ok_code=[0])
click.echo('[%s]' % click.style('OK', fg='green'))
except ErrorReturnCode as e:
click.echo('[%s]\n' % click.style('FAIL', fg='red'))
for line in e.stdout.split('\n')[:-2]:
if line.startswith('#'):
print(line)
(id, name, test_file, ellipsis, res) = line.rstrip().split(' ')
if res == 'ok':
res = click.style(res, fg='green', bold=True)
elif res == 'FAIL':
res = click.style(res, fg='red', bold=True)
line = ' '.join([
click.style(id, bold=True, fg='yellow'),
click.style(name, fg='blue'),
test_file,
ellipsis,
res
])
elif line.startswith('FAIL:'):
(_, name, test_file) = line.split(' ')
line = ' '.join([
click.style('FAIL', bold=True, fg='red') + ':',
click.style(name, fg='blue'),
test_file
])
click.echo(' ' + line)
errors += 1
return errors
def _retry(func, *args, **kwargs):
for _ in range(300):
try:
func(*args, **kwargs)
break
except sh.ErrorReturnCode:
sleep(0.1)
else:
raise
def install_docker(version=None, container_id=None):
container_id = container_id or work.last_container_id
try:
quiet_docker('exec', container_id, *'which docker'.split(' '))
logger.info('Docker already installed on container. Doing nothing')
return
except sh.ErrorReturnCode:
pass
cp(resources.DIR / 'docker.repo', ':/etc/yum.repos.d/docker.repo',
container_id=container_id)
version = version or _get_docker_version()
install_docker_command = 'yum install -y -q docker-engine-{}'.format(
version)
docker('exec', container_id, *install_docker_command.split(' '))
def _get_docker_version():
try:
version = quiet_docker.version('-f', '{{.Client.Version}}').strip()
except sh.ErrorReturnCode as e:
version = e.stdout.strip()
# Replacing the -ce in the version with .ce, as the versions in
# https://yum.dockerproject.org/repo/main/centos/7/Packages/
# adhere to this notation
if version.endswith('-ce'):
version = version.replace('-ce', '.ce')
return version