Python pexpect 模块 run() 函数的实例源码

tunnel.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _stop_tunnel(cmd):
    """Execute ``cmd`` via ``pexpect.run`` and ignore whatever it prints.

    Args:
        cmd: Command line passed unchanged to ``pexpect.run`` (presumably
            the command that tears the tunnel down -- confirm with caller).
    """
    pexpect.run(cmd)
tunnel.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _stop_tunnel(cmd):
    """Run the given command line with ``pexpect.run``, discarding its output.

    Args:
        cmd: Command line handed as-is to ``pexpect.run`` (presumably the
            tunnel shutdown command -- confirm with caller).
    """
    pexpect.run(cmd)
pyexpect-sshtun.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_process_info ():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    NOTE(review): the command output is not parsed or returned; the function
    always returns ``None``.
    """
    pexpect.run('ps ax -O ppid')
__init__.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Unicode counterpart of :func:`run`.

    Every argument is forwarded unchanged to ``_run``; the only difference
    is that the child is created with :class:`spawnu`. As with
    :class:`spawnu`, ``encoding`` and ``errors`` may be supplied as extra
    keyword arguments and apply to both input and output.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
__init__.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def readlines(self, sizehint=-1):
        '''Collect lines from readline() until it returns an empty value.

        Returns the list of lines in the order they were read. 'sizehint'
        is accepted for file-object compatibility but is ignored.

        Because reading continues until EOF, the child must have closed its
        stdout; calling this on a child that is still running with stdout
        open blocks until the read times out.'''

        collected = []
        # Primed read: fetch one line up front, then keep going while the
        # most recent read produced something.
        current = self.readline()
        while current:
            collected.append(current)
            current = self.readline()
        return collected
ssh_tunnel.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_process_info ():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    The invocation appears to work on both Linux and BSD but should be
    treated as highly unportable elsewhere.

    NOTE(review): the command output is not parsed or returned; the function
    always returns ``None``.
    """
    pexpect.run('ps ax -O ppid')
test_expect.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_expect_eof (self):
        """Compare ``/bin/ls -l /bin`` output from subprocess and pexpect.

        Expecting ``pexpect.EOF`` makes the child read everything, which is
        the same thing ``pexpect.run()`` does; the text seen before EOF must
        match the subprocess output once line endings are normalised.
        """
        def _normalise(raw):
            # Map CRLF and bare CR to LF, squeeze doubled newlines once,
            # then drop trailing whitespace.
            unified = raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
            return unified.replace(b'\n\n', b'\n').rstrip()

        listing = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                                   stdout=subprocess.PIPE)
        via_subprocess = listing.communicate()[0].rstrip()

        child = pexpect.spawn('/bin/ls -l /bin')
        child.expect(pexpect.EOF)
        via_pexpect = _normalise(child.before)
        via_subprocess = _normalise(via_subprocess)

        assert via_subprocess == via_pexpect, hex_diff(via_subprocess, via_pexpect)
test_misc.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_env(self):
        """Check that the ``env`` argument of ``pexpect.run`` reaches the child.

        Runs ``env`` once with the inherited environment and once with
        ``{'foo': 'pexpect'}``; the outputs must differ and the second must
        mention both the key and the value.
        """
        inherited = pexpect.run('env')
        custom = pexpect.run('env', env={'foo':'pexpect'})
        assert inherited != custom, "'default' and 'userenv' should be different"
        has_pair = b'foo' in custom and b'pexpect' in custom
        assert has_pair, "'foo' and 'pexpect' should be in 'userenv'"
test_misc.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_cwd (self): # This assumes 'pwd' and '/tmp' exist on this platform.
        """Check that the ``cwd`` argument of ``pexpect.run`` is honoured.

        ``pwd`` is run in the current directory and again with
        ``cwd='/tmp'``; the outputs must differ and the second must
        contain ``tmp``.
        """
        here = pexpect.run('pwd')
        in_tmp = pexpect.run('pwd', cwd='/tmp')
        assert here != in_tmp, "'default' and 'tmpdir' should be different"
        assert b'tmp' in in_tmp, "'tmp' should be returned by 'pwd' command"
__init__.py 文件源码 项目:ssh-tunnel 作者: aalku 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Same interface as :func:`run`, but working with unicode.

    All arguments are passed straight through to ``_run`` with
    :class:`spawnu` selected as the spawn class. ``encoding`` and ``errors``
    may be given as extra keyword arguments, exactly as for
    :class:`spawnu`; they are used for both input and output.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
__init__.py 文件源码 项目:ssh-tunnel 作者: aalku 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def readlines(self, sizehint=-1):
        '''Read with readline() up to EOF and return the lines as a list.

        The 'sizehint' argument exists only for file-object compatibility
        and is ignored.

        Reading stops at EOF, so the child has to have closed its stdout.
        On a child that is still running with stdout open this call blocks
        until the read times out.'''

        gathered = []
        # iter(callable, sentinel) keeps calling readline(); the explicit
        # falsiness check ends the loop on '' as well as b''.
        for piece in iter(self.readline, None):
            if not piece:
                break
            gathered.append(piece)
        return gathered
flask_service.py 文件源码 项目:docker-ida 作者: thawsystems 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def execute_command():
    """Run the IDA command line submitted in the request form.

    Form fields read from ``request.form``:
        command: required; must start with ``'idaw '`` or ``'idaw64 '``.
        timeout: optional integer, passed to ``pexpect.run`` as ``timeout``
            (``None`` when the field is absent).

    Returns:
        A ``(json_response, http_status)`` tuple:
            422 -- ``command`` missing, not an idaw/idaw64 command, or
                   ``timeout`` is not an integer
            408 -- ``pexpect.TIMEOUT`` was raised while running the command
            500 -- the command finished with a non-zero exit code
            200 -- the command finished with exit code 0
    """
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        # Only a warning: the command is still attempted below.
        logger.warning("Couldn't find file %s", file_name)

    if not command.startswith(('idaw ', 'idaw64 ')):
        return jsonify(error="'idaw' and 'idaw64' are the only valid commands"), 422

    # Validate the timeout before anything is executed, so a malformed value
    # is reported as a client error instead of an unhandled ValueError.
    timeout = None
    if 'timeout' in request.form:
        try:
            timeout = int(request.form['timeout'])
        except ValueError:
            return jsonify(error="Parameter 'timeout' must be an integer"), 422

    try:
        logger.info('Executing %s', command)
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        # Runs on success, timeout and unexpected errors alike.
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code != 0:
        logger.warning("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)
        return jsonify(error='ida finish with status code %s' % exit_code), 500

    logger.info('Command %s finished executing successfully', command)
    return jsonify(message='OK'), 200
par2backend.py 文件源码 项目:alicloud-duplicity 作者: aliyun 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def transfer(self, method, source_path, remote_filename):
        """Transfer a file plus newly created Par2 recovery files.

        Par2 has to run on the real filename, otherwise it would restore
        the temp-filename later on. A temporary directory is therefore
        created next to ``source_path`` and a symlink named
        ``remote_filename`` pointing at the source is placed in it; par2 is
        run on that symlink.

        ``method`` is called as ``method(path, filename)``: first for the
        source file itself, then for every entry left in the temporary
        directory, provided par2 returned a zero exit status. The temporary
        directory is removed at the end.
        """
        import pexpect

        workdir = source_path.get_temp_in_same_dir()
        workdir.mkdir()
        link = workdir.append(remote_filename)
        link_target = source_path.get_canonical()
        if not os.path.isabs(link_target):
            link_target = os.path.join(os.getcwd(), link_target)
        os.symlink(link_target, link.get_canonical())
        link.setdata()

        log.Info("Create Par2 recovery files")
        par2_command = 'par2 c -r%d -n1 %s %s' % (
            self.redundancy, self.common_options, link.get_canonical())
        _output, exit_status = pexpect.run(par2_command, timeout=None,
                                           withexitstatus=True)

        # Remove the symlink first so that only par2's own output is listed.
        link.delete()
        if exit_status:
            recovery_files = []
        else:
            recovery_files = [workdir.append(entry)
                              for entry in workdir.listdir()]

        method(source_path, remote_filename)
        for recovery_file in recovery_files:
            method(recovery_file, recovery_file.get_filename())

        workdir.deltree()
expect.py 文件源码 项目:hammercloud 作者: gtmanfred 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def script(self, logininfo, filepath):
        '''
        run script on managed cloud server using /usr/bin/env expect

        The script is uploaded with ``sftp`` under its base name into the
        login's home directory, executed with ``logininfo.extraargs`` and,
        unless ``logininfo.no_clean`` is set, removed afterwards.

        ``filepath`` may be a local path or an ``https://`` URL. A URL is
        downloaded once into ``~/.cache/hammercloud/`` and the cached copy
        is reused on later calls.

        Sets ``logininfo.script`` and ``logininfo.command`` as a side
        effect.

        Raises:
            Exception: if ``logininfo.admin_password`` is not set.
            requests.HTTPError: if downloading an ``https://`` script
                returns an error status.
        '''
        if not logininfo.admin_password:
            raise Exception('Unmanaged Cloud Server: no rack password')

        # Remote file name is the last path component; a path without '/'
        # is used as-is.
        logininfo.script = filepath.rsplit('/', 1)[-1]

        if filepath.startswith('https://'):
            newpath = os.path.expanduser(
                '~/.cache/hammercloud/{login.script}'.format(login=logininfo)
            )
            if not os.path.exists(newpath):
                # Download before creating the file so that a failed request
                # cannot leave an empty cache entry that later calls reuse.
                resp = requests.get(filepath)
                resp.raise_for_status()
                cachedir = os.path.dirname(newpath)
                if not os.path.isdir(cachedir):
                    os.makedirs(cachedir)
                # resp.content is bytes: write it verbatim in binary mode
                # instead of print()-ing its repr into a text file.
                with open(newpath, 'wb') as newfile:
                    newfile.write(resp.content)
            filepath = newpath

        sftp(
            logininfo, 'put', filepath, logininfo.script,
            quiet=True, executable=True
        )

        # The '{login...}' placeholders are left unformatted here.
        # NOTE(review): presumably cmd() fills them in -- confirm.
        command = '/home/{login.ssh_user}/{login.script} {login.extraargs}; '

        if not logininfo.no_clean:
            command += 'rm /home/{login.ssh_user}/{login.script}'

        logininfo.command = command
        cmd(logininfo)
flask_service.py 文件源码 项目:docker-ida 作者: intezer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def execute_command():
    """Run the IDA command line submitted in the request form.

    Form fields read from ``request.form``:
        command: required; must start with ``'idal '`` or ``'idal64 '``.
        timeout: optional integer, passed to ``pexpect.run`` as ``timeout``
            (``None`` when the field is absent).

    Returns:
        A ``(json_response, http_status)`` tuple:
            422 -- ``command`` missing, not an idal/idal64 command, or
                   ``timeout`` is not an integer
            408 -- ``pexpect.TIMEOUT`` was raised while running the command
            500 -- the command finished with a non-zero exit code
            200 -- the command finished with exit code 0
    """
    logger.info('Got incoming request')
    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        # Only a warning: the command is still attempted below.
        logger.warning("Couldn't find file %s", file_name)

    if not command.startswith(('idal ', 'idal64 ')):
        return jsonify(error="'idal' and 'idal64' are the only valid commands"), 422

    # Validate the timeout before anything is executed, so a malformed value
    # is reported as a client error instead of an unhandled ValueError.
    timeout = None
    if 'timeout' in request.form:
        try:
            timeout = int(request.form['timeout'])
        except ValueError:
            return jsonify(error="Parameter 'timeout' must be an integer"), 422

    try:
        logger.info('Executing %s', command)
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        # Runs on success, timeout and unexpected errors alike.
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code != 0:
        logger.warning("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)
        return jsonify(error='ida finish with status code %s' % exit_code), 500

    logger.info('Command %s finished executing successfully', command)
    return jsonify(message='OK'), 200
__init__.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def runu(command, timeout=-1, withexitstatus=False, events=None,
        extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Unicode variant of :func:`run` with an identical interface.

    Delegates to ``_run`` with :class:`spawnu` as the spawn class and all
    other arguments forwarded unchanged. Like :class:`spawnu`, it accepts
    ``encoding`` and ``errors`` keyword arguments, which are used for both
    input and output.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
__init__.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def readlines(self, sizehint=-1):
        '''Return every line readline() yields before EOF, as a list.

        'sizehint' is optional and ignored.

        Since this reads until EOF, the child process should have closed
        its stdout. Used on a child that is still running with stdout open,
        the call blocks until the read times out.'''

        result = []
        # Read one line ahead; an empty read marks EOF and ends the loop.
        chunk = self.readline()
        while chunk:
            result.append(chunk)
            chunk = self.readline()
        return result
ssh_tunnel.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_process_info ():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    This seems to work on both Linux and BSD, but should otherwise be
    considered highly unportable.

    NOTE(review): the command output is not parsed or returned; the function
    always returns ``None``.
    """
    pexpect.run('ps ax -O ppid')
test_expect.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_expect_eof (self):
        """Check that expecting EOF captures the child's complete output.

        ``/bin/ls -l /bin`` is run once through subprocess and once through
        ``pexpect.spawn``; waiting for ``pexpect.EOF`` reads everything
        (the same thing ``pexpect.run()`` does). Both outputs are compared
        after normalising line endings.
        """
        def _clean(data):
            # CRLF -> LF, bare CR -> LF, squeeze doubled LFs once, then
            # trim trailing whitespace.
            data = data.replace(b'\r\n', b'\n')
            data = data.replace(b'\r', b'\n')
            data = data.replace(b'\n\n', b'\n')
            return data.rstrip()

        reference_proc = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                                          stdout=subprocess.PIPE)
        reference = reference_proc.communicate()[0].rstrip()

        spawned = pexpect.spawn('/bin/ls -l /bin')
        spawned.expect(pexpect.EOF)
        captured = _clean(spawned.before)
        reference = _clean(reference)

        assert reference == captured, hex_diff(reference, captured)
test_misc.py 文件源码 项目:tools 作者: InfraSIM 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_env(self):
        """Verify that ``pexpect.run`` passes a custom ``env`` to the child.

        The output of ``env`` under the inherited environment must differ
        from its output under ``{'foo': 'pexpect'}``, and the latter must
        contain both ``foo`` and ``pexpect``.
        """
        plain_output = pexpect.run('env')
        custom_output = pexpect.run('env', env={'foo':'pexpect'})
        assert plain_output != custom_output, "'default' and 'userenv' should be different"
        both_present = b'foo' in custom_output and b'pexpect' in custom_output
        assert both_present, "'foo' and 'pexpect' should be in 'userenv'"


问题


面经


文章

微信
公众号

扫码关注公众号