python类CalledProcessError()的实例源码

kubectl_util.py 文件源码 项目:benchmarks 作者: tensorflow 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _PrintLogs(pod_name_prefix, job_name):
  """Prints the logs of every pod matching the given selectors.

  For each pod, the logs of the previous container run are requested first
  (`kubectl logs -p`). If that command fails, the logs of the current run are
  fetched instead. A failure of the second command is not handled here.

  Args:
    pod_name_prefix: value of 'name-prefix' selector.
    job_name: value of 'job' selector.
  """
  for pod_name in _GetPodNames(pod_name_prefix, job_name):
    previous_run_command = [_KUBECTL, 'logs', '-p', pod_name]
    current_run_command = [_KUBECTL, 'logs', pod_name]
    try:
      logging.info('Command to get logs: %s', ' '.join(previous_run_command))
      pod_logs = subprocess.check_output(
          previous_run_command, universal_newlines=True)
    except subprocess.CalledProcessError:
      # No logs from a previous run are available; use the current run.
      logging.info('Command to get logs: %s', ' '.join(current_run_command))
      pod_logs = subprocess.check_output(
          current_run_command, universal_newlines=True)
    print('%s logs:' % pod_name)
    print(pod_logs)
create_extracts.py 文件源码 项目:extracts 作者: openmaptiles 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_extract(extract):
        """Create the .mbtiles file for one extract and refresh its metadata.

        The target file lives in ``target_dir`` and is named after
        ``extract.extract``. When ``--patch-from`` is set, that file is copied
        to the target first so the extract is written directly on top of it.
        A ``CalledProcessError`` raised while creating the extract is printed
        to stderr and the metadata update is skipped.
        """
        mbtiles_path = os.path.join(target_dir, extract.extract + '.mbtiles')
        print('Create extract {}'.format(mbtiles_path))

        # Rather than patching afterwards, start from a copy of the patch
        # source and write straight into it (this works concurrently).
        base_patch = args['--patch-from']
        if base_patch:
            print('Use patch from {} as base'.format(base_patch))
            shutil.copyfile(base_patch, mbtiles_path)

        try:
            create_extract(extract, source_file, mbtiles_path)
        except subprocess.CalledProcessError as err:
            # One failing extract must not abort the whole run.
            print(err, file=sys.stderr)
        else:
            print('Update metadata {}'.format(mbtiles_path))
            update_metadata(mbtiles_path, extract.metadata(mbtiles_path))
bootstrap.py 文件源码 项目:foxbms-setup 作者: foxBMS 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_main_git_path():
    """Gets the remote URL of the setup repository.

    Runs ``git config --get remote.origin.url`` in the current working
    directory and splits the reported URL at its last ``/``. If the git
    command fails, an explanatory message is logged and the process exits
    with status 1.

    Returns:
        tuple: ``(repository_basepath, repository_name)`` where
        ``repository_basepath`` is the URL up to (excluding) the last ``/``
        and ``repository_name`` is the part after it.
    """
    try:
        # universal_newlines=True makes check_output return text on both
        # Python 2 and 3; without it Python 3 yields bytes and the str-based
        # rsplit below raises TypeError.
        remote_url = subprocess.check_output(
            ['git', 'config', '--get', 'remote.origin.url'],
            universal_newlines=True)
    except subprocess.CalledProcessError:
        setup_dir_path = os.path.dirname(os.path.realpath(__file__))
        err_msg = '''
\'{}\' is not a git repository.
Did you download a .zip file from GitHub?

Use
    \'git clone https://github.com/foxBMS/foxBMS-setup\'
to download the foxBMS-setup repository.
        '''.format(setup_dir_path)
        logging.error(err_msg)
        sys.exit(1)
    # git terminates its output with a newline; strip it so it does not end
    # up inside the repository name.
    repository_basepath, repository_name = remote_url.strip().rsplit('/', 1)
    return repository_basepath, repository_name
git.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def callGit(self, workspacePath, *args):
        """Run ``git`` with ``args`` in this SCM's directory and return stdout.

        The command is executed in ``<cwd>/<workspacePath>/<self.__dir>`` with
        stderr discarded. A non-zero exit status is reported as a
        ``BuildError`` carrying the directory, the command line and the
        captured output.
        """
        gitCommand = ['git'] + list(args)
        try:
            return subprocess.check_output(
                gitCommand,
                cwd=os.path.join(os.getcwd(), workspacePath, self.__dir),
                universal_newlines=True,
                stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            raise BuildError("git error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(gitCommand), e.output.rstrip()))

    # Get GitSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if git returned an error code.
    #  - dirty: SCM is dirty. Could be: modified files, switched to another branch/tag/commit/repo, unpushed commits.
    #  - clean: Same branch/tag/commit as specified in the recipe and no local changes.
    #  - empty: Directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
git.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 78 收藏 0 点赞 0 评论 0
def _scanDir(self, workspace, dir):
        """Record the git state of ``<workspace>/<dir>`` on this object.

        Stores the fetch remotes (name -> URL), the HEAD commit id, the
        ``git describe --always --dirty`` description and whether
        ``git diff-index --quiet HEAD --`` reports differences. A failing git
        command or an OS-level error while invoking git is turned into a
        ``BuildError``.
        """
        self.__dir = dir
        repoDir = os.path.join(workspace, dir)

        def gitOutput(*gitArgs):
            # Run git in the scanned directory and hand back its stdout text.
            return subprocess.check_output(["git"] + list(gitArgs),
                cwd=repoDir, universal_newlines=True)

        try:
            fetchRemotes = {}
            for line in gitOutput("remote", "-v").split("\n"):
                if not line.endswith("(fetch)"):
                    continue
                # Cut the trailing " (fetch)" marker; what is left is
                # "<remote>\t<url>".
                remote, url = line[:-8].split("\t")
                fetchRemotes[remote] = url
            self.__remotes = fetchRemotes

            self.__commit = gitOutput("rev-parse", "HEAD").strip()
            self.__description = gitOutput("describe", "--always", "--dirty").strip()
            diffStatus = subprocess.call(
                ["git", "diff-index", "--quiet", "HEAD", "--"], cwd=repoDir)
            self.__dirty = diffStatus != 0
        except subprocess.CalledProcessError as e:
            raise BuildError("Git audit failed: " + str(e))
        except OSError as e:
            raise BuildError("Error calling git: " + str(e))
svn.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def callSubversion(self, workspacePath, *args):
        """Run ``svn`` with ``args`` in ``workspacePath`` and return stdout.

        stderr is discarded. A non-zero exit status is reported as a
        ``BuildError`` carrying the directory, the command line and the
        captured output.
        """
        svnCommand = ['svn'] + list(args)

        try:
            return subprocess.check_output(
                svnCommand,
                cwd=workspacePath,
                universal_newlines=True,
                stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError as e:
            # NOTE(review): the reported directory includes self.__dir while
            # the command itself runs in workspacePath - confirm intended.
            raise BuildError("svn error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
                os.path.join(workspacePath, self.__dir), " ".join(svnCommand), e.output.rstrip()))

    # Get SvnSCM status. The purpose of this function is to return the status of the given directory.
    #
    # Return values:
    #  - error: The SCM is in an error state. Use this if the svn call returns an error code.
    #  - dirty: SCM is dirty. Could be: modified files, switched to another URL or revision.
    #  - clean: Same URL and revision as specified in the recipe and no local changes.
    #  - empty: Directory does not exist.
    #
    # This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
    # while empty and clean directories are not.
utils.py 文件源码 项目:fabric-sdk-py 作者: hyperledger 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def cli_call(arg_list, expect_success=True, env=None):
    """Executes a CLI command in a subprocess and return the results.

    Args:
        arg_list: a list command arguments
        expect_success: use False to return even if an error occurred
                        when executing the command
        env: mapping of environment variables for the child process. The
             default (None) lets the child inherit the environment of this
             process as it is at call time, rather than a snapshot taken
             when this module was imported.

    Returns: (string, string, int) output message, error message, return code

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero
            status and expect_success is true.

    """
    p = subprocess.Popen(arg_list, stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, env=env)
    output, error = p.communicate()
    if p.returncode != 0:
        # Echo whatever the failing command produced before (optionally)
        # raising, so the details are visible even if the caller swallows
        # the exception.
        if output:
            print("Output:\n" + str(output))
        if error:
            print("Error Message:\n" + str(error))
        if expect_success:
            raise subprocess.CalledProcessError(
                p.returncode, arg_list, output)
    return output, error, p.returncode
hatlc.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def gunzip_sqlitecurve(sqlitecurve):
    '''This just uncompresses the sqlitecurve in gzip format.

    The compressed input file is kept (``gunzip -k``). The command is run
    without a shell, so paths containing spaces or shell metacharacters are
    passed to gunzip unchanged.

    FIXME: this doesn't work with gzip < 1.6 or non-GNU gzip (probably).

    Returns the path of the uncompressed file, or None if gunzip failed or
    could not be started.

    '''

    # -k to keep the input .gz just in case something explodes
    cmd = ['gunzip', '-k', sqlitecurve]

    try:
        subprocess.check_output(cmd)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing gunzip executable, which the previous
        # shell-based invocation reported as a non-zero exit status.
        LOGERROR('could not uncompress %s' % sqlitecurve)
        return None

    # only strip the trailing suffix; a '.gz' elsewhere in the path must stay
    if sqlitecurve.endswith('.gz'):
        return sqlitecurve[:-len('.gz')]
    return sqlitecurve.replace('.gz','')



###############################################
## DECIDE WHICH COMPRESSION FUNCTIONS TO USE ##
###############################################
vagrant_handler.py 文件源码 项目:messier 作者: conorsch 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def available_vms(self, vms=None):
        """
        List all VMs regardless of state, filtering if requested via the <vms>
        parameter provided by the CLI.

        Returns a list of the objects yielded by ``self.v.status()``; when
        ``vms`` is non-empty only those whose ``name`` is in ``vms`` are kept.
        Raises ``VagrantfileNotFound`` when the status call fails with
        ``CalledProcessError``.
        """
        try:
            possible_vms = list(self.v.status())
        except CalledProcessError:
            # TODO: Exception handling here assumes Vagrantfile is missing.
            # Vagrant seems to return 1 for many different errors, and finding
            # documentation for specific return codes has proven difficult.
            raise VagrantfileNotFound

        if vms:
            possible_vms = [vm for vm in possible_vms if vm.name in vms]

        return possible_vms
vagrant_handler.py 文件源码 项目:messier 作者: conorsch 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def destroy_vms(self):
        """
        Destroy every VM in ``self.vms`` that has been created.

        VMs whose state is "not_created" are skipped. VMs of the
        "digital_ocean" provider are destroyed a second time, and a
        ``CalledProcessError`` from that second attempt is ignored.
        """
        for vm in self.vms:
            # Vagrant exits with 1 when asked to destroy a VM that does not
            # exist, so leave those alone.
            if vm.state == "not_created":
                continue

            self.v.destroy(vm_name=vm.name)

            if vm.provider != "digital_ocean":
                continue

            # The vagrant-digitalocean plugin doesn't clean up after itself,
            # hence the second destroy:
            # https://github.com/smdahlen/vagrant-digitalocean/issues/194
            try:
                self.v.destroy(vm_name=vm.name)
            except CalledProcessError:
                pass
test_main_build_scenarios.py 文件源码 项目:sphinxcontrib-versioning 作者: Robpol86 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_error_bad_path(tmpdir):
    """Check the error output produced for unusable paths.

    :param tmpdir: pytest fixture.
    """
    destination = str(tmpdir)

    def failed_output(cwd, command):
        # Run a command that is expected to fail and return what it printed.
        with pytest.raises(CalledProcessError) as exc:
            pytest.run(cwd, command)
        return exc.value.output

    # Config directory that does not exist.
    output = failed_output(
        tmpdir, ['sphinx-versioning', '-N', '-c', 'unknown', 'build', '.', destination])
    assert 'Directory "unknown" does not exist.' in output

    # Config "directory" that is actually a file.
    tmpdir.ensure('is_file')
    output = failed_output(
        tmpdir, ['sphinx-versioning', '-N', '-c', 'is_file', 'build', '.', destination])
    assert 'Directory "is_file" is a file.' in output

    # Working directory that is not a git repository.
    output = failed_output(tmpdir, ['sphinx-versioning', '-N', 'build', '.', destination])
    assert 'Failed to find local git repository root in {}.'.format(repr(destination)) in output

    # Explicit git root (-g) that is not a git repository.
    repo = tmpdir.ensure_dir('repo')
    pytest.run(repo, ['git', 'init'])
    empty = tmpdir.ensure_dir('empty1857')
    output = failed_output(
        repo, ['sphinx-versioning', '-N', '-g', str(empty), 'build', '.', destination])
    assert 'Failed to find local git repository root in' in output
    assert 'empty1857' in output
test_export.py 文件源码 项目:sphinxcontrib-versioning 作者: Robpol86 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_new_branch_tags(tmpdir, local_light, fail):
    """Export a commit from a branch/tag the local repo does not know yet.

    :param tmpdir: pytest fixture.
    :param local_light: conftest fixture.
    :param bool fail: Fail by not fetching.
    """
    local_root = str(local_light)
    remotes = [remote for remote in list_remote(local_root) if remote[1] == 'ob_at']
    sha = remotes[0][0]
    target = tmpdir.ensure_dir('exported', sha)

    if fail:
        # Without fetching first, the export is expected to fail.
        with pytest.raises(CalledProcessError):
            export(local_root, sha, str(target))
        return

    # Fetch, then export and inspect what was written.
    fetch_commits(local_root, remotes)
    export(local_root, sha, str(target))

    exported = [entry.relto(target) for entry in target.listdir()]
    assert exported == ['README']
    assert target.join('README').read() == 'new'
git.py 文件源码 项目:sphinxcontrib-versioning 作者: Robpol86 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_root(directory):
    """Return the top-level directory of the git repo containing ``directory``.

    :raise GitError: If git command fails (dir not a git repo?).

    :param str directory: Subdirectory in the local repo.

    :return: Root directory of repository.
    :rtype: str
    """
    try:
        toplevel = run_command(directory, ['git', 'rev-parse', '--show-toplevel'], env_var=False)
    except CalledProcessError as exc:
        message = 'Failed to find local git repository root in {}.'.format(repr(directory))
        raise GitError(message, exc.output)
    # On Windows the forward slashes in git's output become backslashes.
    normalized = toplevel.replace('/', '\\') if IS_WINDOWS else toplevel
    return normalized.strip()
git.py 文件源码 项目:sphinxcontrib-versioning 作者: Robpol86 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def fetch_commits(local_root, remotes):
    """Fetch from origin, including refs whose commits are not yet known.

    After a plain ``git fetch origin``, every remote entry is checked with
    ``git reflog <sha>``. If that fails, the entry's ref is fetched
    explicitly and the check is repeated.

    :raise CalledProcessError: Unhandled git command failure.

    :param str local_root: Local path to git root directory.
    :param iter remotes: Output of list_remote().
    """
    fetch_origin = ['git', 'fetch', 'origin']

    # Known branches first.
    run_command(local_root, fetch_origin)

    # Then any branch/tag whose commit is still missing locally.
    for sha, name, kind in remotes:
        try:
            run_command(local_root, ['git', 'reflog', sha])
            continue
        except CalledProcessError:
            refspec = 'refs/{0}/{1}'.format(kind, name)
            run_command(local_root, fetch_origin + [refspec])
            run_command(local_root, ['git', 'reflog', sha])
config.py 文件源码 项目:runcommands 作者: wylee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def version_getter(config):
    """Get tag associated with HEAD; fall back to SHA1.

    If HEAD is tagged, return the tag name; otherwise fall back to
    HEAD's short SHA1 hash. ``None`` is returned when the current
    directory is not inside a git work tree or when the ``git``
    executable cannot be run at all.

    .. note:: Only annotated tags are considered.

    TODO: Support non-annotated tags?

    """
    try:
        check_output(['git', 'rev-parse', '--is-inside-work-tree'], stderr=DEVNULL)
    except (CalledProcessError, OSError):
        # CalledProcessError: not inside a work tree. OSError: git could not
        # be executed (e.g. it is not installed); in both cases there is no
        # version to report.
        return None
    encoding = getpreferredencoding(do_setlocale=False)
    try:
        version = check_output(['git', 'describe', '--exact-match'], stderr=DEVNULL)
    except CalledProcessError:
        # HEAD carries no (annotated) tag; use the abbreviated commit hash.
        version = check_output(['git', 'rev-parse', '--short', 'HEAD'])
    return version.decode(encoding).strip()
recipe-579056.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def my_thread():
    """Worker loop: run one shell command per entry taken from ``files``.

    Reads the module-level globals ``files`` (pending entries, consumed with
    ``pop()``), ``path`` and ``options`` (prefix and suffix concatenated
    around each entry to form the command line) and ``timeout`` (seconds
    allowed per command). Progress and failures are reported with ``print``;
    no exception from running a command escapes the loop.
    """
    myname = threading.current_thread().name
    while True:
        # Pop first and treat "empty" as the stop signal: checking `files`
        # and popping in two steps lets another worker empty the container
        # in between.
        try:
            nextfile = files.pop()
        except (IndexError, KeyError):
            break
        # print name of thread and command being run
        print('Thread {0} starts processing {1}'.format(myname, nextfile))
        f = path + nextfile + options
        try:
            # timeout interrupts frozen command, shell=True doesn't open a console
            subprocess.check_call(args=f, shell=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            print('Thread {0} Processing {1} took too long'.format(myname, nextfile))
        except subprocess.CalledProcessError as e:
            print('Thread {0} Processing {1} returned error {2}:{3}'.format(myname, nextfile, e.returncode, e.output))
        except Exception as e:
            print('Thread {0} Processing {1} returned error {2}'.format(myname, nextfile, type(e).__name__))
    print('thread {0} stopped'.format(myname))
test_main.py 文件源码 项目:detox 作者: tox-dev 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def invoke(command, success_codes=(0,)):
    """Run ``command`` and return ``(exit status, decoded output)``.

    stderr is merged into stdout and the combined output is decoded as
    UTF-8. An ``Exception`` is raised when the exit status is not listed in
    ``success_codes``.
    """
    try:
        raw_output = subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as error:
        status, raw_output = error.returncode, error.output
    else:
        status = 0

    text = raw_output.decode('utf-8')
    if status in success_codes:
        return status, text

    details = (command, status, text)
    raise Exception('Command %r return exit code %d and output: """%s""".' % details)
heat_relations.py 文件源码 项目:charm-heat 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def upgrade_charm():
    """Migrate a file-based encryption key into leader settings on upgrade.

    On the leader, if ``HEAT_PATH/encryption-key`` exists its content is
    stored under the ``heat-auth-encryption-key`` leader setting and the file
    is removed. If storing the setting fails, a warning (including the error)
    is logged and the file is left in place. ``leader_elected()`` is called
    afterwards in every case.
    """
    if is_leader():
        # if we are upgrading, then the old version might have used the
        # HEAT_PATH/encryption-key. So we grab the key from that, and put it in
        # leader settings to ensure that the key remains the same during an
        # upgrade.
        encryption_path = os.path.join(HEAT_PATH, 'encryption-key')
        if os.path.isfile(encryption_path):
            with open(encryption_path, 'r') as f:
                encryption_key = f.read()
            try:
                leader_set({'heat-auth-encryption-key': encryption_key})
            except subprocess.CalledProcessError as e:
                # The message needs one placeholder per argument; without the
                # second "{}" the error text was silently dropped.
                log("upgrade: leader_set: heat-auth-encryption-key failed,"
                    " didn't delete the existing file: {}.\n"
                    "Error was: {}".format(encryption_path, str(e)),
                    level=WARNING)
            else:
                # now we just delete the file
                os.remove(encryption_path)
    leader_elected()
steps.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, fqArchiveUrl, filtersDir, outputPrefix, outputUrl, diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem, preemptible):
        """Build the "biobloomcategorizer" pipeline step.

        Lists ``filtersDir`` with ``gsutil ls``, selects the entries that
        look like ``.bf`` / ``.txt`` files and passes them, together with the
        fastq archive, as inputs to a ``PipelineSchema`` stored in
        ``self._step``. If the listing fails, an error is printed and the
        process exits with status -1.

        fqArchiveUrl: URL of the input archive; its basename becomes
            INPUT_FILE in the step environment.
        filtersDir: location passed to ``gsutil ls`` to find filter files.
        outputPrefix: used for the OUTPUT_PREFIX variable and the outputs
            pattern ``<outputPrefix>*``.
        outputUrl: destination part of the outputs specification.
        diskSize, diskType, logsPath, container, scriptUrl, tag, cores, mem,
        preemptible: forwarded unchanged to ``PipelineSchema``.
        """
        super(PipelineStep, self).__init__()

        fqFileName = os.path.basename(fqArchiveUrl)
        fqInputs = "{fqArchive}:{fqFileName}".format(fqArchive=fqArchiveUrl, fqFileName=fqFileName)

        try:
            # universal_newlines=True yields text on Python 2 and 3 alike, so
            # the str-based split below also works on Python 3.
            filtersDirContents = subprocess.check_output(["gsutil", "ls", filtersDir], universal_newlines=True)

        except subprocess.CalledProcessError as e:
            print("ERROR: couldn't get a listing of filter files!  -- {reason}".format(reason=e))
            exit(-1)

        # Raw strings keep the regex escapes valid on current Python versions.
        bfPattern = re.compile(r'^.*\.bf$')
        # NOTE(review): no trailing "$" here, so any name containing ".txt"
        # matches - kept as in the original; confirm whether that is intended.
        txtPattern = re.compile(r'^.*\.txt')

        bfInputs = [x for x in filtersDirContents.split('\n') if bfPattern.match(x) or txtPattern.match(x)]
        # NOTE(review): fqInputs is already "url:filename" and is expanded
        # again below like a plain URL - verify against PipelineSchema.
        bfInputs.append(fqInputs)

        inputs = ",".join(["{url}:{filename}".format(url=x, filename=os.path.basename(x)) for x in bfInputs])
        outputs = "{outputPrefix}*:{outDir}".format(outputPrefix=outputPrefix, outDir=outputUrl)
        filtersList = ','.join([os.path.basename(x) for x in bfInputs if bfPattern.match(x)])
        env = "INPUT_FILE={fqFileName},OUTPUT_PREFIX={outputPrefix},FILTERS_LIST={filtersList}".format(fqFileName=fqFileName, outputPrefix=outputPrefix, filtersList=filtersList)

        self._step = PipelineSchema("biobloomcategorizer",
                                                   self._pipelinesConfig,
                                                   logsPath,
                                                   container,
                                                   scriptUrl=scriptUrl,
                                                   cores=cores,
                                                   mem=mem,
                                                   diskSize=diskSize,
                                                   diskType=diskType,
                                                   inputs=inputs,
                                                   outputs=outputs,
                                                   env=env,
                                                   tag=tag,
                                                   preemptible=preemptible)
utils.py 文件源码 项目:ISB-CGC-pipelines 作者: isb-cgc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getJobLogs(args, config):  # TODO: reimplement
        """Print the stored stdout and/or stderr log of a job.

        Looks up the log locations of job ``args.jobId`` through
        ``PipelineDbUtils`` and prints the logs selected by ``args.stdout``
        and ``args.stderr``, fetching each with ``gsutil cat``. If a fetch
        fails, an error is printed and the process exits with status -1. The
        database connection is closed on every path, including those exits.
        """
        pipelineDbUtils = PipelineDbUtils(config)

        try:
            jobInfo = pipelineDbUtils.getJobInfo(select=["stdout_log", "stderr_log", "gcs_log_path"],
                                                 where={"job_id": args.jobId})

            # gsutil's own stderr chatter is discarded via the null device.
            with open(os.devnull, 'w') as fnull:
                if args.stdout:
                    try:
                        # universal_newlines=True returns text on Python 2
                        # and 3, so the log prints as text rather than as a
                        # bytes repr.
                        stdoutLogFile = subprocess.check_output(
                            ["gsutil", "cat", os.path.join(jobInfo[0].gcs_log_path, jobInfo[0].stdout_log)],
                            stderr=fnull, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        print("ERROR: couldn't get the stdout log : {reason}".format(reason=e))
                        exit(-1)

                    print("STDOUT:\n")
                    print(stdoutLogFile)
                    print("---------\n")

                if args.stderr:
                    try:
                        stderrLogFile = subprocess.check_output(
                            ["gsutil", "-q", "cat", os.path.join(jobInfo[0].gcs_log_path, jobInfo[0].stderr_log)],
                            stderr=fnull, universal_newlines=True)
                    except subprocess.CalledProcessError as e:
                        print("ERROR: couldn't get the stderr log : {reason}".format(reason=e))
                        exit(-1)

                    print("STDERR:\n")
                    print(stderrLogFile)
                    print("---------\n")
        finally:
            # exit() raises SystemExit, so this also runs on the error paths
            # that previously left the connection open.
            pipelineDbUtils.closeConnection()


问题


面经


文章

微信
公众号

扫码关注公众号