python类Popen()的实例源码

sessions.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _load(self, rules):
        """Load IPTables rules by executing them as a temporary script.

        The rules are joined with newlines, written to a temporary file
        inside ``self._sessions_dir``, made executable and run through the
        shell. The temporary file is always removed afterwards, including
        when writing or launching the script raises.

        Args:
            rules (list): IPTables rules, one script line per item.

        Returns:
            (int, Optional[str]): A tuple where the first object is the return code and the second is the captured standard error output associated to the return code.

        """

        # Text mode so the joined string can be written on Python 2 and 3.
        tmpfile = tempfile.NamedTemporaryFile(
            mode='w', dir=self._sessions_dir, delete=False)
        try:
            try:
                tmpfile.write("\n".join(rules))
            finally:
                # Close before executing so the content is flushed to disk.
                tmpfile.close()
            os.chmod(tmpfile.name, 0o755)
            # shell=True is kept on purpose: the shell also runs scripts
            # that do not start with a shebang line.
            proc = subprocess.Popen(tmpfile.name, shell=True,
                                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            _, err = proc.communicate()
        finally:
            # delete=False above, so the file has to be removed by hand.
            os.remove(tmpfile.name)

        return proc.returncode, err
git.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clone(cls, remote_url, path):
        """Run ``git clone --recursive`` and wrap the resulting checkout.

        :param str remote_url: the URL to clone from
        :param str path: path to clone to

        :rtype: GitVcs
        :returns: a GitVcs object for the new cloned repo

        :raises tigerhost.vcs.base.CommandError: when git exits with a
            non-zero return code
        """
        command = ['git', 'clone', '--recursive', remote_url, path]
        process = Popen(command, stdout=PIPE, stderr=PIPE)
        out, err = process.communicate()
        if process.returncode == 0:
            return cls(path=path)
        # Failure: report the program name, exit status and captured output.
        raise CommandError(command[0], process.returncode, out, err)
base.py 文件源码 项目:TigerHost 作者: naphatkrit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self, *args, **kwargs):
        """Run a command and return what it wrote to standard output.

        ``args`` is the command line (program first). Extra keyword
        arguments are forwarded to ``Popen``; ``env``, ``stdout`` and
        ``stderr`` are always overridden, and ``cwd`` defaults to
        ``self.path`` when that is set.

        Raises ``CommandError`` when the command exits with a non-zero
        return code.
        """
        # self.path is only None when called in the __init__ function
        if self.path is not None:
            kwargs.setdefault('cwd', self.path)

        # NOTE if we do want to make a copy of environmental variables,
        # we must remove GIT_WORK_TREE
        kwargs.update(env={}, stdout=PIPE, stderr=PIPE)

        process = Popen(args, **kwargs)
        out, err = process.communicate()
        if process.returncode == 0:
            return out
        raise CommandError(args[0], process.returncode, out, err)
test_subprocess_files.py 文件源码 项目:HORD 作者: ilija139 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def handle_eval(self, record):
        """Evaluate ``record`` by running the external objective program.

        The first parameter vector of ``record`` is written to a uniquely
        named input file, ``./sphere_ext_files`` is run with that file name
        as its argument, and its standard output is parsed as a float.
        ``finish_success`` is called with the value, or ``finish_failure``
        when the output is not a float. The input file is removed in every
        case, including when an unexpected exception escapes.
        """
        # This gives a file name / directory name that no other thread can use
        my_unique_filename = str(my_gen.next_filename()) + ".txt"

        try:
            # Print to the input file
            with open(my_unique_filename, 'w') as f:
                f.write(array2str(record.params[0]))

            # Run the objective function and pass the filename of the input file
            self.process = Popen(['./sphere_ext_files', my_unique_filename], stdout=PIPE)
            out = self.process.communicate()[0]

            # Parse the output
            try:
                val = float(out)  # This raises ValueError if out is not a float
                self.finish_success(record, val)
            except ValueError:
                logging.warning("Function evaluation crashed/failed")
                self.finish_failure(record)
        finally:
            # Remove input file (it may not exist if open() itself failed)
            if os.path.exists(my_unique_filename):
                os.remove(my_unique_filename)
Geometry.py 文件源码 项目:Simple-User-Input-Sculpture-Generation 作者: ClaireKincaid 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def preview_scad(self, filename = 'test.scad', proc = None):
        """Open an OpenSCAD file in a new ``openscad`` process.

        Still slightly WIP: it is not yet settled how shell-like commands
        should be made safe from within Python.

        TODO: Sanitize inputs, implement new way of dealing with proc

        Inputs:
        filename: the file to open
        proc: the process returned by a previous preview_scad call. When
        given, it is terminated first so that OpenSCAD windows do not pile
        up.

        Returns the newly started process.
        """

        if proc:
            proc.terminate()
        return subprocess32.Popen(["openscad", filename])
Geometry.py 文件源码 项目:Simple-User-Input-Sculpture-Generation 作者: ClaireKincaid 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def render_scad(self, filename = 'test.scad'):
        """Render an OpenSCAD file to an STL and wait for it to finish.

        The output file has the same path as ``filename`` with its final
        extension replaced by ``.stl``.

        Still slightly WIP, as we're not completely sure on how to make
        shell-esque commands safe.

        TODO: Sanitize inputs, remove waiting (find a better solution)

        Inputs:
        filename: the file to open
        """
        import os

        # splitext only strips the last extension, so dots elsewhere in the
        # path (e.g. './models/a.scad') no longer truncate the output name.
        output_name = os.path.splitext(filename)[0] + '.stl'

        proc = subprocess32.Popen(['openscad','-o',output_name,filename])
        print("Rendering...")
        # wait() returns once openscad has exited; nothing is left to terminate.
        proc.wait()
services.py 文件源码 项目:ray-legacy 作者: ray-project 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start_worker(node_ip_address, worker_path, scheduler_address, objstore_address=None, cleanup=True):
  """Launch a worker as a separate ``python`` process.

  Args:
    node_ip_address (str): The IP address of the node that the worker runs on.
    worker_path (str): The path of the source code which the worker process will
      run.
    scheduler_address (str): The ip address and port of the scheduler to connect
      to.
    objstore_address (Optional[str]): The ip address and port of the object
      store to connect to. The corresponding command line option is only
      passed when this is not None.
    cleanup (Optional[bool]): When true (the default), the started process is
      appended to all_processes.
  """
  if objstore_address is None:
    objstore_args = []
  else:
    objstore_args = ["--objstore-address=" + objstore_address]
  worker = subprocess.Popen(["python",
                             worker_path,
                             "--node-ip-address=" + node_ip_address,
                             "--scheduler-address=" + scheduler_address] + objstore_args)
  if cleanup:
    all_processes.append(worker)
Jiffy.py 文件源码 项目:Jiffy 作者: h5rdly 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_ntfs_drives_win(self):
        '''Return the drives in self.drives whose file system is NTFS.

        Each drive is queried with "fsutil fsinfo volumeInfo". The output
        is a list of lines such as:

        Volume Name : Le Shwa
        Volume Serial Number : 0xd4d56c89
        Max Component Length : 255
        File System Name : NTFS
        Is ReadWrite
        ...

        The "File System Name" line is located by its label rather than by
        a fixed position, so a drive whose output is shorter than expected
        is skipped instead of raising IndexError.'''

        ntfs_drives=[]
        for drive in self.drives:
            volume_info=Popen(('fsutil', 'fsinfo', 'volumeInfo', drive), stdout=PIPE).communicate()[0]
            for line in volume_info.splitlines():
                label, _, value = line.partition(' : ')
                if label.strip()=='File System Name':
                    if value.strip()=='NTFS':
                        ntfs_drives.append(drive)
                    break

        return ntfs_drives
fishnet.py 文件源码 项目:fishnet 作者: niklasf 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def open_process(command, cwd=None, shell=True, _popen_lock=threading.Lock()):
    """Start ``command`` with piped, line-buffered text streams.

    stdin and stdout are pipes and stderr is merged into stdout. The child
    is placed in its own process group so that signals sent to the parent
    are not propagated to it. ``_popen_lock`` is a default argument on
    purpose: the single lock object is shared by all calls.

    Returns the ``subprocess.Popen`` object.
    """
    popen_options = dict(
        shell=shell,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        stdin=subprocess.PIPE,
        bufsize=1,  # Line buffered
        universal_newlines=True,
    )

    if cwd is not None:
        popen_options["cwd"] = cwd

    # Prevent signal propagation from parent process
    if hasattr(subprocess, "CREATE_NEW_PROCESS_GROUP"):
        # Windows
        popen_options["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        # Unix
        popen_options["preexec_fn"] = os.setpgrp

    with _popen_lock:  # Work around Python 2 Popen race condition
        return subprocess.Popen(command, **popen_options)
ping.py 文件源码 项目:stepler 作者: Mirantis 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _local_ping(self, count):
        """Generator that runs the ping command locally and yields its result.

        The command from ``self._prepare_cmd(count)`` is started and a
        ``PingResult`` is yielded once. When the generator is resumed, the
        process is allowed to finish on its own if ``count`` is truthy,
        otherwise it is interrupted with SIGINT. The captured standard
        output is then stored on ``result.stdout``.

        Raises:
            Exception: if the command has already exited right after start.
        """
        cmd = self._prepare_cmd(count)
        p = subprocess.Popen(cmd,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # check that process is alive
        if p.poll() is not None:
            stdout, stderr = p.communicate()
            # Include stderr too: it was passed to format() before but had
            # no placeholder, so it never reached the message.
            raise Exception(
                'Command {!r} unexpectedly exit with message {} {}'.format(
                    cmd, stdout, stderr))
        result = PingResult()
        yield result
        # With a count the command ends by itself and communicate() below
        # waits for it while draining the pipes (a bare wait() could block
        # forever on a full pipe). Without a count it has to be interrupted.
        if not count and p.poll() is None:
            p.send_signal(signal.SIGINT)
        stdout, stderr = p.communicate()
        result.stdout = stdout
common.py 文件源码 项目:localstack 作者: localstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_run(cmd):
        try:
            cwd = os.getcwd() if inherit_cwd else None
            if not async:
                if stdin:
                    return subprocess.check_output(cmd, shell=True,
                        stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
                output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
                return output.decode(DEFAULT_ENCODING)
            # subprocess.Popen is not thread-safe, hence use a mutex here..
            try:
                mutex_popen.acquire()
                stdin_arg = subprocess.PIPE if stdin else None
                stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
                process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
                    stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
                return process
            finally:
                mutex_popen.release()
        except subprocess.CalledProcessError as e:
            if print_error:
                print("ERROR: '%s': %s" % (cmd, e.output))
            raise e
util.py 文件源码 项目:OnlineSchemaChange 作者: facebookincubator 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def rm(filename, sudo=False):
    """
    Remove a file from disk by running /bin/rm (optionally through sudo).
    os.remove is not used because we want a timeout on the operation: the
    OS call may hang when the disk has problems.

    Raises OSCError('SHELL_TIMEOUT', ...) when the command has not finished
    within 10 seconds; the process is killed first.
    """
    sudo_prefix = ['sudo'] if sudo else []
    cmd_args = sudo_prefix + ['/bin/rm', filename]
    log.debug("Executing cmd: {}".format(str(cmd_args)))
    proc = subprocess.Popen(cmd_args, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE)
    try:
        proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        raise OSCError('SHELL_TIMEOUT', {'cmd': ' '.join(cmd_args)})
test_verify.py 文件源码 项目:pact-python 作者: pact-foundation 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setUp(self):
        """Patch ``verify``'s external dependencies and build CLI fixtures.

        ``subprocess.Popen``, ``isfile`` and ``rerun_command`` in the
        ``verify`` module are replaced by mocks (all stopped again through
        ``patch.stopall`` on cleanup). ``default_call`` and ``default_opts``
        are the command line arguments shared by the tests.
        """
        super(mainTestCase, self).setUp()
        self.addCleanup(patch.stopall)

        popen_patcher = patch.object(
            verify.subprocess, 'Popen', spec=verify.subprocess.Popen)
        self.mock_Popen = popen_patcher.start()
        self.mock_Popen.return_value.communicate.return_value = self.locale

        isfile_patcher = patch.object(verify, 'isfile', autospec=True)
        self.mock_isfile = isfile_patcher.start()

        rerun_patcher = patch.object(verify, 'rerun_command', autospec=True)
        self.mock_rerun_command = rerun_patcher.start()

        self.runner = CliRunner()

        base_url_option = '--provider-base-url=http://localhost'
        self.default_call = [
            base_url_option,
            '--pact-urls=./pacts/consumer-provider.json,'
            './pacts/consumer-provider2.json,./pacts/consumer-provider3.json',
        ]
        self.default_opts = [
            base_url_option,
            '--pact-url=./pacts/consumer-provider.json',
            '--pact-urls=./pacts/consumer-provider2.json,'
            './pacts/consumer-provider3.json',
        ]
common.py 文件源码 项目:localstack 作者: atlassian 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def do_run(cmd):
        try:
            cwd = os.getcwd() if inherit_cwd else None
            if not async:
                if stdin:
                    return subprocess.check_output(cmd, shell=True,
                        stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
                output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
                return output.decode(DEFAULT_ENCODING)
            # subprocess.Popen is not thread-safe, hence use a mutex here..
            try:
                mutex_popen.acquire()
                stdin_arg = subprocess.PIPE if stdin else None
                stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
                process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
                    stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
                return process
            finally:
                mutex_popen.release()
        except subprocess.CalledProcessError as e:
            if print_error:
                print("ERROR: '%s': %s" % (cmd, e.output))
            raise e
dev_servers.py 文件源码 项目:snovault 作者: ENCODE-DCC 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def nginx_server_process(prefix='', echo=False):
    """Start nginx in the foreground with the snovault development config.

    ``prefix`` is the directory containing the ``nginx`` executable. The
    child's stderr is merged into its stdout pipe. With ``echo`` false the
    parent's end of that pipe is closed straight away; with ``echo`` true
    it is kept open and a start message is printed.

    Returns the ``subprocess.Popen`` object.
    """
    nginx_binary = os.path.join(prefix, 'nginx')
    config_path = resource_filename('snovault', 'nginx-dev.conf')
    process = subprocess.Popen(
        [nginx_binary, '-c', config_path, '-g', 'daemon off;'],
        close_fds=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

    if echo:
        print('Started: http://localhost:8000')
    else:
        process.stdout.close()

    return process
program.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, args):
        """Start the program given by ``args`` with all three standard
        streams connected to pipes; ``err`` starts out as None."""
        self.args = args
        pipe = subprocess32.PIPE
        self.process = subprocess32.Popen(
            args, stdin=pipe, stdout=pipe, stderr=pipe)
        self.err = None
helpers.py 文件源码 项目:spikefuel 作者: duguyue100 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def start_jaer(jaer_path, jaer_exec="jAERViewer1.5_linux.sh"):
    """Start jAER from Python.

    This script is written for Linux usage,
    An error will be raised if it's Windows OS.
    Instead, windows user needs to manually setup jAER.

    The launcher script is run with ``bash`` using ``jaer_path`` as the
    working directory. No shell command string is assembled, so paths
    containing spaces or shell metacharacters are handled correctly, and a
    missing ``jaer_path`` raises instead of running the script from the
    current directory.

    Parameters
    ----------
    jaer_path : string
        absolute save path of jAER.
        e.g. /Users/dgyHome/Documents/workspace/jaer/trunk
    jaer_exec : string
        The executable of jAER. Version 1.5 is assumed.

    Returns
    -------
    An opened jAER viewer (the ``subprocess.Popen`` object, with stdout
    connected to a pipe).

    Raises
    ------
    ValueError
        If the operating system is not a POSIX platform.
    """
    # Check OS type
    if os.name != "posix":
        raise ValueError("The Operating System is not a POSIX platform")

    process = subprocess.Popen(["bash", jaer_exec], cwd=jaer_path,
                               stdout=subprocess.PIPE)

    return process
test_subscription_transport.py 文件源码 项目:graphql-python-subscriptions 作者: hballard 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server):
    """A client connecting without a supported sub-protocol gets closed.

    A small node script opens a WebSocket to the test server and prints the
    close code as JSON. The first code printed must be 1002 or 1006.
    """

    node_script = '''
        module.paths.push('{0}')
        WebSocket = require('ws')
        const client = new WebSocket('ws://localhost:{1}/socket')
        client.on('close', (code) => {{
            console.log(JSON.stringify(code))
          }}
        );
    '''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)

    p = subprocess.Popen(
        ['node', '-e', node_script],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    q = queue.Queue()
    t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
    t.daemon = True
    t.start()
    time.sleep(.2)
    ret_values = []
    while True:
        try:
            _line = q.get_nowait()
        except queue.Empty:
            break
        # Always bind `line`, also when the queue item is already text.
        line = _line.decode() if isinstance(_line, bytes) else _line
        try:
            ret_values.append(json.loads(line))
        except ValueError:
            # Non-JSON output from node is ignored.
            pass
    # `x == 1002 or 1006` is always truthy; compare against both codes.
    assert ret_values[0] in (1002, 1006)
sessions.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _test(self, rules_file, queue):
        """Run ``rules_file`` through the shell after unsharing the network
        namespace and report the outcome on ``queue``.

        Puts ``(True, None)`` when the command exits with status 0, otherwise
        ``(False, <stripped stderr>)``.
        """
        # Import the firewall rules in a detached network namespace
        unshare(CLONE_NEWNET)
        process = subprocess.Popen(rules_file,
                                   shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        _, stderr_data = process.communicate()

        outcome = (False, stderr_data.strip()) if process.returncode else (True, None)
        queue.put(outcome)
checks.py 文件源码 项目:fluffy 作者: m4ce 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self, name):
        """Run a check

        An ``exec`` check runs its command through the shell and fails on
        timeout or on a non-zero exit status. A ``tcp`` check tries to
        connect to the configured host and port within the timeout.

        Args:
            name: key of the check in ``self._checks``

        Raises:
            CheckNotFound: if no check with that name exists
            CheckError: if the check fails or times out

        """

        if not self.exists(name):
            raise CheckNotFound("Check not found")

        check = self._checks[name]
        if check['type'] == 'exec':
            proc = subprocess.Popen(
                check['command'], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            try:
                out, err = proc.communicate(timeout=check['timeout'])
            except subprocess.TimeoutExpired:
                # communicate() does not stop the child on timeout; kill and
                # reap the shell so it does not linger.
                proc.kill()
                proc.wait()
                raise CheckError("Timed out")
            except Exception as e:
                # str(e) instead of e.message, which Python 3 does not have.
                raise CheckError(str(e))

            if proc.returncode:
                raise CheckError("Command failed with exitstatus {} [{}]".format(
                    proc.returncode, err.strip()))
        elif check['type'] == 'tcp':
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(check['timeout'])
            try:
                result = sock.connect_ex((check['host'], check['port']))
                if result != 0:
                    raise Exception("Connection failed (Errno: {})".format(result))
            except socket.timeout:
                raise CheckError("Timed out")
            except Exception as e:
                raise CheckError(str(e))
            finally:
                # Single place where the socket is closed.
                sock.close()
__init__.py 文件源码 项目:aardvark 作者: Netflix-Skunkworks 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _call_phantom(self, token, arns, output_file):
        """
        shells out to phantomjs.
        - Writes ARNs to a file that phantomjs will read as an input.
        - Phantomjs exchanges the token for session cookies.
        - Phantomjs then navigates to the IAM page and executes JavaScript
        to call GenerateServiceLastAccessedDetails for each ARN.
        - Every 10 seconds, Phantomjs calls GetServiceLastAccessedDetails
        - Phantom saves output to a file that is used by `persist()`

        The process is given 20 minutes; on timeout it is killed and 1 is
        returned.

        :return: Exit code from phantomjs subprocess32 (1 on timeout)
        """

        path = os.path.dirname(__file__)
        console_js = os.path.join(path, 'awsconsole.js')

        with tempfile.NamedTemporaryFile() as f:
            json.dump(arns, f)
            # Rewinding also flushes the data so phantomjs can read it.
            f.seek(0)
            p = subprocess32.Popen([
                self.current_app.config.get('PHANTOMJS'),
                console_js,
                token,
                f.name,
                output_file],
                stdout=subprocess32.PIPE, stderr=subprocess32.STDOUT)
            try:
                output, errs = p.communicate(timeout=1200)  # 20 mins
            except subprocess32.TimeoutExpired:
                # communicate() leaves the child running on timeout: stop it
                # and collect it before reporting the failure.
                p.kill()
                p.communicate()
                self.current_app.logger.error('PhantomJS timed out')
                return 1  # return code 1 for timeout

            self.current_app.logger.debug('Phantom Output: \n{}'.format(output))
            self.current_app.logger.debug('Phantom Errors: \n{}'.format(errs))

            # Popen/communicate never raise CalledProcessError, so the exit
            # status has to be inspected explicitly.
            if p.returncode:
                self.current_app.logger.error('PhantomJS exited: {}'
                                              ''.format(p.returncode))
                return p.returncode

            self.current_app.logger.info('PhantomJS exited: 0')
            return 0
adb_old.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cmd(self, *args, **kwargs):
        '''Run an adb command, prefixed with "-s <serial>" when a device
        serial is known. Returns whatever raw_cmd returns (the
        subprocess.Popen object). Keyword arguments are accepted but unused.'''
        serial = self.device_serial() # TODO(ssx): useless here, need to remove and test
        if not serial:
            return self.raw_cmd(*args)
        if " " in serial:  # TODO how to include special chars on command line
            serial = "'%s'" % serial
        return self.raw_cmd("-s", serial, *args)
adb_old.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def raw_cmd(self, *args):
        '''Run adb with the host/port options followed by args, through the
        shell. Returns the subprocess.Popen object (stdout and stderr piped).'''
        adb_executable = self.adb()
        words = [adb_executable] + self.adb_host_port_options + list(args)
        # Outside Windows the words are joined into a single command string.
        cmd_line = words if os.name == "nt" else [" ".join(words)]
        return subprocess.Popen(cmd_line, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
web.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def background_test(self):
        """Run a demo command and stream its output line by line.

        Every line is printed, sent to each client in
        ``ProgressHandler.clients`` and appended to ``self.output``.
        ``self.running`` is True while the command is being read and is
        reset to False afterwards, also when an error occurs.
        """
        self.running = True
        # universal_newlines gives text lines, so the end-of-file check
        # against '' also works on Python 3.
        proc = subprocess.Popen('echo hello', shell=True, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, universal_newlines=True)
        try:
            # readline() returns '' once the child has closed its output
            for line in iter(proc.stdout.readline, ''):
                print(line)
                for client in ProgressHandler.clients:
                    client.write_message(line)
                self.output = self.output + line
        finally:
            # Close the pipe and reap the child so it does not stay a zombie.
            proc.stdout.close()
            proc.wait()
            self.running = False
chromedriver.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _launch_webdriver(self):
        """Start chromedriver on ``self._port``.

        Returns True when the process is still running two seconds after
        the start, False when it has already exited by then.
        """
        print("start chromedriver instance")
        port_option = '--port='+str(self._port)
        driver = subprocess.Popen(['chromedriver', port_option])
        try:
            driver.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            # Still alive after the grace period.
            return True
        return False
ioskit.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def start_app(self, bundle_id):
        '''
        Start app by bundle_id using ``idevicedebug run``
        Args:
            - bundle_id(string): ex com.netease.my
        Returns:
            idevicedebug subprocess instance
        '''
        idevicedebug = must_look_exec('idevicedebug')

        # run in background
        popen_options = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if sys.platform != 'darwin':
            popen_options['close_fds'] = True
        command = [idevicedebug, "--udid", self.udid, 'run', bundle_id]
        return subprocess.Popen(command, **popen_options)
ios_webdriveragent.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, device_url, bundle_id=None):
        """Create a device backed by a WebDriverAgent client.

        Args:
            device_url: passed to ``wda.Client`` and stored on the instance.
            bundle_id: optional; when truthy, ``start_app(bundle_id)`` is
                called at the end of construction.
        """
        DeviceMixin.__init__(self)
        self.__device_url = device_url
        self.__scale = None

        self._wda = wda.Client(device_url)
        self._session = None
        self._bundle_id = None

        # Start the app last, after the attributes above have been set.
        if bundle_id:
            self.start_app(bundle_id)

        # NOTE: everything below is disabled (commented-out) code that
        # launched WebDriverAgent through xcodebuild; it is not executed.

        # ioskit.Device.__init__(self, udid)

        # # xcodebuild -project  -scheme WebDriverAgentRunner -destination "id=1002c0174e481a651d71e3d9a89bd6f90d253446" test
        # # Test Case '-[UITestingUITests testRunner]' started.
        # xproj_dir = os.getenv('WEBDRIVERAGENT_DIR')
        # if not xproj_dir:
        #     raise RuntimeError("env-var WEBDRIVERAGENT_DIR need to be set")

        # proc = self._xcproc = subprocess.Popen(['/usr/bin/xcodebuild',
        #     '-project', 'WebDriverAgent.xcodeproj',
        #     '-scheme', 'WebDriverAgentRunner',
        #     '-destination', 'id='+self.udid, 'test'],
        #     stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=xproj_dir, bufsize=1, universal_newlines=True)
        # for line in iter(proc.stdout.readline, b""):
        #     print 'STDOUT:', line.strip()
        #     if 'TEST FAILED' in line:
        #         raise RuntimeError("webdriver start test failed, maybe need to unlock the keychain, try\n" + 
        #             '$ security unlock-keychain ~/Library/Keychains/login.keychain')
        #     elif "Successfully wrote Manifest cache" in line:
        #         print 'GOOD ^_^, wait 5s'
        #         time.sleep(5.0)
        #         break
client.py 文件源码 项目:ATX 作者: NetEaseGame 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def raw_cmd(self, *args, **kwargs):
        '''Run adb with the host/port arguments followed by args and return
        the subprocess.Popen object.

        stdout and stderr default to pipes unless given in kwargs; all
        other keyword arguments are passed to Popen unchanged.'''
        words = [self.adb_path()] + self._host_port_args + list(args)
        kwargs.setdefault('stdout', subprocess.PIPE)
        kwargs.setdefault('stderr', subprocess.PIPE)
        # The command is passed as a list of decoded words, not joined into
        # one string.
        return subprocess.Popen([strutils.decode(word) for word in words], **kwargs)
cmd2.py 文件源码 项目:cmd2 作者: python-cmd2 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def postparsing_postcmd(self, stop):
        """Final hook of the command loop; runs after everything else, including postcmd().

        It is invoked even for an empty input line, which makes it the method to override for things like
        refreshing the prompt after notifications from a background thread.

        :param stop: bool - True implies the entire application should exit.
        :return: bool - True implies the entire application should exit.
        """
        on_windows = sys.platform.startswith('win')
        # Fix those annoying problems that occur with terminal programs like "less" when you pipe to them
        if not on_windows and self.stdin.isatty():
            subprocess.Popen(shlex.split('stty sane')).communicate()
        return stop
cmd2.py 文件源码 项目:cmd2 作者: python-cmd2 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_shell(self, command):
        """Execute a command as if at the OS prompt.

    Usage:  shell <command> [arguments]"""
        # Output goes to this object's stdout; block until the command ends.
        subprocess.Popen(command, stdout=self.stdout, shell=True).communicate()


问题


面经


文章

微信
公众号

扫码关注公众号