python类CalledProcessError()的实例源码

ioskit.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def screenshot(self, filename=None):
        """Capture the device screen and return it as an image.

        ``idevice("screenshot", ...)`` writes a TIFF into a private temporary
        directory; the image is loaded fully into memory and the temporary
        file is removed before returning.

        Args:
            filename: optional path; when truthy the image is also saved there.

        Return:
            PIL.Image

        Raises:
            EnvironmentError: propagated from ``idevice``.
            SystemExit: when the screenshot command exits with a non-zero
                status; the exit message is the command output.
        """
        # mkdtemp creates a directory only this user can access, so the file
        # name inside it cannot be raced the way a tempfile.mktemp() name can.
        tmpdir = tempfile.mkdtemp(prefix='atx-screencap-')
        tmpfile = os.path.join(tmpdir, 'screen.tiff')
        try:
            try:
                idevice("screenshot", "--udid", self.udid, tmpfile)
            except subprocess.CalledProcessError as e:
                # CalledProcessError carries no usable ``message`` attribute;
                # report the captured output, else the default description.
                detail = getattr(e, 'output', None)
                if detail and not isinstance(detail, str):
                    detail = detail.decode('utf-8', 'replace')
                sys.exit(detail or str(e))

            image = Image.open(tmpfile)
            image.load()
            if filename:
                image.save(filename)
            return image
        finally:
            if os.path.exists(tmpfile):
                os.unlink(tmpfile)
            try:
                os.rmdir(tmpdir)
            except OSError:
                # Best-effort cleanup; never mask the real outcome.
                pass
ioskit.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def screenshot(self, filename=None):
        """Take a screenshot of the device and return it as an image.

        The picture is produced by ``idevice("screenshot", ...)`` as a TIFF
        inside a freshly created temporary directory, loaded into memory, and
        the temporary file is deleted again.

        Args:
            filename: optional destination; when truthy the image is saved
                there as well.

        Return:
            PIL.Image

        Raises:
            EnvironmentError: propagated from ``idevice``.
            SystemExit: if the screenshot command returns a non-zero status.
        """
        # A private directory from mkdtemp avoids the name race inherent in
        # the deprecated tempfile.mktemp().
        workdir = tempfile.mkdtemp(prefix='atx-screencap-')
        tmpfile = os.path.join(workdir, 'screen.tiff')
        try:
            try:
                idevice("screenshot", "--udid", self.udid, tmpfile)
            except subprocess.CalledProcessError as e:
                # ``e.message`` is empty on Python 2 and absent on Python 3;
                # use the captured output or the exception's own text.
                reason = getattr(e, 'output', None)
                if reason and not isinstance(reason, str):
                    reason = reason.decode('utf-8', 'replace')
                sys.exit(reason or str(e))

            image = Image.open(tmpfile)
            image.load()
            if filename:
                image.save(filename)
            return image
        finally:
            if os.path.exists(tmpfile):
                os.unlink(tmpfile)
            try:
                os.rmdir(workdir)
            except OSError:
                # Cleanup is best-effort and must not hide the real result.
                pass
ssh.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _services(ip, timeout, *extra_args):
    """List the services described in ``/etc/aeriscloud.d/*`` on a host.

    The files are read by running ``cat`` over ssh against ``ip``. Every
    non-empty output line is split on commas and mapped, in order, onto the
    keys ``name``, ``port`` and ``path``.

    :param ip: host handed to ssh
    :param timeout: value used for ssh's ``ConnectTimeout`` option
    :param extra_args: additional ssh arguments, inserted after ``-t``
    :return: list of dicts; empty list when the ssh command exits non-zero
    """
    command = ['ssh', ip, '-t']
    command.extend(extra_args)
    command.extend([
        '-o', 'StrictHostKeyChecking no',
        '-o', 'ConnectTimeout %d' % timeout,
        '-o', 'BatchMode yes',
        '--',
        'cat', '/etc/aeriscloud.d/*',
    ])

    try:
        listing = check_output(command)
    except CalledProcessError:
        return []

    keys = ['name', 'port', 'path']
    found = []
    for entry in listing.split('\n'):
        if entry:
            found.append(dict(zip(keys, entry.strip().split(','))))
    return found
build.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _search_variables(search_path, variable):
    """Find the files under ``search_path`` that contain ``<variable> = ``.

    Runs ``grep -rI`` with the pattern ``"<variable> = "`` and collects the
    file-name part of every matching line. File names starting with ``.``
    are skipped.

    :param search_path: directory (or file) handed to grep
    :param variable: name to look for; used as a grep pattern
    :return: set of file names as printed by grep. When grep exits with a
        non-zero status an empty *list* is returned, as before.
    """
    # Pass an argument list instead of a shell string: ``variable`` used to
    # be interpolated unescaped into a ``shell=True`` command line.
    cmd = ['grep', '-rI', '-e', '%s = ' % variable, '--', search_path]
    try:
        grep = subprocess32.check_output(cmd)
    except subprocess32.CalledProcessError:
        return []

    files = set()
    for line in grep.split('\n'):
        if not line.strip():
            continue
        # partition() keeps the whole line when no ':' is present, whereas
        # slicing with find() == -1 silently dropped the last character.
        filename = line.partition(':')[0].strip()
        if filename.startswith('.'):
            continue
        files.add(filename)

    return files
common.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def do_run(cmd):
        """Run ``cmd`` through the shell, blocking or in the background.

        Reads several names that are not defined in this block and must come
        from an enclosing scope: ``async``, ``stdin``, ``stderr``,
        ``env_dict``, ``inherit_cwd``, ``outfile``, ``print_error`` and
        ``mutex_popen``.
        NOTE(review): ``async`` is a reserved word from Python 3.7 on, so this
        code only parses on older interpreters.

        Returns:
            * ``async`` falsy and ``stdin`` truthy: the raw result of
              ``subprocess.check_output`` (not decoded).
            * ``async`` falsy otherwise: the output decoded with
              ``DEFAULT_ENCODING``.
            * ``async`` truthy: the started ``subprocess.Popen`` object.

        Raises:
            subprocess.CalledProcessError: re-raised after optionally
                printing the command and its output.
        """
        try:
            # Only pin the working directory when asked to inherit it.
            cwd = os.getcwd() if inherit_cwd else None
            if not async:
                if stdin:
                    return subprocess.check_output(cmd, shell=True,
                        stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
                output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
                return output.decode(DEFAULT_ENCODING)
            # subprocess.Popen is not thread-safe, hence use a mutex here..
            try:
                mutex_popen.acquire()
                stdin_arg = subprocess.PIPE if stdin else None
                # A string ``outfile`` is opened for binary writing; anything
                # else is passed to Popen unchanged.
                stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
                process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
                    stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
                return process
            finally:
                mutex_popen.release()
        except subprocess.CalledProcessError as e:
            if print_error:
                print("ERROR: '%s': %s" % (cmd, e.output))
            raise e
common.py 文件源码 项目:localstack 作者: atlassian 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def do_run(cmd):
        """Execute ``cmd`` via the shell, synchronously or as a background process.

        Depends on names supplied by an enclosing scope that is not part of
        this block: ``async``, ``stdin``, ``stderr``, ``env_dict``,
        ``inherit_cwd``, ``outfile``, ``print_error`` and ``mutex_popen``.
        NOTE(review): ``async`` became a reserved word in Python 3.7, so this
        block is only valid syntax on earlier versions.

        Returns:
            * synchronous with ``stdin`` truthy: undecoded output of
              ``subprocess.check_output``.
            * synchronous otherwise: output decoded using
              ``DEFAULT_ENCODING``.
            * ``async`` truthy: the ``subprocess.Popen`` instance.

        Raises:
            subprocess.CalledProcessError: re-raised; the command and its
                output are printed first when ``print_error`` is truthy.
        """
        try:
            # None lets the child use the default working directory.
            cwd = os.getcwd() if inherit_cwd else None
            if not async:
                if stdin:
                    return subprocess.check_output(cmd, shell=True,
                        stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
                output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
                return output.decode(DEFAULT_ENCODING)
            # subprocess.Popen is not thread-safe, hence use a mutex here..
            try:
                mutex_popen.acquire()
                stdin_arg = subprocess.PIPE if stdin else None
                # String ``outfile`` values are opened in 'wb' mode; other
                # values are handed to Popen as given.
                stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
                process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
                    stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
                return process
            finally:
                mutex_popen.release()
        except subprocess.CalledProcessError as e:
            if print_error:
                print("ERROR: '%s': %s" % (cmd, e.output))
            raise e
__init__.py 文件源码 项目:aardvark 作者: Netflix-Skunkworks 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _call_phantom(self, token, arns, output_file):
        """
        Shell out to phantomjs.
        - Writes ARNs to a file that phantomjs will read as an input.
        - Phantomjs exchanges the token for session cookies.
        - Phantomjs then navigates to the IAM page and executes JavaScript
        to call GenerateServiceLastAccessedDetails for each ARN.
        - Every 10 seconds, Phantomjs calls GetServiceLastAccessedDetails
        - Phantom saves output to a file that is used by `persist()`

        :param token: passed to ``awsconsole.js`` as its first argument
        :param arns: JSON-serialisable value dumped to the temporary input file
        :param output_file: path passed to ``awsconsole.js`` as the output file
        :return: Exit code from the phantomjs subprocess; 1 when it did not
            finish within 20 minutes.
        """

        path = os.path.dirname(__file__)
        console_js = os.path.join(path, 'awsconsole.js')
        logger = self.current_app.logger

        with tempfile.NamedTemporaryFile() as f:
            json.dump(arns, f)
            f.seek(0)
            p = subprocess32.Popen([
                self.current_app.config.get('PHANTOMJS'),
                console_js,
                token,
                f.name,
                output_file],
                stdout=subprocess32.PIPE, stderr=subprocess32.STDOUT)
            try:
                output, errs = p.communicate(timeout=1200)  # 20 mins
            except subprocess32.TimeoutExpired:
                # communicate() does not stop the child on timeout; kill it
                # and reap it so it does not linger.
                p.kill()
                p.communicate()
                logger.error('PhantomJS timed out')
                return 1  # return code 1 for timeout

            logger.debug('Phantom Output: \n{}'.format(output))
            # stderr is merged into stdout above, so ``errs`` is None here.
            logger.debug('Phantom Errors: \n{}'.format(errs))

            # Popen/communicate never raise CalledProcessError, so the exit
            # status has to be checked explicitly.
            if p.returncode != 0:
                logger.error('PhantomJS exited: {}'
                             ''.format(p.returncode))
                return p.returncode

            logger.info('PhantomJS exited: 0')
            return 0
ioskit.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def check_output(cmds, shell=False):
    """Run ``cmds`` and return its output, with stderr merged into stdout.

    A non-zero exit status surfaces as ``subprocess.CalledProcessError``,
    exactly as raised by ``subprocess.check_output``.
    """
    return subprocess.check_output(cmds, shell=shell, stderr=subprocess.STDOUT)
ioskit.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def idevice(name, *args):
    """Run the ``idevice<name>`` command-line tool and return its output.

    The executable is located with ``look_exec``; ``args`` are appended to
    the command line unchanged. stderr is merged into the returned output.

    Raises:
        EnvironmentError: when ``look_exec`` finds no such binary.
        subprocess.CalledProcessError: when the tool exits non-zero.
    """
    binary = 'idevice' + name
    location = look_exec(binary)
    if not location:
        raise EnvironmentError('Necessary binary ("%s") not found.' % binary)

    return subprocess.check_output(
        [location] + list(args), stderr=subprocess.STDOUT, shell=False)
install.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_file_size(adb, remote_path):
    """Return a size for ``remote_path`` parsed from ``ls -l`` on the device.

    The value is the first run of digits that follows a whitespace character
    in the command output (presumably the size column; confirm against the
    device's ``ls`` format). Returns 0 when no such number is found, or when
    the command raises ``subprocess.CalledProcessError``, in which case a
    warning is logged and the call pauses briefly first.
    """
    try:
        listing = adb.run_cmd('shell', 'ls', '-l', remote_path)
        found = re.search(r'\s(\d+)', listing)
        return int(found.group(1)) if found else 0
    except subprocess.CalledProcessError as e:
        log.warn("call error: %s", e)
        time.sleep(.1)
        return 0
cmd2.py 文件源码 项目:cmd2 作者: python-cmd2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _which(editor):
    """Return the path ``which`` reports for ``editor``, or None.

    None is returned when ``which`` exits with a non-zero status. On
    Python 3 the path is decoded to text; otherwise the stripped output is
    returned as is.
    """
    try:
        raw = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        return None

    location = raw.strip()
    if six.PY3:
        location = location.decode()
    return location
ioskit.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_output(cmds, shell=False):
    """Execute ``cmds`` and hand back everything it printed.

    stderr is redirected into stdout. ``subprocess.CalledProcessError`` from
    ``subprocess.check_output`` is passed through unchanged.
    """
    captured = subprocess.check_output(
        cmds,
        stderr=subprocess.STDOUT,
        shell=shell,
    )
    return captured
ioskit.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def idevice(name, *args):
    """Invoke the ``idevice<name>`` executable with ``args``.

    ``look_exec`` resolves the executable; the combined stdout/stderr of the
    run is returned.

    Raises:
        EnvironmentError: if ``look_exec`` returns nothing for the tool.
        subprocess.CalledProcessError: if the tool exits with a non-zero
            status.
    """
    tool = 'idevice' + name
    tool_path = look_exec(tool)
    if not tool_path:
        raise EnvironmentError('Necessary binary ("%s") not found.' % tool)

    command = [tool_path]
    command.extend(args)
    return subprocess.check_output(command, stderr=subprocess.STDOUT, shell=False)
install.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_file_size(adb, remote_path):
    """Parse a size for ``remote_path`` out of the device's ``ls -l`` output.

    Uses the first whitespace-preceded integer in the output (presumably the
    size column; verify against the target ``ls``). Yields 0 when nothing
    matches. If ``adb.run_cmd`` raises ``subprocess.CalledProcessError`` the
    error is logged, the call sleeps 0.1 s and 0 is returned.
    """
    try:
        output = adb.run_cmd('shell', 'ls', '-l', remote_path)
        match = re.search(r'\s(\d+)', output)
        if match:
            return int(match.group(1))
        return 0
    except subprocess.CalledProcessError as e:
        log.warn("call error: %s", e)
        time.sleep(.1)
        return 0
test_subprocess_utils.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_check_call_realtime_failure():
    """A failing command yields output and then raises CalledProcessError."""
    command = ['make', 'doesnotexist']
    collected = ''
    with pytest.raises(subprocess.CalledProcessError) as excinfo:
        for chunk in check_call_realtime(command):
            collected += chunk

    error = excinfo.value
    assert error.returncode != 0
    assert error.cmd == ['make', 'doesnotexist']
    assert len(collected) != 0
subprocess_utils.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_call_realtime(args):
    """Run command with arguments and yield the output as it comes.

    Output is yielded one line at a time, as soon as the child has written
    it. Stderr is piped into stdout.

    :param args: command and arguments, passed to ``subprocess.Popen``
    :raises subprocess.CalledProcessError: if exit code is non-zero
    """
    p = subprocess.Popen(args, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    try:
        # read() with no size blocks until EOF, which defeats streaming;
        # readline() returns as soon as a full line is available.
        for line in iter(p.stdout.readline, b''):
            yield line
    finally:
        # Also runs when the consumer abandons the generator early.
        p.stdout.close()
    if p.wait() != 0:
        raise subprocess.CalledProcessError(p.returncode, args)
flite.py 文件源码 项目:epitran 作者: dmort27 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def english_g2p(self, text):
        """Convert English ``text`` to IPA by way of the external ``t2p`` tool.

        ``text`` is normalized, wrapped in double quotes and passed to
        ``t2p``; the decoded output goes through ``self.arpa_to_ipa``. If
        ``t2p`` cannot be started or exits non-zero, a warning is logged and
        an empty string is converted instead.
        """
        text = self.normalize(text)
        arpa_text = ''
        try:
            raw = subprocess.check_output(['t2p', '"{}"'.format(text)])
            arpa_text = raw.decode('utf-8')
        except OSError:
            logging.warning('t2p (from flite) is not installed.')
        except subprocess.CalledProcessError:
            logging.warning('Non-zero exit status from t2p.')
        return self.arpa_to_ipa(arpa_text)
flite.py 文件源码 项目:epitran 作者: dmort27 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def english_g2p(self, text):
        """Convert English ``text`` to IPA by way of the ``lex_lookup`` tool.

        ``text`` is normalized and lower-cased, then passed to
        ``lex_lookup``; the decoded output goes through
        ``self.arpa_to_ipa``. If ``lex_lookup`` cannot be started or exits
        non-zero, a warning is logged and an empty string is converted
        instead.
        """
        text = self.normalize(text).lower()
        arpa_text = ''
        try:
            raw = subprocess.check_output(['lex_lookup', text])
            arpa_text = raw.decode('utf-8')
        except OSError:
            logging.warning('lex_lookup (from flite) is not installed.')
        except subprocess.CalledProcessError:
            logging.warning('Non-zero exit status from lex_lookup.')
        return self.arpa_to_ipa(arpa_text)
nmap-scan.py 文件源码 项目:hostscan 作者: gaberger 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_nmap(net):
    """Scan ``net`` with nmap and return the raw report.

    nmap is invoked with ``-oX - -R -p 22-443 -sV``; the XML report is
    requested on stdout, and that captured stdout is returned.

    :param net: target specification handed to nmap as its last argument
    :return: output of ``subprocess.check_output``
    :raises SystemExit: with status 1 when nmap exits non-zero
    """
    cmd = ["nmap", "-oX", "-", "-R", "-p", "22-443", "-sV", net]
    try:
        return subprocess.check_output(cmd)
    # Qualified name: only the ``subprocess`` module is known to be in scope.
    except subprocess.CalledProcessError:
        print("Error in caller\n")
        # ``exit()`` is a helper installed by the site module for interactive
        # use; raising SystemExit is the equivalent that is always available.
        raise SystemExit(1)
concurrency.py 文件源码 项目:ternarynet 作者: czhu95 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def subproc_call(cmd, timeout=None):
    """Run ``cmd`` through the shell and return its combined output.

    stderr is merged into stdout. When the command times out or exits with
    a non-zero status, a warning and the captured output are logged and
    None is returned.

    :param cmd: shell command line
    :param timeout: seconds to wait, or None for no limit
    """
    try:
        return subprocess.check_output(
            cmd, shell=True, timeout=timeout, stderr=subprocess.STDOUT)
    except subprocess.TimeoutExpired as e:
        report = ("Command timeout!", e.output)
    except subprocess.CalledProcessError as e:
        report = ("Commnad failed: {}".format(e.returncode), e.output)

    for entry in report:
        logger.warn(entry)
    return None
VideoManager.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_thumbnail(self, video_path, time, output_path):
        """Write a single video frame to ``output_path`` using ffmpeg.

        :param video_path: path of the source video; encoded to UTF-8 before
            being handed to ffmpeg
        :param time: seek position passed to ffmpeg's ``-ss`` option
        :param output_path: destination file; overwritten if present (``-y``)
        :return: ``output_path``, also when ffmpeg exits non-zero (the
            failure is only logged)
        """
        # ``b'{0}'.format(...)`` only exists on Python 2, where it returns its
        # argument unchanged; the encoded path alone works on 2 and 3.
        command = ['ffmpeg',
                   '-y',
                   '-ss',
                   time,
                   '-i',
                   video_path.encode('utf-8'),
                   '-vframes',
                   '1',
                   output_path]
        try:
            subprocess.check_call(command)
        except subprocess.CalledProcessError as error:
            logger.error(error, exc_info=True)
        return output_path
VideoManager.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_video_meta(self, video_path):
        '''
        Get video meta information by running ffprobe.

        :param video_path: the absolute path of video file
        :return: a dictionary
            {
                'width': integer,
                'height':  integer,
                'duration': integer (millisecond)
            }
            Keys are only present when ffprobe reported the corresponding
            value.

        If ffprobe exits with a non-zero status, the error is logged and this
        method returns None.
        '''
        # ``b'{0}'.format(...)`` only exists on Python 2, where it returns its
        # argument unchanged; the encoded path alone works on 2 and 3.
        command = [
            'ffprobe',
            '-v',
            'error',
            '-show_entries',
            'format=duration:stream=width:stream=height',
            '-select_streams',
            'v:0',
            '-of',
            'json',
            video_path.encode('utf-8'),
        ]
        try:
            output = subprocess.check_output(command)
        except subprocess.CalledProcessError as error:
            logger.error(error)
            return None

        meta = json.loads(output)
        result = {}
        if 'format' in meta and 'duration' in meta['format']:
            # ffprobe's duration is converted from seconds to milliseconds.
            result['duration'] = int(float(meta['format']['duration']) * 1000)
        if 'streams' in meta and len(meta['streams']):
            first_stream = meta['streams'][0]
            if 'width' in first_stream and 'height' in first_stream:
                result['width'] = first_stream['width']
                result['height'] = first_stream['height']
        return result
process.py 文件源码 项目:BAG_framework 作者: ucb-art 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def run_and_wait(args, timeout=None, logfile=None, append=False,
                 env=None, cwd=None):
    """Execute a command in a subprocess and block until it completes.

    Parameters
    ----------
    args : string or list[string]
        the command to execute, either as one command string or as a list
        holding the command followed by its arguments.  A list is
        preferred; see Python subprocess documentation.
    timeout : float or None
        maximum number of seconds to wait for completion.  None means wait
        without limit.
    logfile : string or None
        when given, the captured output is written to this file.
    append : bool
        True to append to the logfile rather than replace it.  Defaults to
        False.
    env : dict[string, any]
        when not None, the subprocess receives exactly these environment
        variables instead of inheriting the current ones.
    cwd : string or None
        working directory for the subprocess.

    Returns
    -------
    output : string
        standard output and standard error of the command, decoded.

    Raises
    ------
    subprocess.CalledProcessError
        if any error occurred in the subprocess.
    """
    raw = subprocess.check_output(args, timeout=timeout, env=env, cwd=cwd,
                                  stderr=subprocess.STDOUT)
    text = raw.decode(encoding=bag_encoding, errors=bag_codec_error)

    if logfile is None:
        return text

    write_file(logfile, text, append=append)
    return text


问题


面经


文章

微信
公众号

扫码关注公众号