python类Popen()的实例源码

util.py 文件源码 项目:cget 作者: pfultz2 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def cmd(args, env=None, **kwargs):
    """Run ``args`` as a child process and raise if it exits non-zero.

    The child's environment is ``merge(os.environ, env)``; any additional
    keyword arguments are forwarded unchanged to ``subprocess.Popen``.

    Raises:
        BuildError: when the child's return code is not 0. The merged
            environment is attached to the error as ``data``.
    """
    environment = merge(os.environ, env)
    process = subprocess.Popen(args, env=environment, **kwargs)
    # communicate() blocks until the child has exited, which sets returncode.
    process.communicate()
    if process.returncode == 0:
        return
    raise BuildError(msg='Command failed: ' + str(args), data=environment)
adb_old.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def cmd(self, *args, **kwargs):
        '''Run an adb command, prefixing ``-s <serial>`` when a serial is set.

        Returns whatever ``self.raw_cmd`` returns. ``kwargs`` is accepted but
        not used by this method.
        '''
        # TODO(ssx): useless here, need to remove and test
        serial = self.device_serial()
        if not serial:
            return self.raw_cmd(*args)
        # TODO how to include special chars on command line
        if " " in serial:
            serial = "'%s'" % serial
        full_args = ["-s", serial]
        full_args += list(args)
        return self.raw_cmd(*full_args)
adb_old.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def raw_cmd(self, *args):
        '''Launch adb through the shell and return the subprocess.Popen object.

        The command is ``self.adb()`` followed by
        ``self.adb_host_port_options`` and ``args``. Both stdout and stderr
        of the child are captured through pipes.
        '''
        argv = [self.adb()] + self.adb_host_port_options
        argv = argv + list(args)
        if os.name != "nt":
            # Outside Windows the pieces are joined into a single command
            # string, because the call below uses shell=True.
            argv = [" ".join(argv)]
        pipe = subprocess.PIPE
        return subprocess.Popen(argv, shell=True, stdout=pipe, stderr=pipe)
web.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def background_test(self):
        '''Run ``echo hello`` in a shell and relay its output line by line.

        Each line read from the child (stderr is merged into stdout) is
        printed, sent to every client in ``ProgressHandler.clients`` via
        ``write_message`` and appended to ``self.output``.

        ``self.running`` is True while the child is being read and is reset
        to False afterwards, even if relaying a line raises.
        '''
        self.running = True
        proc = subprocess.Popen('echo hello', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        try:
            while True:
                line = proc.stdout.readline()
                # readline() returns an empty str (Python 2) or empty bytes
                # (Python 3) at EOF; a truthiness test covers both, whereas
                # comparing against '' never matches b''.
                if not line:
                    break
                print(line)
                for client in ProgressHandler.clients:
                    client.write_message(line)
                self.output = self.output + line
        finally:
            # Release the pipe and reap the child so it does not linger as a
            # zombie, then clear the flag regardless of how the loop ended.
            proc.stdout.close()
            proc.wait()
            self.running = False
ioskit.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_app(self, bundle_id):
        '''
        Launch an app through ``idevicedebug run``.

        Args:
            - bundle_id(string): ex com.netease.my
        Returns:
            idevicedebug subprocess instance (stdout and stderr are piped)
        '''
        executable = must_look_exec('idevicedebug')

        # The child keeps running in the background; the caller receives the
        # Popen handle without waiting for it.
        popen_options = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if sys.platform != 'darwin':
            popen_options['close_fds'] = True
        argv = [executable, "--udid", self.udid, 'run', bundle_id]
        return subprocess.Popen(argv, **popen_options)
__init__.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _init_instruments(self, bundle_id):
        '''Start ``bootstrap.sh instruments`` for ``bundle_id`` and wait for it.

        Records the bootstrap script path and bundle id on the instance, adds
        ``UDID`` and ``BUNDLE_ID`` to ``self._env``, spawns the script with
        that environment (stdout piped) and stores the handle in
        ``self._proc``.
        '''
        self._bootstrap = os.path.join(__dir__, 'bootstrap.sh')
        self._bundle_id = bundle_id
        extra_env = {'UDID': self.udid, 'BUNDLE_ID': self._bundle_id}
        self._env.update(extra_env)
        # 1. remove pipe
        # subprocess.check_output([self._bootstrap, 'reset'], env=self._env)
        # 2. start instruments
        launch_cmd = [self._bootstrap, 'instruments']
        self._proc = subprocess.Popen(launch_cmd, env=self._env, stdout=subprocess.PIPE)
        # Pause before checking on instruments via _wait_instruments().
        self.sleep(5.0)
        self._wait_instruments()
client.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def raw_cmd(self, *args, **kwargs):
        '''Spawn adb with the given arguments and return the subprocess.Popen object.

        The argument vector is ``self.adb_path()`` followed by
        ``self._host_port_args`` and ``args``. ``stdout`` and ``stderr``
        default to pipes unless the caller supplies them in ``kwargs``; all
        other keyword arguments go straight to ``subprocess.Popen``.
        '''
        argv = [self.adb_path()] + self._host_port_args + list(args)
        for stream in ('stdout', 'stderr'):
            kwargs.setdefault(stream, subprocess.PIPE)
        # The list is handed to Popen as-is; it is not joined into a single
        # shell command string.
        return subprocess.Popen(argv, **kwargs)
devserver_main.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _run_cron_in_background():
    """Spawn ``hibiki.cron_main`` as a child and terminate it at exit.

    Does nothing when the ``BOTTLE_CHILD`` environment variable is set to a
    non-empty value. Otherwise the cron module is started with the current
    interpreter and this process's command-line arguments, in its own
    process group, and an ``atexit`` hook is registered that sends SIGTERM
    to that group and waits for the child.
    """
    if os.environ.get('BOTTLE_CHILD'):
        return

    cron_cmd = [sys.executable, '-m', 'hibiki.cron_main']
    cron_cmd += sys.argv[1:]
    # os.setpgrp runs in the child before exec, so the child's pid is also
    # the id of its process group and can be passed to os.killpg below.
    cron = subprocess.Popen(cron_cmd, preexec_fn=os.setpgrp)

    def _terminate_cron():
        os.killpg(cron.pid, signal.SIGTERM)
        cron.wait()

    atexit.register(_terminate_cron)
game.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def compile_problem(solution_spec):
    """Compile a submitted problem into a problem spec using the judge binary.

    The solution spec is written to a temporary file and passed to
    ``./akatsuki --compile``; the judge's stdout is the problem spec.

    Args:
        solution_spec: Specification string of a solution corresponding to
            the submitted problem.

    Returns:
        (problem_spec, problem_size)
        problem_spec: The judge's stdout.
        problem_size: Total length of the whitespace-separated tokens in
            problem_spec.

    Raises:
        VerificationError: If the judge exits non-zero; carries the first
            group matched by _VERIFICATION_ERROR_RE in the judge's stdout.
        subprocess.TimeoutExpired: On judge timeout.
        AssertionError: On scrape error.
    """
    with make_temporary_file_with_content(solution_spec) as solution_file:
        judge_cmd = [
            './akatsuki', '--logtostderr', '--compile', solution_file.name]
        judge = subprocess.Popen(
            judge_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            judge_stdout, _judge_stderr = judge.communicate(
                timeout=_JUDGE_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            # Kill and reap the judge before propagating the timeout.
            judge.kill()
            judge.wait()
            raise  # report ISE
    if judge.returncode:
        error_match = _VERIFICATION_ERROR_RE.search(judge_stdout)
        assert error_match, judge_stdout  # report ISE
        raise VerificationError(error_match.group(1))
    problem_size = sum(map(len, judge_stdout.split()))
    return (judge_stdout, problem_size)
game.py 文件源码 项目:icfpc2016-judge 作者: icfpc2016 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def evaluate_solution(problem_spec, solution_spec):
    """Score a solution against a problem using the judge binary.

    Both specs are written to temporary files and passed to
    ``./akatsuki --evaluate``; the score is scraped from the judge's stdout.

    Args:
        problem_spec: Specification string of a problem.
        solution_spec: Specification string of a solution.

    Returns:
        (resemblance_int, raw_evaluator_output)
        resemblance_int: Integer following ``integer_resemblance: `` in the
            judge's stdout.
        raw_evaluator_output: The judge's stdout decoded as UTF-8.

    Raises:
        VerificationError: If the judge exits non-zero; carries the first
            group matched by _VERIFICATION_ERROR_RE in the judge's stdout.
        subprocess.TimeoutExpired: On judge timeout.
        AssertionError: On scrape error.
    """
    with make_temporary_file_with_content(problem_spec) as problem_file, \
         make_temporary_file_with_content(solution_spec) as solution_file:
        judge_cmd = [
            './akatsuki', '--logtostderr', '--evaluate',
            problem_file.name, solution_file.name]
        judge = subprocess.Popen(
            judge_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        try:
            judge_stdout, _judge_stderr = judge.communicate(
                timeout=_JUDGE_TIMEOUT_SECONDS)
        except subprocess.TimeoutExpired:
            # Kill and reap the judge before propagating the timeout.
            judge.kill()
            judge.wait()
            raise  # report ISE
    if judge.returncode:
        error_match = _VERIFICATION_ERROR_RE.search(judge_stdout)
        assert error_match, judge_stdout  # report ISE
        raise VerificationError(error_match.group(1))
    score_match = re.search(r'integer_resemblance: (\d+)', judge_stdout)
    assert score_match, judge_stdout  # report ISE
    return int(score_match.group(1)), judge_stdout.decode('utf-8')
process.py 文件源码 项目:BAG_framework 作者: ucb-art 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run_proc_with_quit(proc_id, quit_dict, args, logfile=None, append=False, env=None, cwd=None):
    """Run ``args`` as a child process that can be cancelled via ``quit_dict``.

    The child's stdout and stderr are written to ``logfile`` (``os.devnull``
    when None), opened in binary append mode if ``append`` is true and binary
    write mode otherwise.

    While the child runs, ``proc_id in quit_dict`` is checked each time a
    wait times out. The first time it is found, the child is sent
    ``terminate()`` and later waits use ``quit_dict[proc_id]`` as their
    timeout; the next time, it is sent ``kill()``; after one further
    timed-out wait the loop gives up.

    Returns:
        None if ``proc_id`` is already in ``quit_dict`` before the child is
        started (the log file has been opened by then); otherwise the
        child's ``returncode`` attribute.
    """
    log_path = os.devnull if logfile is None else logfile
    open_mode = 'ab' if append else 'wb'

    with open(log_path, open_mode) as log_stream:
        if proc_id in quit_dict:
            return None

        child = subprocess.Popen(args, stdout=log_stream, stderr=subprocess.STDOUT,
                                 env=env, cwd=cwd)
        exit_code = None
        escalations = 0
        wait_seconds = 0.05
        while exit_code is None and escalations <= 2:
            try:
                exit_code = child.wait(timeout=wait_seconds)
            except subprocess.TimeoutExpired:
                if proc_id not in quit_dict:
                    continue
                # Escalate: terminate first, then kill, then just count.
                if escalations == 0:
                    child.terminate()
                    wait_seconds = quit_dict[proc_id]
                elif escalations == 1:
                    child.kill()
                escalations += 1

        return child.returncode
deauth.py 文件源码 项目:wifisniffer 作者: reiinakano 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_process(self):
        '''Launch ``aireplay-ng`` deauthentication via sudo without waiting.

        The command is built from ``self.AP_MAC``, ``self.stn_MAC`` and
        ``self.interface``. The spawned process handle is not kept or
        returned.
        '''
        print("Deauthenticating conection between " + self.stn_MAC + " with AP_MAC " + self.AP_MAC)
        command = ['sudo', 'aireplay-ng', '-0', '10']
        command += ['-a', self.AP_MAC, '-c', self.stn_MAC, self.interface]
        subprocess32.Popen(command)
dot11decryptprocess.py 文件源码 项目:wifisniffer 作者: reiinakano 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start_dot11decrypt(self, interface, decryption_key):
        '''Start a dot11decrypt subprocess and return it with its device name.

        The first line of the child's stdout must begin with
        ``"Using device: "``; the remainder of that line, stripped of trailing
        whitespace, is the device name.

        Returns:
            (proc, device): the Popen handle and the device name.

        Raises:
            Exception: if the first output line does not have the expected
                prefix (the line is printed first).
        '''
        print("Starting new dot11decrypt subprocess on " + interface + " with key " + decryption_key)
        command = ['sudo', 'd11decrypt/build/dot11decrypt', interface, decryption_key]
        proc = subprocess32.Popen(command, stdout=subprocess32.PIPE)
        first_line = proc.stdout.readline()
        if first_line[0:14] != "Using device: ":
            print(first_line)
            raise Exception
        device = first_line[14:].rstrip()
        print("Currently decrypting packets and releasing to " + device)
        print("Process number is " + str(proc.pid))
        return proc, device
dot11decryptprocess.py 文件源码 项目:wifisniffer 作者: reiinakano 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def kill(self):
        '''Kill ``self.proc`` and delete the ``self.tap`` link via sudo.

        An OSError from either step is printed together with a hint instead
        of being propagated.
        '''
        print("Killing dot11decrypt subprocess and closing tap interface")
        delete_link = ['sudo', 'ip', 'link', 'delete', self.tap]
        try:
            self.proc.kill()
            # Started without waiting for the link deletion to finish.
            subprocess32.Popen(delete_link)
            print("Successfully killed this subprocess")
        except OSError as err:
            print(err)
            print("Oops. Can't seem to kill the process. Are you running as root?")
driver.py 文件源码 项目:cloudsdk-test-driver 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def Run(self, command, timeout=None, env=None):
    """Execute a command against this SDK installation and capture its output.

    The child runs with the parent directory of ``self._sdk_dir`` as its
    working directory.

    Args:
      command: string, list or tuple, The command to run (e.g. ['gsutil', 'cp',
        ...])
      timeout: number, Seconds to wait before timing out the command. Only
        applied when TIMEOUT_ENABLED is true; otherwise a warning is written
        to stderr and the command runs without a timeout.
      env: dict or None, Extra environmental variables use with this command.

    Returns:
      (stdout, stderr, returncode) returned from the command.

    Raises:
      error.SDKError: If the command cannot be run.
    """
    # Layer the caller's variables over the precomputed environment in a new
    # dict, so neither input dictionary is modified.
    effective_env = dict(self._env, **env) if env else self._env

    process = subprocess.Popen(
        _PrepareCommand(command),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=os.path.dirname(self._sdk_dir),
        env=effective_env)
    if not TIMEOUT_ENABLED:
      if timeout:
        sys.stderr.write(
            'Warning: timeout specified, but subprocess32 is not available.')
      stdout_data, stderr_data = process.communicate()
    else:
      stdout_data, stderr_data = process.communicate(timeout=timeout)

    # TODO(magimaster): Change this to raise an error if returncode isn't 0
    return stdout_data, stderr_data, process.returncode
ansible.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run_playbook(playbook, inventory, *args, **kwargs):
    """Run ``ansible-playbook`` and stream its stdout line by line.

    Args:
        playbook: playbook path passed to ``ansible-playbook``.
        inventory: inventory passed with ``-i``.
        *args: extra command-line arguments appended to the command.
        **kwargs: forwarded to ``Popen``, except for ``timestamp``: when it
            is truthy each output line is passed to ``timestamp`` instead of
            ``print``.

    Returns:
        The exit code of the ``ansible-playbook`` process.
    """
    env = ansible_env(os.environ.copy())
    cmd = ['ansible-playbook', '-i', inventory, playbook] + list(args)

    if verbosity():
        cmd += ['-' + ('v' * verbosity())]

    # 'timestamp' is this function's own option and must not reach Popen.
    show_timestamp = kwargs.pop('timestamp', False)
    output = timestamp if show_timestamp else print

    logger.info('running %s', ' '.join(cmd))
    logger.debug('env: %r', env)

    process = Popen(cmd, env=env, stdout=PIPE,
                    bufsize=1, **kwargs)
    for line in iter(process.stdout.readline, b''):
        # Drop the trailing newline before handing the line on.
        output(line[:-1])
    process.stdout.close()
    # EOF on stdout does not guarantee the child has exited; poll() could
    # leave returncode as None. wait() blocks until the exit code is known.
    return process.wait()
box.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ssh_shell(self, cmd=None, cd=True, popen=False, **kwargs):
        """
        Open an ssh session to the remote VM as user ``vagrant``

        :param cmd: optional command to run; a list or tuple is quoted
            element by element and joined with spaces
        :param cd: when true and ``cmd`` is given, the command is prefixed so
            that it exits with ``self.NO_PROJECT_DIR`` if the project
            directory is missing and otherwise changes into it first
        :param popen: when true, return a ``Popen`` started in a new session
            instead of the result of ``call``
        :param kwargs: forwarded to ``Popen`` / ``call``
        :return: subprocess32.Popen when ``popen`` is true, otherwise the
            value returned by ``call``
        """
        ssh_argv = ['ssh', self.ip(), '-t', '-A']
        ssh_argv += ['-l', 'vagrant', '-i', self.ssh_key()]

        if cmd:
            if isinstance(cmd, (tuple, list)):
                cmd = ' '.join(quote(part) for part in cmd)

            if cd:
                template = '[ ! -d "{0}" ] && exit {1}; cd "{0}"; {2}'
                cmd = template.format(
                    self.project.name(), self.NO_PROJECT_DIR, cmd)
            ssh_argv.append(cmd)

        self._logger.debug('calling %s', ' '.join(ssh_argv))

        if not popen:
            return call(ssh_argv, **kwargs)
        return Popen(ssh_argv, start_new_session=True, **kwargs)
script_module.py 文件源码 项目:STF 作者: nokia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def checkOthers(self, test_step):
        """Make sure ``test_step.path`` is executable, adding +x if needed.

        Returns immediately when the path is already executable. Otherwise
        runs ``chmod +x`` on it.

        Raises:
            Exception: if ``chmod`` exits non-zero; the message contains the
                command, return code, stdout and stderr.
        """
        if os.access(test_step.path, X_OK):
            return

        logger.debug("add +x for %s", test_step.path)
        # Kept as a string only for the error message below.
        command = "chmod +x " + test_step.path
        # Pass an argument list and skip the shell so that paths containing
        # spaces or shell metacharacters reach chmod as one argument.
        proc = subprocess.Popen(["chmod", "+x", test_step.path],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        outs, errs = proc.communicate()
        rc = proc.returncode
        if rc:
            # Interpolate explicitly: Exception does not apply %-formatting
            # to extra positional arguments.
            raise Exception("command %s failed: rc is %s, output is %s, errs is %s " % (command, rc, outs, errs))
subprocess_utils.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_call_realtime(args):
    """Run command with arguments and yield its output line by line.

    Stderr is piped into stdout. Each yielded item is one line of output,
    produced as soon as the child has written it.

    :raises subprocess.CalledProcessError: if exit code is non-zero (raised
        once all output has been yielded)
    """
    p = subprocess.Popen(args, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # readline() returns as soon as a full line is available, whereas read()
    # blocks until EOF and would deliver everything in one chunk at the end.
    for line in iter(p.stdout.readline, b''):
        yield line
    p.stdout.close()
    # EOF on stdout does not imply the child has exited; block for the exit
    # status rather than polling in a loop.
    returncode = p.wait()
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode, args)
test_subprocess.py 文件源码 项目:HORD 作者: ilija139 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle_eval(self, record):
        '''Evaluate ``record`` by running ``./sphere_ext`` and parsing its stdout.

        The first entry of ``record.params`` is converted with ``array2str``
        and passed as the only argument. If the captured stdout parses as a
        float the record is finished successfully with that value; a
        ValueError logs a warning and finishes the record as failed.
        '''
        argv = ['./sphere_ext', array2str(record.params[0])]
        self.process = Popen(argv, stdout=PIPE)
        raw_output, _ = self.process.communicate()
        try:
            # float() raises ValueError if the output is not a float
            value = float(raw_output)
            self.finish_success(record, value)
        except ValueError:
            logging.warning("Function evaluation crashed/failed")
            self.finish_failure(record)


问题


面经


文章

微信
公众号

扫码关注公众号