python类split()的实例源码

command.py 文件源码 项目:heerkat 作者: Roche 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def cmd(command):
    result = Result()

    p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    (stdout, stderr) = p.communicate()

    result.exit_code = p.returncode
    result.stdout = stdout
    result.stderr = stderr
    result.command = command

    if p.returncode != 0:
        print 'Error executing command [%s]' % command
        print 'stderr: [%s]' % stderr
        print 'stdout: [%s]' % stdout

    return result
si.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def mounts(self, detectdev=False):
        mounts = []
        with open('/proc/mounts', 'r') as f:
            for line in f:
                dev, path, fstype = line.split()[0:3]
                if fstype in ('ext2', 'ext3', 'ext4', 'xfs',
                              'jfs', 'reiserfs', 'btrfs',
                              'simfs'): # simfs: filesystem in OpenVZ
                    if not os.path.isdir(path): continue
                    mounts.append({'dev': dev, 'path': path, 'fstype': fstype})
        for mount in mounts:
            stat = os.statvfs(mount['path'])
            total = stat.f_blocks*stat.f_bsize
            free = stat.f_bfree*stat.f_bsize
            used = (stat.f_blocks-stat.f_bfree)*stat.f_bsize
            mount['total'] = b2h(total)
            mount['free'] = b2h(free)
            mount['used'] = b2h(used)
            mount['used_rate'] = div_percent(used, total)
            if detectdev:
                dev = os.stat(mount['path']).st_dev
                mount['major'], mount['minor'] = os.major(dev), os.minor(dev)
        return mounts
easy_install.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _pythonpath():
    items = os.environ.get('PYTHONPATH', '').split(os.pathsep)
    return filter(None, items)
si.py 文件源码 项目:Starfish 作者: BillWang139967 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def uptime(self):
        with open('/proc/uptime', 'r') as f:
            uptime, idletime = f.readline().split()
            up_seconds = int(float(uptime))
            idle_seconds = int(float(idletime))
            # in some machine like Linode VPS, idle time may bigger than up time
            if idle_seconds > up_seconds:
                cpu_count = multiprocessing.cpu_count()
                idle_seconds = idle_seconds/cpu_count
                # in some VPS, this value may still bigger than up time
                # may be the domain 0 machine has more cores
                # we calclate approximately for it
                if idle_seconds > up_seconds:
                    for n in range(2,10):
                        if idle_seconds/n < up_seconds:
                            idle_seconds = idle_seconds/n
                            break
            fmt = '{days} ? {hours} ?? {minutes} ? {seconds} ?'
            uptime_string = strfdelta(datetime.timedelta(seconds = up_seconds), fmt)
            idletime_string = strfdelta(datetime.timedelta(seconds = idle_seconds), fmt)
        return {
            'up': uptime_string,
            'idle': idletime_string,
            'idle_rate': div_percent(idle_seconds, up_seconds),
        }
format.py 文件源码 项目:Aigis 作者: Joaquin-V 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def parse_command(self, prefixes = ['!']):
        for prefix in prefixes:
            if len(prefix) < len(self.text_cont) and self.text_cont.startswith(prefix):
                print("Found a command with the {} prefix.".format(prefix))
                self.comm_is = True
                self.command = self.text_words[0][len(prefix):]

                if len(self.text_words) > 1:
                    try:
                        self.comm_params = shlex.split(self.text_cont)[1:]
                    except ValueError:
                        self.comm_params = self.text_words[1:]

        if self.chat_query and self.comm_is is False:
                self.comm_is = True
                self.command = self.text_words[0]

                if len(self.text_words) > 1:
                    try:
                        self.comm_params = shlex.split(self.text_cont)[1:]
                    except ValueError:
                        self.comm_params = self.text_words[1:]
dist.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _parse_command_opts(self, parser, args):
        # Remove --with-X/--without-X options when processing command args
        self.global_options = self.__class__.global_options
        self.negative_opt = self.__class__.negative_opt

        # First, expand any aliases
        command = args[0]
        aliases = self.get_option_dict('aliases')
        while command in aliases:
            src, alias = aliases[command]
            del aliases[command]  # ensure each alias can expand only once!
            import shlex
            args[:1] = shlex.split(alias, True)
            command = args[0]

        nargs = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        cmd_class = self.get_command_class(command)
        if getattr(cmd_class, 'command_consumes_arguments', None):
            self.get_option_dict(command)['args'] = ("command line", nargs)
            if nargs is not None:
                return []

        return nargs
dist.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _parse_command_opts(self, parser, args):
        # Remove --with-X/--without-X options when processing command args
        self.global_options = self.__class__.global_options
        self.negative_opt = self.__class__.negative_opt

        # First, expand any aliases
        command = args[0]
        aliases = self.get_option_dict('aliases')
        while command in aliases:
            src, alias = aliases[command]
            del aliases[command]  # ensure each alias can expand only once!
            import shlex
            args[:1] = shlex.split(alias, True)
            command = args[0]

        nargs = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        cmd_class = self.get_command_class(command)
        if getattr(cmd_class, 'command_consumes_arguments', None):
            self.get_option_dict(command)['args'] = ("command line", nargs)
            if nargs is not None:
                return []

        return nargs
io_helpers.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def plot(self):
        if _domainless._DEBUG == True:
            print "Plot:plot()"

        # Error checking before launching plot
        if self.usesPortIORString == None:
            raise AssertionError, "Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component"
        if self._usesPortName == None:
            raise AssertionError, "Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component"
        if self._dataType == None:
            raise AssertionError, "Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component"

        plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
        if _domainless._DEBUG == True:
            print "Plot:plotCommand " + str(plotCommand)
        args = _shlex.split(plotCommand)
        if _domainless._DEBUG == True:
            print "Plot:args " + str(args)
        try:
            dev_null = open('/dev/null','w')
            sub_process = _subprocess.Popen(args,stdout=dev_null,preexec_fn=_os.setpgrp)
            pid = sub_process.pid
            self._processes[pid] = sub_process
        except Exception, e:
            raise AssertionError, "Plot:plot() Failed to launch plotting due to %s" % ( e)
pdb.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def precmd(self, line):
        """Handle alias expansion and ';;' separator."""
        if not line.strip():
            return line
        args = line.split()
        while args[0] in self.aliases:
            line = self.aliases[args[0]]
            ii = 1
            for tmpArg in args[1:]:
                line = line.replace("%" + str(ii),
                                      tmpArg)
                ii = ii + 1
            line = line.replace("%*", ' '.join(args[1:]))
            args = line.split()
        # split into ';;' separated commands
        # unless it's an alias command
        if args[0] != 'alias':
            marker = line.find(';;')
            if marker >= 0:
                # queue up everything after marker
                next = line[marker+2:].lstrip()
                self.cmdqueue.append(next)
                line = line[:marker].rstrip()
        return line
pdb.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_enable(self, arg):
        args = arg.split()
        for i in args:
            try:
                i = int(i)
            except ValueError:
                print >>self.stdout, 'Breakpoint index %r is not a number' % i
                continue

            if not (0 <= i < len(bdb.Breakpoint.bpbynumber)):
                print >>self.stdout, 'No breakpoint numbered', i
                continue

            bp = bdb.Breakpoint.bpbynumber[i]
            if bp:
                bp.enable()
pdb.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_disable(self, arg):
        args = arg.split()
        for i in args:
            try:
                i = int(i)
            except ValueError:
                print >>self.stdout, 'Breakpoint index %r is not a number' % i
                continue

            if not (0 <= i < len(bdb.Breakpoint.bpbynumber)):
                print >>self.stdout, 'No breakpoint numbered', i
                continue

            bp = bdb.Breakpoint.bpbynumber[i]
            if bp:
                bp.disable()
pdb.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def do_condition(self, arg):
        # arg is breakpoint number and condition
        args = arg.split(' ', 1)
        try:
            bpnum = int(args[0].strip())
        except ValueError:
            # something went wrong
            print >>self.stdout, \
                'Breakpoint index %r is not a number' % args[0]
            return
        try:
            cond = args[1]
        except:
            cond = None
        try:
            bp = bdb.Breakpoint.bpbynumber[bpnum]
        except IndexError:
            print >>self.stdout, 'Breakpoint index %r is not valid' % args[0]
            return
        if bp:
            bp.cond = cond
            if not cond:
                print >>self.stdout, 'Breakpoint', bpnum,
                print >>self.stdout, 'is now unconditional.'
webbrowser.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _iscommand(cmd):
    """Return True if cmd is executable or can be found on the executable
    search path."""
    if _isexecutable(cmd):
        return True
    path = os.environ.get("PATH")
    if not path:
        return False
    for d in path.split(os.pathsep):
        exe = os.path.join(d, cmd)
        if _isexecutable(exe):
            return True
    return False


# General parent classes
pjf_process_monitor.py 文件源码 项目:PyJFuzz 作者: mseclab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def save_testcase(self, testcase):
        """
        Save all testcases collected during monitoring
        """
        try:
            if self.config.debug:
                print("[\033[92mINFO\033[0m] Saving testcase...")
            dir_name = "testcase_{0}".format(os.path.basename(shlex.split(self.config.process_to_monitor)[0]))
            try:
                os.mkdir(dir_name)
            except OSError:
                pass
            for test in testcase:
                with open("{0}/testcase_{1}.json".format(dir_name, self.testcase_count), "wb") as t:
                    t.write(test)
                    t.close()
                self.testcase_count += 1
        except Exception as e:
            raise PJFBaseException(e.message if hasattr(e, "message") else str(e))
ecs.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_helper_tasks(self):
        """
        Return a information about our helper tasks for this task definition.
        This is in the form of a dictionary like so:

            {`<helper_task_family>`: `<helper_task_family>:<revision>`, ...}

        If our service has helper tasks (as defined in the `tasks:` section of
        the deployfish.yml file), we've recorded the appropriate
        `<family>:<revision>` of each them as docker labels in the container
        definition of the first container in the task definition.

        Those docker labels will be in this form:

            edu.caltech.tasks.<task name>.id=<family>:<revision>

        :rtype: dict of strings
        """
        l = {}
        for key, value in self.dockerLabels.items():
            if key.startswith('edu.caltech.task'):
                l[value.split(':')[0]] = value
        return l
ecs.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __get_volumes(self):
        """
        Generate the "volumes" argument for the ``register_task_definition()`` call.

        :rtype: dict
        """
        volume_names = set()
        volumes = []
        for c in self.containers:
            for v in c.volumes:
                host_path = v.split(':')[0]
                name = self.mount_from_host_path(host_path)
                if name not in volume_names:
                    volumes.append({
                        'name': name,
                        'host': {'sourcePath': host_path}
                    })
                    volume_names.add(name)
        return volumes
test_messier.py 文件源码 项目:messier 作者: conorsch 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_reload_vms(self):
        """
        Reboot VMs and check that uptime decreased.
        """
        m = messier.Messier()
        m.create_vms()

        def get_uptime(vm):
            """
            Return uptime in seconds for VM.
            """
            # Hideous one-liner, but it works.
            cmd = """vagrant ssh {} --command \"cut -d' ' -f 1 /proc/uptime\" """.format(vm.name)
            cmd = shlex.split(cmd)
            return subprocess.check_output(cmd, stderr=open('/dev/null', 'w'))

        # Sleep to make sure the original boot has a higher uptime
        time.sleep(10)
        original_uptimes = { vm.name: get_uptime(vm) for vm in m.vms }
        m.reload_vms()
        new_uptimes = { vm.name: get_uptime(vm) for vm in m.vms }

        for k, v in original_uptimes.iteritems():
            assert new_uptimes[k] < v
cephprocesses.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _process_map():
    """
    Create a map of processes that have deleted files.
    """
    procs = []
    proc1 = Popen(shlex.split('lsof '), stdout=PIPE)
    # pylint: disable=line-too-long
    proc2 = Popen(shlex.split("awk 'BEGIN {IGNORECASE = 1} /deleted/ {print $1 \" \" $2 \" \" $4}'"),
                  stdin=proc1.stdout, stdout=PIPE, stderr=PIPE)
    proc1.stdout.close()
    stdout, _ = proc2.communicate()
    for proc_l in stdout.split('\n'):
        proc = proc_l.split(' ')
        proc_info = {}
        if proc[0] and proc[1] and proc[2]:
            proc_info['name'] = proc[0]
            if proc_info['name'] == 'httpd-pre':
                # lsof 'nicely' abbreviates httpd-prefork to httpd-pre
                proc_info['name'] = 'httpd-prefork'
            proc_info['pid'] = proc[1]
            proc_info['user'] = proc[2]
            procs.append(proc_info)
        else:
            continue
    return procs
cephprocesses.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def zypper_ps(role, lsof_map):
    """
    Gets services that need a restart from zypper
    """
    assert role
    proc1 = Popen(shlex.split('zypper ps -sss'), stdout=PIPE)
    stdout, _ = proc1.communicate()
    processes_ = processes
    # adding instead of overwriting, eh?
    # radosgw is ceph-radosgw in zypper ps.
    processes_['rgw'] = ['ceph-radosgw', 'radosgw', 'rgw']
    # ganesha is called nfs-ganesha
    processes_['ganesha'] = ['ganesha.nfsd', 'rpcbind', 'rpc.statd', 'nfs-ganesha']
    for proc_l in stdout.split('\n'):
        if '@' in proc_l:
            proc_l = proc_l.split('@')[0]
        if proc_l in processes_[role]:
            lsof_map.append({'name': proc_l})
    return lsof_map
git.py 文件源码 项目:git-stacktrace 作者: pinterest 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def files_touched(git_range):
    """Run git log --pretty="%H" --raw  git_range.

    Generate a dictionary of files modified by the commits in range
    """
    cmd = 'git', 'log', '--pretty=%H', '--raw', git_range
    data = run_command(*cmd)
    commits = collections.defaultdict(list)
    commit = None
    for line in data.splitlines():
        if SHA1_REGEX.match(line):
            commit = line
        elif line.strip():
            split_line = line.split('\t')
            filename = split_line[-1]
            state = split_line[0].split(' ')[-1][0]
            commits[commit].append(GitFile(filename, state))
    return commits
dist.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def check_nsp(dist, attr, value):
    """Verify that namespace packages are valid"""
    assert_string_list(dist,attr,value)
    for nsp in value:
        if not dist.has_contents_for(nsp):
            raise DistutilsSetupError(
                "Distribution contains no modules or packages for " +
                "namespace package %r" % nsp
            )
        if '.' in nsp:
            parent = '.'.join(nsp.split('.')[:-1])
            if parent not in value:
                distutils.log.warn(
                    "WARNING: %r is declared as a package namespace, but %r"
                    " is not: please correct this in setup.py", nsp, parent
                )
dist.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parse_command_opts(self, parser, args):
        # Remove --with-X/--without-X options when processing command args
        self.global_options = self.__class__.global_options
        self.negative_opt = self.__class__.negative_opt

        # First, expand any aliases
        command = args[0]
        aliases = self.get_option_dict('aliases')
        while command in aliases:
            src,alias = aliases[command]
            del aliases[command]    # ensure each alias can expand only once!
            import shlex
            args[:1] = shlex.split(alias,True)
            command = args[0]

        nargs = _Distribution._parse_command_opts(self, parser, args)

        # Handle commands that want to consume all remaining arguments
        cmd_class = self.get_command_class(command)
        if getattr(cmd_class,'command_consumes_arguments',None):
            self.get_option_dict(command)['args'] = ("command line", nargs)
            if nargs is not None:
                return []

        return nargs
bp_mmcif2dict.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _tokenize(self, handle):
        for line in handle:
            if line.startswith("#"):
                continue
            elif line.startswith(";"):
                token = line[1:].strip()
                for line in handle:
                    line = line.strip()
                    if line == ';':
                        break
                    token += line
                yield token
            else:
                try:
                    tokens = shlex.split(line)
                except ValueError:
                    # error "No closing quotation"
                    line = line.replace("'",'"')
                    # if odd - add a closing " to that line
                    if not line.count('"') % 2 == 0:
                        line = '{}"'.format(line)
                    tokens = shlex.split(line)
                for token in tokens:
                    yield token
Scripting.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def update(ctx):
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
Scripting.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def update(ctx):
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
Configure.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_os_flags(self, var, dest=None, dup=True):
    """
    Import operating system environment values into ``conf.env`` dict::

        def configure(conf):
            conf.add_os_flags('CFLAGS')

    :param var: variable to use
    :type var: string
    :param dest: destination variable, by default the same as var
    :type dest: string
    :param dup: add the same set of flags again
    :type dup: bool
    """
    try:
        flags = shlex.split(self.environ[var])
    except KeyError:
        return
    # TODO: in waf 1.9, make dup=False the default
    if dup or ''.join(flags) not in ''.join(Utils.to_list(self.env[dest or var])):
        self.env.append_value(dest or var, flags)
Scripting.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def update(ctx):
    lst = Options.options.files
    if lst:
        lst = lst.split(',')
    else:
        path = os.path.join(Context.waf_dir, 'waflib', 'extras')
        lst = [x for x in Utils.listdir(path) if x.endswith('.py')]
    for x in lst:
        tool = x.replace('.py', '')
        if not tool:
            continue
        try:
            dl = Configure.download_tool
        except AttributeError:
            ctx.fatal('The command "update" is dangerous; include the tool "use_config" in your project!')
        try:
            dl(tool, force=True, ctx=ctx)
        except Errors.WafError:
            Logs.error('Could not find the tool %r in the remote repository' % x)
        else:
            Logs.warn('Updated %r' % tool)
Configure.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_os_flags(self, var, dest=None, dup=True):
    """
    Import operating system environment values into ``conf.env`` dict::

        def configure(conf):
            conf.add_os_flags('CFLAGS')

    :param var: variable to use
    :type var: string
    :param dest: destination variable, by default the same as var
    :type dest: string
    :param dup: add the same set of flags again
    :type dup: bool
    """
    try:
        flags = shlex.split(self.environ[var])
    except KeyError:
        return
    # TODO: in waf 1.9, make dup=False the default
    if dup or ''.join(flags) not in ''.join(Utils.to_list(self.env[dest or var])):
        self.env.append_value(dest or var, flags)
jobserver.py 文件源码 项目:react-tornado-graphql-example 作者: yatsu 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def countdown_handler(self, interval, count):
        command = '{0}/countdown -i {1} {2}'.format(os.getcwd(), interval, count)
        proc = Subprocess(shlex.split(command), stdout=Subprocess.STREAM)
        try:
            while True:
                line_bytes = yield proc.stdout.read_until(b'\n')
                line = to_unicode(line_bytes)[:-1]
                self.log.info('command read: %s', line)
                timestamp = datetime.now().timestamp()
                self.zmq_stream.send_multipart([b'0', utf8(json_encode({
                    'stdout': line,
                    'finished': False,
                    'timestamp': timestamp
                }))])
        except StreamClosedError:
            self.log.info('command closed')
            timestamp = datetime.now().timestamp()
            self.zmq_stream.send_multipart([b'0', utf8(json_encode({
                'stdout': None,
                'finished': True,
                'timestamp': timestamp
            }))])
build_doc.py 文件源码 项目:python-everywhere 作者: wdv4758h 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run_cmd(cmd, quiet=False):
    if not quiet:
        logging.info('command: {}'.format(cmd))

    # use shlex to keep quoted substrings
    result = run(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    stdout = result.stdout.strip().decode()
    stderr = result.stderr.strip().decode()

    if stdout and not quiet:
        logging.debug(stdout)

    if stderr and not quiet:
        logging.warning(stderr)

    return result.stdout.strip()


问题


面经


文章

微信
公众号

扫码关注公众号