python类ErrorReturnCode()的实例源码

git.py 文件源码 项目:gilt 作者: metacloud 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _has_commit(version, debug=False):
    """
    Determine whether a version is a local git commit sha.

    Tag and branch names are ruled out first; whatever remains is probed
    with ``git cat-file -e``.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    is_named_ref = _has_tag(version, debug) or _has_branch(version, debug)
    if is_named_ref:
        return False

    probe = sh.git.bake('cat-file', '-e', version)
    try:
        util.run_command(probe, debug=debug)
    except sh.ErrorReturnCode:
        # A non-zero exit from the probe means the lookup failed.
        return False
    return True
kubelib.py 文件源码 项目:kubelib 作者: safarijv 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_path(self, path_or_fn, cascade=True):
    """Simple kubectl delete wrapper.

    :param path_or_fn: Path or filename of yaml resource descriptions
    :param cascade: Sent to kubectl as ``--cascade=true`` when truthy and
        ``--cascade=false`` otherwise
    :return: True when the kubectl call completes, False when it raises
        ``sh.ErrorReturnCode``
    """
    LOG.info('(-) kubectl delete -f %s', path_or_fn)

    cascade_flag = 'true' if cascade else 'false'
    kubectl_args = [
        '-f', path_or_fn,
        '--namespace={}'.format(self.config.namespace),
        '--context={}'.format(self.config.context),
        '--cascade={}'.format(cascade_flag),
    ]
    try:
        self.kubectl.delete(*kubectl_args)
    except sh.ErrorReturnCode:
        return False
    return True
organization.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def update():
    """Pull every git-managed organization and re-run its galaxy install.

    Organizations without a ``.git`` directory, or whose repository lists no
    remote, are skipped. A failing ``git pull`` is reported through
    ``fatal``.
    """
    for organization in get_organization_list():
        git_dir = os.path.join(get_env_path(organization), '.git')
        if not os.path.exists(git_dir):
            info("Skip %s" % organization)
            continue

        info("Updating %s ..." % organization)
        os.chdir(get_env_path(organization))

        configured_remotes = git('remote').strip()
        if not configured_remotes:
            info("No remotes are set for this organization, skipping")
            continue

        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organizations.")

        run_galaxy_install(organization)

    success("All the organizations have been updated.")
export_project.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _create_archive(self, archive_path, export_dir):
    """
    Pack the exported project files into the final archive.

    Args:
        archive_path: Path to the ``tar.xz`` archive.
        export_dir: Path to the directory containing the files to export.

    Raises:
        CommandError: If the ``tar`` invocation fails with an error code.
    """
    try:
        logger.info('Creating archive %s', archive_path)
        archiver = tar.bake(
            create=True,
            xz=True,
            verbose=True,
            file=archive_path,
            directory=export_dir,
            _fg=True,
            _out=sys.stdout,
            _err=sys.stderr,
        )
        # Only the project directory (relative to ``export_dir``) is archived.
        archiver(self._project_name)
    except ErrorReturnCode as err:
        raise CommandError('Failed to archive project - %s' % err)
lcov.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _gen_html(self, lcov_info_path):
    """
    Generate an LCOV HTML report from ``lcov_info_path``.

    Returns the directory containing the HTML report.

    Raises:
        CommandError: If ``genhtml`` exits with an error code.
    """
    from sh import genhtml, ErrorReturnCode

    report_dir = self.project_path('s2e-last', 'lcov')
    try:
        genhtml(
            lcov_info_path,
            output_directory=report_dir,
            _out=sys.stdout,
            _err=sys.stderr,
            _fg=True,
        )
    except ErrorReturnCode as err:
        raise CommandError(err)

    return report_dir
update.py 文件源码 项目:s2e-env 作者: S2E 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _update_s2e_sources(self):
    """
    Synchronise all of the S2E repositories with ``repo sync``.

    Raises:
        CommandError: If ``repo sync`` exits with an error code.
    """
    repo_tool = sh.Command(self.install_path('bin', 'repo'))

    # Run the sync from the S2E source directory, remembering where we were
    previous_cwd = os.getcwd()
    os.chdir(self.source_path('s2e'))

    try:
        logger.info('Updating S2E')
        repo_tool.sync(_out=sys.stdout, _err=sys.stderr, _fg=True)
    except ErrorReturnCode as err:
        raise CommandError(err)
    finally:
        # Always restore the caller's working directory
        os.chdir(previous_cwd)

    logger.success('Updated S2E')
test_bake_project.py 文件源码 项目:cookiecutter-django-app 作者: edx 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_readme(cookies):
    """Check the generated README.rst content and validate it via ``setup.py check``."""
    with bake_in_temp_dir(cookies, extra_context={'repo_name': 'helloworld'}) as result:
        readme = result.project.join('README.rst')
        stripped_lines = [line.strip() for line in readme.readlines(cr=False)]

        # The project name and the docs link must each appear on their own line
        assert 'helloworld' in stripped_lines
        assert 'The full documentation is at https://helloworld.readthedocs.org.' in stripped_lines

        setup_py = str(result.project.join('setup.py'))
        try:
            sh.python(setup_py, 'check', restructuredtext=True, strict=True)
        except sh.ErrorReturnCode as err:
            pytest.fail(str(err))
test_bake_project.py 文件源码 项目:cookiecutter-django-app 作者: edx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def check_quality(result):
    """Run the quality tools over every generated Python file, the Makefile and the docs."""
    for dirpath, _dirnames, filenames in os.walk(str(result.project)):
        pylintrc = str(result.project.join('pylintrc'))
        for filename in filenames:
            source_path = os.path.join(dirpath, filename)
            if source_path.endswith('.py'):
                try:
                    sh.pylint(source_path, rcfile=pylintrc)
                    sh.pylint(source_path, py3k=True)
                    sh.pycodestyle(source_path)
                    # setup.py is exempt from the docstring style check
                    if filename != 'setup.py':
                        sh.pydocstyle(source_path)
                    sh.isort(source_path, check_only=True)
                except sh.ErrorReturnCode as err:
                    pytest.fail(str(err))

    tox_ini = result.project.join('tox.ini')
    docs_build_dir = result.project.join('docs/_build')
    try:
        # Sanity check the generated Makefile
        sh.make('help')
        # Quality check the README and the docs tree
        sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
        sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
    except sh.ErrorReturnCode as err:
        pytest.fail(str(err))
test_config.py 文件源码 项目:ldap2pg 作者: dalibo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_custom_yaml():
    """Exercise ldap2pg against a custom YAML configuration file."""
    from sh import ErrorReturnCode, chmod, ldap2pg, rm

    config_file = 'my-test-ldap2pg.yml'

    # With the file removed, pointing LDAP2PG_CONFIG at it must fail.
    rm('-f', config_file)
    with pytest.raises(ErrorReturnCode):
        ldap2pg(_env=dict(os.environ, LDAP2PG_CONFIG=config_file))

    with open(config_file, 'w') as fo:
        fo.write(YAML_FMT % os.environ)

    # Purge from the environment the values set in the file. The others are
    # read from ldaprc.
    blacklist = ('LDAPURI', 'LDAPHOST', 'LDAPPORT', 'LDAPPASSWORD')
    ldapfree_env = {
        key: value
        for key, value in os.environ.items()
        if key not in blacklist
    }

    # Ensure a world-readable password file is denied
    with pytest.raises(ErrorReturnCode):
        ldap2pg(config=config_file, _env=ldapfree_env)

    # And that fixing the file mode does the trick.
    chmod('0600', config_file)
    ldap2pg('--config', config_file, _env=ldapfree_env)
git.py 文件源码 项目:gilt 作者: metacloud 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _has_tag(version, debug=False):
    """
    Determine whether a version is a local git tag name.

    The check runs ``git show-ref --verify --quiet`` against
    ``refs/tags/<version>``.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    tag_ref = "refs/tags/{}".format(version)
    probe = sh.git.bake('show-ref', '--verify', '--quiet', tag_ref)
    try:
        util.run_command(probe, debug=debug)
    except sh.ErrorReturnCode:
        return False
    return True
git.py 文件源码 项目:gilt 作者: metacloud 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _has_branch(version, debug=False):
    """
    Determine whether a version is a local git branch name.

    The check runs ``git show-ref --verify --quiet`` against
    ``refs/heads/<version>``.

    :param version: A string containing the branch/tag/sha to be determined.
    :param debug: An optional bool to toggle debug output.
    :return: bool
    """
    branch_ref = "refs/heads/{}".format(version)
    probe = sh.git.bake('show-ref', '--verify', '--quiet', branch_ref)
    try:
        util.run_command(probe, debug=debug)
    except sh.ErrorReturnCode:
        return False
    return True
test_cookiecutter_generation.py 文件源码 项目:cookiecutter-django-reactjs 作者: genomics-geek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_flake8_compliance(cookies):
    """Generated project should pass flake8."""
    result = cookies.bake()

    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        # pytest.fail() expects a message string, not an exception object
        pytest.fail(str(e))
hyperparam_search.py 文件源码 项目:keras-image-captioning 作者: danieljl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _wait_running_commands(self):
        for search_index, running_command in self.running_commands:
            training_label = self.training_label(search_index)
            logging('Waiting {} to finish..'.format(training_label))
            try:
                running_command.wait()
            except sh.ErrorReturnCode as e:
                logging('{} returned a non-zero code!'.format(training_label))
            except:
                traceback.print_exc(file=sys.stderr)
java.py 文件源码 项目:rootfetch 作者: zmap 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _find_java_home(self):
        if self._java_home:
            return self._java_home

        # First check if the enviornment variable is set.
        java_home = os.environ.get("JAVA_HOME", None)
        if java_home:
            return java_home

        # On OS X, there's a magical command that gives you $JAVA_HOME
        if sys.platform == "darwin":
            try:
                cmd = sh.Command("/usr/libexec/java_home")
                return cmd().strip()
            except sh.ErrorReturnCode:
                pass

        # If only one Java is installed in the default Linux JVM folder, use
        # that
        if sys.platform in {"linux", "linux2"}:
            if os.path.isdir(self.DEFAULT_LINUX_JVM):
                javas = os.listdir(self.DEFAULT_LINUX_JVM)
                if len(javas) == 1:
                    return javas[0]

        # Give up
        return None
kubelib.py 文件源码 项目:kubelib 作者: safarijv 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def describe(self, name):
    """Return a yaml-ish text blob.

    Not helpful for automation, very helpful for humans.

    :param name: Name of the resource to describe
    :return: Whatever ``kubectl describe`` returns, or None when the call
        raises ``sh.ErrorReturnCode`` (the error is logged)
    """
    scope_flags = (
        '--context={}'.format(self.config.context),
        '--namespace={}'.format(self.config.namespace),
    )
    try:
        description = self.kubectl.describe(self.url_type, name, *scope_flags)
    except sh.ErrorReturnCode as err:
        logging.error("Unexpected response: %s", err)
        return None
    return description
kubelib.py 文件源码 项目:kubelib 作者: safarijv 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete(self, name):
    """Delete the named resource.

    TODO: should be easy to rewrite this as a kube api
    delete call instead of going through kubectl.

    :param name: Name of the resource to delete
    """
    scope_flags = (
        '--context={}'.format(self.config.context),
        '--namespace={}'.format(self.config.namespace),
    )
    try:
        self.kubectl.delete(self.url_type, name, *scope_flags)
    except sh.ErrorReturnCode as err:
        # Failures are logged rather than raised
        logging.error("Unexpected response: %s", err)
cli.py 文件源码 项目:reqwire 作者: darvid 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def pip_install(ctx, *specifiers):
    # type: (click.Context, str) -> None
    """Run ``pip install`` for the given specifiers, echoing its output line by line.

    The click context is aborted when pip exits with an error code.
    """
    try:
        # stderr is merged into stdout and the output is iterated as it arrives
        for output_line in sh.pip.install(*specifiers, _err_to_out=True, _iter=True):
            click.echo(output_line, nl=False)
    except sh.ErrorReturnCode:
        ctx.abort()
test_cookiecutter_generation.py 文件源码 项目:cookiecutter-saas 作者: jayfk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_flake8_compliance(cookies):
    """Bake the project; intended to check that the generated project passes flake8."""
    result = cookies.bake()

    # NOTE(review): the flake8 run below is commented out, so this test
    # currently only calls cookies.bake() and asserts nothing.
    #try:
    #    sh.flake8(str(result.project))
    #except sh.ErrorReturnCode as e:
    #    print(e)
    #    pytest.fail(str(e))
nmpi_saga.py 文件源码 项目:hbp-neuromorphic-client 作者: HumanBrainProject 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_code(self, nmpi_job, job_desc):
    """
    Obtain the code and place it in the working directory.

    If the experiment description is the URL of a Git repository, try to clone it.
    If it is the URL of a zip or .tar.gz archive, download and unpack it.
    Otherwise, the content of "code" is the code: write it to a file.

    :param nmpi_job: mapping whose ``'code'`` entry holds the URL or the script text
    :param job_desc: job description providing ``working_directory`` and ``arguments``
    """
    url_candidate = urlparse(nmpi_job['code'])
    logger.debug("Get code: %s %s", url_candidate.netloc, url_candidate.path)
    if url_candidate.scheme and url_candidate.path.endswith((".tar.gz", ".zip", ".tgz")):
        self._create_working_directory(job_desc.working_directory)
        target = os.path.join(job_desc.working_directory, os.path.basename(url_candidate.path))
        # urlretrieve(nmpi_job['code'], target) was not working via the KIP https proxy
        curl(nmpi_job['code'], '-o', target)
        logger.info("Retrieved file from {} to local target {}".format(nmpi_job['code'], target))
        if url_candidate.path.endswith((".tar.gz", ".tgz")):
            tar("xzf", target, directory=job_desc.working_directory)
        elif url_candidate.path.endswith(".zip"):
            try:
                # -o for auto-overwrite
                unzip('-o', target, d=job_desc.working_directory)
            except Exception:
                # Best effort: report the failure but let the job continue.
                # Narrowed from a bare except so that KeyboardInterrupt and
                # SystemExit are not swallowed.
                logger.error("Could not unzip file {}".format(target))
    else:
        try:
            # Treat the "code" field as a git URL first and clone it into the workdir
            git.clone('--recursive', nmpi_job['code'], job_desc.working_directory)
            logger.info("Cloned repository {}".format(nmpi_job['code']))
        except sh.ErrorReturnCode:
            # ErrorReturnCode is the base class of ErrorReturnCode_128, so this
            # single clause covers both.
            # SCRIPT: create file (in the current directory)
            logger.info("The code field appears to be a script.")
            self._create_working_directory(job_desc.working_directory)
            with codecs.open(job_desc.arguments[0], 'w', encoding='utf8') as job_main_script:
                job_main_script.write(nmpi_job['code'])
provisioner.py 文件源码 项目:photon 作者: metacloud 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self, resume=1):
    """Execute ansible-playbook using information gathered from config.

    Playbook commands are run in order, starting at the ``resume`` position.
    On the first failure a message containing the command to resume from
    the failed step is printed and the process exits with status 1.

    Args:
        resume (int): Used as list index - 1 from which to resume workflow.
            Values that cannot be converted to an int (including ``None``)
            start the workflow from the beginning.
    """
    # list index to start working on (for support of --resume)
    try:
        start_index = int(resume) - 1
    except (TypeError, ValueError):
        # ValueError: non-numeric string; TypeError: None or another non-number
        start_index = 0

    cmds = self._config.playbook_cmds
    kwargs = {
        '_out': self._print_stdout,
        '_err': self._print_stderr,
        '_env': self._config.env
    }

    for counter, cmd in enumerate(cmds):
        # skip execution until we reach our --resume index
        # using a list slice doesn't work here since we need to be aware of
        # the full list to produce a resume index on failure
        if counter < start_index:
            continue

        try:
            sh.ansible_playbook(*cmd, **kwargs)
        except sh.ErrorReturnCode:
            # ErrorReturnCode is the base class of ErrorReturnCode_1, so one
            # clause covers every non-zero exit.
            msg = ('An error was encountered during playbook execution. '
                   'Please resolve manually and then use the following '
                   'command to resume execution of this script:\n\n')
            resume_cmd = self._construct_resume_cli(counter + 1)
            print(colorama.Fore.RED + msg + resume_cmd)
            sys.exit(1)
test_cookiecutter.py 文件源码 项目:dvhb-backend-cookiecutter 作者: dvhbru 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def check_project_result(result):
    """
    Common verification of a baked project.

    Asserts that baking exited cleanly without an exception, that the
    project directory exists, and that the project passes flake8.
    """
    assert result.exit_code == 0
    assert result.exception is None
    assert result.project.isdir()

    # Check project with flake8
    try:
        sh.flake8(str(result.project))
    except sh.ErrorReturnCode as e:
        # pytest.fail() expects a message string, not an exception object
        pytest.fail(str(e))
inventory.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def update():
    """Run ``git pull`` in every inventory under ``inventory_path`` that is a git checkout.

    Entries without a ``.git`` directory are skipped; a failing pull is
    reported through ``fatal``.
    """
    for inventory in os.listdir(inventory_path):
        inventory_dir = os.path.join(inventory_path, inventory)
        is_git_checkout = os.path.exists(os.path.join(inventory_dir, '.git'))
        if not is_git_checkout:
            info("Skip %s" % inventory)
            continue

        info("Updating %s ..." % inventory)
        os.chdir(inventory_dir)
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the inventories.")

    success("All the inventories have been updated.")
inventory.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_inventory(name, dest_path):
    """Run ``git pull`` inside the inventory located at ``dest_path``.

    When ``dest_path`` has no ``.git`` directory a warning is emitted and
    nothing else happens.
    """
    git_dir = os.path.join(dest_path, '.git')
    if not os.path.exists(git_dir):
        warning("The %s inventory is not a git repository and has not "
                "been updated." % name)
        return

    click.echo('We will update the %s inventory' % name)
    os.chdir(dest_path)
    try:
        vgit('pull')
    except ErrorReturnCode:
        fatal("Unable to update the inventory %s." % name)

    success("The %s inventory has been updated." % name)
inventory.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def install(name, path):
    """
    Install inventories.

    Without a name, every installed inventory is updated. With the name of
    an already-installed inventory, that inventory is updated. Otherwise
    ``path`` is symlinked when it is a local directory, or cloned with git.
    """
    if not name:
        update()
        return

    if not name.isalnum():
        fatal("Your inventory name should only contains alphanumeric "
              "characters.")

    dest_path = os.path.join(inventory_path, name)
    if os.path.exists(dest_path):
        update_inventory(name, dest_path)
        return

    if not path:
        fatal("You must specify a path to a local directory or an URL to a "
              "git repository to install a new inventory.")

    source_is_local_dir = os.path.exists(path) and os.path.isdir(path)
    if source_is_local_dir:
        # Link the local directory, creating the parent directory if needed
        parent_dir = os.path.dirname(dest_path)
        if not os.path.exists(parent_dir):
            os.mkdir(parent_dir)
        os.symlink(path, dest_path)
    else:
        click.echo('We will clone %s in %s\n' % (path, dest_path))
        try:
            vgit('clone', path, dest_path)
        except ErrorReturnCode:
            fatal("Unable to install the inventory %s." % name)

    success("The %s inventory has been installed." % name)
organization.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_organization(name, dest_path):
    """Update the organization at ``dest_path`` and re-run its galaxy install.

    ``git pull`` is only attempted when ``dest_path`` contains a ``.git``
    directory; the galaxy install runs in either case.
    """
    click.echo('We will update the %s organization' % name)
    os.chdir(dest_path)

    is_git_checkout = os.path.exists(os.path.join(dest_path, '.git'))
    if is_git_checkout:
        try:
            vgit('pull')
        except ErrorReturnCode:
            fatal("Unable to update the organization %s." % name)
        success("The %s organization has been updated." % name)

    run_galaxy_install(name)
init.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_git_root():
    """
    Retrieve the git directory, or prompt to create one if not found.

    Returns the repository top-level directory reported by
    ``git rev-parse --show-toplevel``. When that lookup yields nothing, the
    user is asked whether to run ``git init`` in the current directory.
    """
    git_root = None

    try:
        git_root = str(git('rev-parse', '--show-toplevel')).strip()
    except ErrorReturnCode as e:
        # Exit code 128 is tolerated here and handled by the prompt below;
        # any other code is reported as fatal. str(e) is used because
        # exceptions have no ``message`` attribute on Python 3.
        if e.exit_code != 128:
            fatal(str(e), e.exit_code)

    if not git_root:
        warning('You must be in a git repository directory to '
                'initialize a new project.')

        if not click.confirm('Do you want to create a new git '
                             'repository here?', default=True):
            fatal('Please run %s' % style('git init', bold=True))

        try:
            vgit('init')
            git_root = os.getcwd()
        except ErrorReturnCode as e:
            fatal('An error occurred when trying to initialize a '
                  'new repo.', e.exit_code)

    if git_root == aeriscloud_path:
        fatal('You cannot init AerisCloud from the AerisCloud directory!')

    return git_root
test.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _run_nosetests():
    """Run the unit tests through nosetests and pretty-print its output on failure.

    Returns 0 when nosetests exits cleanly. Otherwise returns the number of
    output lines echoed, i.e. every captured line except the last two.
    """
    click.echo('Running unit tests ... ', nl=False)
    nose_bin = os.path.join(aeriscloud_path, 'venv/bin/nosetests')

    try:
        python(nose_bin, '-v', '--with-id', module_path(),
               _err_to_out=True, _ok_code=[0])
    except ErrorReturnCode as e:
        click.echo('[%s]\n' % click.style('FAIL', fg='red'))

        result_colors = {'ok': 'green', 'FAIL': 'red'}
        echoed = 0
        for line in e.stdout.split('\n')[:-2]:
            if line.startswith('#'):
                # Per-test lines are expected to split into exactly five fields
                print(line)
                test_id, name, test_file, dots, res = line.rstrip().split(' ')

                if res in result_colors:
                    res = click.style(res, fg=result_colors[res], bold=True)

                line = ' '.join([
                    click.style(test_id, bold=True, fg='yellow'),
                    click.style(name, fg='blue'),
                    test_file,
                    dots,
                    res
                ])
            elif line.startswith('FAIL:'):
                _, name, test_file = line.split(' ')

                line = ' '.join([
                    click.style('FAIL', bold=True, fg='red') + ':',
                    click.style(name, fg='blue'),
                    test_file
                ])

            click.echo('  ' + line)
            echoed += 1
        return echoed

    click.echo('[%s]' % click.style('OK', fg='green'))
    return 0
commands.py 文件源码 项目:docl 作者: cloudify-cosmo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _retry(func, *args, **kwargs):
    for _ in range(300):
        try:
            func(*args, **kwargs)
            break
        except sh.ErrorReturnCode:
            sleep(0.1)
    else:
        raise
commands.py 文件源码 项目:docl 作者: cloudify-cosmo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def install_docker(version=None, container_id=None):
    """Install docker-engine inside a container unless ``which docker`` already succeeds there.

    ``container_id`` defaults to ``work.last_container_id`` and ``version``
    to the result of ``_get_docker_version()``.
    """
    container_id = container_id or work.last_container_id
    try:
        quiet_docker('exec', container_id, 'which', 'docker')
    except sh.ErrorReturnCode:
        # The probe failed: fall through and install
        pass
    else:
        logger.info('Docker already installed on container. Doing nothing')
        return

    cp(resources.DIR / 'docker.repo', ':/etc/yum.repos.d/docker.repo',
       container_id=container_id)
    version = version or _get_docker_version()
    yum_command = 'yum install -y -q docker-engine-{}'.format(version)
    docker('exec', container_id, *yum_command.split(' '))
commands.py 文件源码 项目:docl 作者: cloudify-cosmo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_docker_version():
    """Return the docker client version string as reported by ``docker version``.

    When the command exits with an error code, whatever it wrote to stdout
    is used instead.
    """
    try:
        raw_version = quiet_docker.version('-f', '{{.Client.Version}}')
    except sh.ErrorReturnCode as e:
        raw_version = e.stdout
    version = raw_version.strip()

    # Replacing the -ce in the version with .ce, as the versions in
    # https://yum.dockerproject.org/repo/main/centos/7/Packages/
    # adhere to this notation
    if not version.endswith('-ce'):
        return version
    return version.replace('-ce', '.ce')


问题


面经


文章

微信
公众号

扫码关注公众号