python类quote()的实例源码

cfg_cross_gnu.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Copy the environment variable ``name`` into ``conf.env`` and report it.

    Nothing happens when the variable is not set.  An empty value is stored
    as a one-element list holding the empty string; any other value is split
    with ``Utils.to_list``.  The list is stored under ``wafname`` (which
    defaults to ``name``) and announced through ``conf.msg`` with every item
    passed through ``quote``.
    """
    target = wafname or name
    raw = os.environ.get(name)
    if raw is None:
        return

    items = [raw] if raw == '' else Utils.to_list(raw)
    conf.env[target] = items

    label = ('cross-compilation %s' % target) if cross else target
    conf.msg('Will use %s' % label, " ".join(map(quote, items)))
cfg_cross_gnu.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Mirror environment variable ``name`` into ``conf.env[wafname]``.

    ``wafname`` falls back to ``name``.  An unset variable is ignored.  An
    empty string becomes ``['']``; anything else goes through
    ``Utils.to_list``.  The stored items are reported via ``conf.msg``,
    each one passed through ``quote`` and joined by spaces.
    """
    key = wafname or name
    setting = os.environ.get(name)
    if setting is None:
        return

    if setting == '':
        entries = [setting]
    else:
        entries = Utils.to_list(setting)
    conf.env[key] = entries

    shown_as = ('cross-compilation %s' % key) if cross else key
    rendered = " ".join([quote(entry) for entry in entries])
    conf.msg('Will use %s' % shown_as, rendered)
utils.py 文件源码 项目:cheribuild 作者: CTSRD-CHERI 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def printCommand(arg1: "typing.Union[str, typing.Sequence[typing.Any]]", *remainingArgs, outputFile=None,
                 colour=AnsiColour.yellow, cwd=None, env=None, sep=" ", printVerboseOnly=False, **kwargs):
    """Print a shell-like rendering of a command line.

    Nothing is printed when ``_cheriConfig`` is falsy, when it is quiet, or
    when ``printVerboseOnly`` is set and the config is not verbose.

    The command may be given either as separate arguments or as a single
    sequence in ``arg1``.  The rendering is prefixed by ``cd <cwd> &&`` when
    ``cwd`` is truthy and by ``env K=V ...`` for the entries that
    ``__filterEnv(env)`` returns; ``> <outputFile>`` is appended (unquoted)
    when ``outputFile`` is truthy.  Extra keyword arguments are forwarded to
    ``print``.
    """
    if not _cheriConfig:
        return
    if _cheriConfig.quiet or (printVerboseOnly and not _cheriConfig.verbose):
        return

    # also allow passing the whole command as one sequence
    if type(arg1) is not str:
        arg1, remainingArgs = arg1[0], arg1[1:]

    pieces = []
    if cwd:
        pieces += ["cd", shlex.quote(str(cwd)), "&&"]
    if env:
        # only print the changed environment entries
        changed = __filterEnv(env)
        if changed:
            pieces.append("env")
            pieces.extend(shlex.quote(k + "=" + str(v)) for k, v in changed.items())
    pieces.append(shlex.quote(str(arg1)))
    pieces.extend(shlex.quote(str(extra)) for extra in remainingArgs)
    if outputFile:
        pieces += [">", str(outputFile)]

    # coloured() received a tuple in the original, so hand it a tuple here too
    print(coloured(colour, tuple(pieces), sep=sep), flush=True, **kwargs)
__init__.py 文件源码 项目:bpy_lambda 作者: bcongdon 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def process_pre(self):
        """Prepare the temp dir, environment and ``bam.pack`` command line.

        Sets ``self.temp_dir``, ``self.environ``, ``self.outfname`` (the
        target path with a ``.zip`` extension ensured) and ``self.command``
        (a tuple).  When INFO logging is enabled the command is logged in
        shell-quoted form.
        """
        import tempfile

        self.temp_dir = tempfile.TemporaryDirectory()
        self.environ = {'PYTHONPATH': pythonpath()}
        self.outfname = bpy.path.ensure_ext(self.filepath, ".zip")

        pack_args = [bpy.app.binary_path_python, '-m', 'bam.pack']
        pack_args += ["--input", bpy.data.filepath]      # file to pack
        pack_args += ["--output", self.outfname]         # file to write
        pack_args += ["--temp", self.temp_dir.name]
        self.command = tuple(pack_args)

        if not self.log.isEnabledFor(logging.INFO):
            return

        import shlex
        self.log.info('Executing %s', ' '.join(map(shlex.quote, self.command)))
spec.py 文件源码 项目:badwolf 作者: bosondata 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def shell_script(self):
        """Render the ``script.sh`` template for this spec and return it.

        Every command is wrapped so that it is echoed (shell-quoted, prefixed
        with ``+``) before it runs.  The service-start and script commands
        are joined by newlines, base64-encoded and shell-quoted for the
        ``command`` template variable; the after-success and after-failure
        commands are passed as text joined by ``'    \\n'``.
        """
        def _trace(command):
            # First line echoes the command, second line runs it verbatim.
            return 'echo + {}\n{} '.format(shlex.quote(command), command)

        on_success = list(map(_trace, self.after_success))
        on_failure = list(map(_trace, self.after_failure))

        steps = [_trace('service {} start'.format(name)) for name in self.services]
        steps.extend(_trace(item) for item in self.scripts)

        payload = base64.b64encode(to_binary('\n'.join(steps)))
        script = render_template(
            'script.sh',
            command=shlex.quote(to_text(payload)),
            after_success='    \n'.join(on_success),
            after_failure='    \n'.join(on_failure),
        )
        logger.debug('Build script: \n%s', script)
        return script
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def files_in_archive(self, force_refresh=False):
        """Return the member names reported by the archive listing command.

        The result is cached in ``self._files_in_archive``; a non-empty cache
        is returned directly unless ``force_refresh`` is true.

        Raises ``UnarchiveError`` when the listing command exits non-zero.
        """
        if self._files_in_archive and not force_refresh:
            return self._files_in_archive

        cmd = [ self.cmd_path, '--list', '-C', self.dest ]
        if self.zipflag:
            cmd.append(self.zipflag)
        if self.opts:
            cmd.extend([ '--show-transformed-names' ] + self.opts)
        if self.excludes:
            cmd.extend([ '--exclude=' + quote(f) for f in self.excludes ])
        cmd.extend([ '-f', self.src ])
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
        if rc != 0:
            raise UnarchiveError('Unable to list files in the archive')

        # Collect into a fresh list so a forced refresh replaces the cached
        # entries instead of appending duplicates to them.
        listed = []
        for filename in out.splitlines():
            # Compensate for locale-related problems in gtar output (octal unicode representation) #11348
            # Convert with to_native *before* the exclusion test:
            # codecs.escape_decode returns bytes on Python 3, and a bytes
            # value never compares equal to a text entry of self.excludes.
            filename = to_native(codecs.escape_decode(filename)[0])
            if filename and filename not in self.excludes:
                listed.append(filename)
        self._files_in_archive = listed
        return self._files_in_archive
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def unarchive(self):
        """Run the extraction command and return its outcome.

        The argument list is assembled from ``cmd_path``, ``dest``, the
        optional ``zipflag``/``opts``, owner and group from ``file_args``,
        the ``keep_newer`` module parameter, the excludes and finally the
        source archive.  Returns a dict with ``cmd``, ``rc``, ``out`` and
        ``err``.
        """
        argv = [self.cmd_path, '--extract', '-C', self.dest]
        if self.zipflag:
            argv.append(self.zipflag)
        if self.opts:
            argv += ['--show-transformed-names'] + self.opts

        for option in ('owner', 'group'):
            if self.file_args[option]:
                argv.append('--%s=' % option + quote(self.file_args[option]))

        if self.module.params['keep_newer']:
            argv.append('--keep-newer-files')
        if self.excludes:
            argv += ['--exclude=' + quote(pattern) for pattern in self.excludes]
        argv += ['-f', self.src]

        c_locale = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
        rc, out, err = self.module.run_command(argv, cwd=self.dest, environ_update=c_locale)
        return dict(cmd=argv, rc=rc, out=out, err=err)
test_magic.py 文件源码 项目:leetcode 作者: thomasyimgit 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_alias_magic():
    """Test %alias_magic."""
    shell = get_ipython()
    manager = shell.magics_manager

    def alias_magic(spec):
        # Invoke the %alias_magic line magic with the given argument string.
        shell.run_line_magic('alias_magic', spec)

    # Without flags both a line and a cell alias are created, if possible.
    alias_magic('timeit_alias timeit')
    for kind in ('line', 'cell'):
        nt.assert_in('timeit_alias', manager.magics[kind])

    # With --cell only the cell magic is created.
    alias_magic('--cell timeit_cell_alias timeit')
    nt.assert_not_in('timeit_cell_alias', manager.magics['line'])
    nt.assert_in('timeit_cell_alias', manager.magics['cell'])

    # A line alias must behave like the magic it points to.
    alias_magic('--line env_alias env')
    via_original = shell.run_line_magic('env', '')
    via_alias = shell.run_line_magic('env_alias', '')
    nt.assert_equal(via_original, via_alias)

    # A line alias can carry preset parameters.
    alias_magic('--line history_alias history --params ' + shlex.quote('3'))
    nt.assert_in('history_alias', manager.magics['line'])
utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def opened_files(path, excludes):
    """Return the files that ``lsof`` reports as open below ``path``.

    Entries that are empty, at most two characters long, directories, purely
    numeric, or rejected by ``file_excluded(item, excludes)`` are skipped.

    Returns a list of paths, or ``None`` when the check itself failed (the
    exception is logged).
    """
    files = []

    try:
        # The with-block closes the pipe even when reading from it fails.
        with os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path)) as process:
            data = process.read()
        for item in data.split('\n'):
            if not item or len(item) <= 2 or os.path.isdir(item) or item.isdigit() or file_excluded(item, excludes):
                continue
            files.append(item)

        return files

    except Exception:
        # Best effort: callers receive None instead of an exception.
        logger.exception("Exception checking %r: ", path)
        return None
utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    """Build the ``rclone move`` shell command line as a single string.

    ``local`` and ``remote`` are passed through ``cmd_quote``.  ``bwlimit``
    (when non-empty) and every entry of ``excludes`` are wrapped in double
    quotes as-is; ``--dry-run`` is appended when ``dry_run`` is truthy.
    """
    pieces = [
        'rclone move %s %s' % (cmd_quote(local), cmd_quote(remote)),
        ' --delete-after',
        ' --no-traverse',
        ' --stats=60s',
        ' -v',
        ' --transfers=%d' % (transfers,),
        ' --checkers=%d' % (checkers,),
        ' --drive-chunk-size=%s' % (chunk_size,),
    ]
    if bwlimit and len(bwlimit):
        pieces.append(' --bwlimit="%s"' % bwlimit)
    pieces.extend(' --exclude="%s"' % pattern for pattern in excludes)
    if dry_run:
        pieces.append(' --dry-run')
    return ''.join(pieces)
utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def remove_empty_directories(config, force_dry_run=False):
    """Run ``find ... -type d -empty`` on the configured directories.

    Nothing is removed while files below ``config['local_folder']`` are open
    or when the open-files check failed.  For every existing directory in
    ``config['rclone_remove_empty_on_upload']`` (a mapping of directory to
    minimum depth) a ``find`` command is run; ``-delete`` is only appended
    when neither ``config['dry_run']`` nor ``force_dry_run`` is set.
    """
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if open_files is None:
        # opened_files() returns None when the lsof check failed; without
        # knowing what is open it is not safe to delete anything.
        logger.debug("Skipped removing empty directories because the open files check failed")
        return
    if open_files:
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)
        return

    clearing = False
    for directory, depth in config['rclone_remove_empty_on_upload'].items():
        if not os.path.exists(directory):
            continue
        clearing = True
        logger.debug("Removing empty directories from %r with mindepth %r", directory, depth)
        cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(directory), depth)
        if not config['dry_run'] and not force_dry_run:
            cmd += ' -delete'
        run_command(cmd)
    if clearing:
        logger.debug("Finished clearing empty directories")


############################################################
# CONFIG STUFF
############################################################
test_activate.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_activate(monkeypatch):
    """Activating a project with a redis service exports PROJECT_DIR and REDIS_URL."""
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        lines = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert lines is not None
        if platform.system() == 'Windows':
            # PATH exports are filtered out before comparing on Windows.
            lines = [entry for entry in lines if not entry.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        if len(lines) > 2:
            # Dump diagnostics when more lines than expected came back.
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(lines))
        expected = ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379']
        assert expected == lines

    project_files = {DEFAULT_PROJECT_FILENAME: """
services:
  REDIS_URL: redis
    """}
    with_directory_contents_completing_project_file(project_files, activate_redis_url)
test_activate.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_activate_quoting(monkeypatch):
    """A variable value with shell metacharacters is exported in quoted form."""
    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        lines = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert lines is not None
        if platform.system() == 'Windows':
            # PATH exports are filtered out before comparing on Windows.
            lines = [entry for entry in lines if not entry.startswith("export PATH")]
            print("activate changed PATH on Windows and ideally it would not.")
        expected = ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)]
        assert expected == lines

    project_files = {
        DEFAULT_PROJECT_FILENAME: """
variables:
  FOO: {}
    """,
        DEFAULT_LOCAL_STATE_FILENAME: """
variables:
  FOO: $! boo
"""
    }
    with_directory_contents_completing_project_file(project_files, activate_foo)
shell_ast.py 文件源码 项目:ml-utils 作者: LinxiFan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _shell_replace_vars(cmd, local_vars):
    """Substitute ``$name`` references in ``cmd`` with shell-safe values.

    Each match of ``_dollar_var_re`` is resolved as follows:

    * ``$1``, ``$2``, ...: the corresponding ``sys.argv`` entry
    * ``$#``: the number of arguments after the program name
    * ``$@``: all arguments after the program name, each quoted separately
    * anything else: looked up in ``local_vars``; unknown names are left
      untouched in the command

    String values are shell-quoted; other values are converted with
    ``str``.  The substitutions are applied through ``_replace_n``.
    """
    spans = []
    replacements = []
    for match in _dollar_var_re.finditer(cmd):
        varname = match.group(1)
        already_quoted = False
        if varname.isdigit():
            # $1, $2 for sys.argv, just like bash
            value = sys.argv[int(varname)]
        elif varname == '#':
            value = len(sys.argv) - 1
        elif varname == '@':
            # Every argument is quoted on its own; quoting the joined string
            # again would collapse them into one word full of literal quotes.
            value = ' '.join(map(shlex.quote, sys.argv[1:]))
            already_quoted = True
        else:
            assert is_variable_name(varname), 'not a valid var name: ' + varname
            if varname not in local_vars:
                continue
            value = local_vars[varname]

        if already_quoted:
            pass
        elif isinstance(value, str):
            value = shlex.quote(value)
        else:
            value = str(value)
        spans.append(match.span())
        replacements.append(value)
    return _replace_n(cmd, spans, replacements)
tui.py 文件源码 项目:stig 作者: rndusr 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self, context, KEY, ACTION):
        """Bind ``KEY`` to ``ACTION`` in the TUI keymap.

        ``ACTION`` is a sequence of words.  A single word of the form
        ``<...>`` is turned into a key with ``keymap.mkkey``; anything else
        is joined into one shell-quoted command string.  A ``context`` of
        ``None`` selects ``DEFAULT_CONTEXT``.

        Returns True on success, False when the context is invalid or when
        ``keymap.bind`` raises ``ValueError`` (both cases are logged).
        """
        keymap = self.tui.keymap

        key = KEY
        # startswith/endswith instead of indexing: indexing an empty action
        # word (e.g. a quoted empty string) would raise IndexError.
        if len(ACTION) == 1 and ACTION[0].startswith('<') and ACTION[0].endswith('>'):
            # ACTION is another key (e.g. 'j' -> 'down')
            action = keymap.mkkey(ACTION[0])
        else:
            action = ' '.join(shlex.quote(x) for x in ACTION)

        if context is None:
            from ...tui.keymap import DEFAULT_CONTEXT
            context = DEFAULT_CONTEXT
        elif context not in _get_KEYMAP_CONTEXTS():
            log.error('Invalid context: {!r}'.format(context))
            return False

        try:
            keymap.bind(key, action, context=context)
        except ValueError as e:
            log.error(e)
            return False
        else:
            return True
compile.py 文件源码 项目:performance 作者: python 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_output_nocheck(self, *cmd, **kwargs):
        """Run ``cmd`` and return ``(exitcode, stdout)`` without raising.

        The process is created through ``self.create_subprocess`` with
        stdout captured as text; extra keyword arguments are forwarded.
        Trailing whitespace is stripped from the output.  A non-zero exit
        code is logged as an error but is not turned into an exception.
        """
        proc = self.create_subprocess(cmd,
                                      stdout=subprocess.PIPE,
                                      universal_newlines=True,
                                      **kwargs)
        # FIXME: support Python 2?
        with proc:
            output, _ = proc.communicate()
        output = output.rstrip()

        status = proc.wait()
        if status:
            rendered = ' '.join(shlex.quote(part) for part in cmd)
            self.logger.error("Command %s failed with exit code %s"
                              % (rendered, status))

        return (status, output)
player.py 文件源码 项目:ddmbot 作者: Budovi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _spawn_ffmpeg(self):
        """Start ffmpeg for the current stream or song and keep the process.

        The URL is taken from ``self._stream_url`` when streaming, or from
        the song context when playing.  It is shell-quoted, inserted into
        ``self._ffmpeg_command`` and the result is split into an argument
        list for ``subprocess.Popen``; the process is stored in
        ``self._ffmpeg``.

        Raises ``RuntimeError`` when the player is neither streaming nor
        playing, when the executable is missing, or when Popen fails.
        """
        if self.streaming:
            source = self._stream_url
        elif self.playing:
            source = self._song_context.song_url
        else:
            raise RuntimeError('Player is in an invalid state')

        command_line = self._ffmpeg_command.format(shlex.quote(source))
        argv = shlex.split(command_line)
        try:
            self._ffmpeg = subprocess.Popen(argv)
        except FileNotFoundError as exc:
            raise RuntimeError('ffmpeg executable was not found') from exc
        except subprocess.SubprocessError as exc:
            raise RuntimeError('Popen failed: {0.__name__} {1}'.format(type(exc), str(exc))) from exc

    #
    # Player FSM
    #
test_shlex.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testQuote(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
        unsafe = '"`$\\!' + unicode_sample

        self.assertEqual(shlex.quote(''), "''")
        self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
        self.assertEqual(shlex.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(shlex.quote('test%sname' % u),
                             "'test%sname'" % u)
        for u in unsafe:
            self.assertEqual(shlex.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py
dvmbased.py 文件源码 项目:qubes-incremental-backup-poc 作者: v6ak 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def restore_vm(self, new_vm, new_name, size, qvm_create_args, vm_keys, backup_storage_vm):
        """Create a VM and fill its private volume via the restore agent.

        The VM is created with ``qvm-create`` (``qvm_create_args`` is
        appended to the shell command verbatim), its netvm is set to
        ``none`` and, when ``size`` is given, its private volume is grown.
        The volume is then attached to a ``Dvm`` as ``xvdz``, checked and
        resized if needed, mounted at ``/mnt/clone``, and the restore agent
        is run with ``vm_keys.key`` as input.  Unmounting and detaching
        happen in ``finally`` blocks.
        """
        create_cmd = " ".join(["qvm-create", shlex.quote(new_name), qvm_create_args])
        subprocess.check_call(create_cmd, shell=True)
        subprocess.check_call(["qvm-prefs", "-s", new_name, "netvm", "none"]) # Safe approach…
        needs_resize = size is not None
        if needs_resize:
            subprocess.check_call(["qvm-grow-private", new_name, size])
        with Dvm() as dvm:
            dvm.attach("xvdz", new_vm.private_volume())
            try:
                if needs_resize:
                    for fs_cmd in ("sudo e2fsck -f -p /dev/xvdz", "sudo resize2fs /dev/xvdz"):
                        dvm.check_call(fs_cmd)
                for mount_cmd in ("sudo mkdir /mnt/clone", "sudo mount /dev/xvdz /mnt/clone"):
                    dvm.check_call(mount_cmd)
                try:
                    self.upload_agent(dvm)
                    with self.add_permissions(backup_storage_vm, dvm, vm_keys.encrypted_name):
                        agent_cmd = " ".join([
                            "/tmp/restore-agent",
                            shlex.quote(backup_storage_vm.get_name()),
                            shlex.quote(vm_keys.encrypted_name),
                        ])
                        dvm.check_call(agent_cmd, input = vm_keys.key, stdout = None, stderr = None)
                finally:
                    dvm.check_call("sudo umount /mnt/clone")
            finally:
                dvm.detach_all()

    # abstract def upload_agent(self, dvm)
main.py 文件源码 项目:borg-import 作者: borgbackup 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    """Command line entry point.

    Returns 1 when no ``borg`` executable is on the PATH.  Otherwise the
    arguments are parsed, logging is configured and the selected
    sub-command function is called; its result is returned.  Without a
    sub-command the help text is printed.  A failing external command is
    reported and its exit status is returned.
    """
    if not shutil.which('borg'):
        print("The 'borg' command can't be found in the PATH. Please correctly install borgbackup first.")
        print('See instructions at https://borgbackup.readthedocs.io/en/stable/installation.html')
        return 1

    parser = build_parser()
    args = parser.parse_args()
    logging.basicConfig(level=args.log_level, format='%(message)s')

    if 'function' not in args:
        return parser.print_help()

    try:
        return args.function(args)
    except subprocess.CalledProcessError as cpe:
        failure = '{} invocation failed with status {}'.format(cpe.cmd[0], cpe.returncode)
        print(failure)
        print('Command line was:', *map(shlex.quote, cpe.cmd))
        return cpe.returncode
ceph_version.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def get_package_version(prefix, connection, package_name):
    """Query the installed version of a package and echo the outcome.

    Runs ``dpkg-query`` for ``package_name`` through ``connection.run`` and
    prints one line, prefixed with ``prefix``: the version on success, or
    the exit status and stripped stderr on failure.

    This must be a coroutine: the body awaits ``connection.run``, which is a
    syntax error inside a plain ``def``.
    """
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)

    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
health.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
async def run(self):
        """Run ``ceph health`` on one monitor and process the result.

        The cluster name comes from the ``cluster`` global variable of the
        cluster data, falling back to ``self.cluster.name``, and is
        shell-quoted.  One server with the ``mons`` role is picked at random;
        nothing happens when there is none.  Errors are passed to
        ``manage_errors`` and the result to ``manage_health``.

        This must be a coroutine: the body awaits ``self.execute_cmd``, which
        is a syntax error inside a plain ``def``.
        """
        data = cluster_data.ClusterData.find_one(self.cluster.model_id)
        cluster_name = data.global_vars.get("cluster", self.cluster.name)
        cluster_name = shlex.quote(cluster_name)

        cluster_servers = {item._id: item for item in self.cluster.server_list}
        mons = [
            cluster_servers[item["server_id"]]
            for item in self.cluster.configuration.state
            if item["role"] == "mons"]
        if not mons:
            return

        version_result = await self.execute_cmd(
            "ceph --cluster {0} health --format json".format(cluster_name),
            random.choice(mons))

        self.manage_errors(
            "Cannot execute ceph health command on %s (%s): %s",
            "Not all hosts have working ceph command",
            version_result.errors
        )
        self.manage_health(version_result)
running_version.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
async def run(self):
        """Run ``ceph version`` on every cluster server and process the results.

        The cluster name comes from the ``cluster`` global variable of the
        cluster data, falling back to ``self.cluster.name``, and is
        shell-quoted.  Errors are passed to ``manage_errors``; the parsed
        successful results go to ``manage_versions`` and ``manage_commits``.

        This must be a coroutine: the body awaits ``self.execute_cmd``, which
        is a syntax error inside a plain ``def``.
        """
        data = cluster_data.ClusterData.find_one(self.cluster.model_id)
        cluster_name = data.global_vars.get("cluster", self.cluster.name)
        cluster_name = shlex.quote(cluster_name)

        version_result = await self.execute_cmd(
            "ceph --cluster {0} version".format(cluster_name),
            *self.cluster.server_list)

        self.manage_errors(
            "Cannot execute ceph version command on %s (%s): %s",
            "Not all hosts have working ceph command",
            version_result.errors
        )

        results = list(parse_results(version_result.ok))
        self.manage_versions(results)
        self.manage_commits(results)
dialog.py 文件源码 项目:SLAC 作者: bencbartlett 项目源码 文件源码 阅读 54 收藏 0 点赞 0 评论 0
def _write_command_to_file(self, env, arglist):
        """Append a shell-style rendering of a command to the debug log file.

        The rendering consists of the ``DIALOGRC`` assignment (when present
        in ``env``), one ``DIALOG_<name>`` assignment per entry of
        ``self._lowlevel_exit_code_varnames``, and the quoted arguments,
        followed by the ``repr`` of ``arglist``.  Every record except the
        first is preceded by a line of 79 dashes.  The file is flushed when
        ``self._debug_always_flush`` is set.
        """
        assignments = []
        if "DIALOGRC" in env:
            assignments.append(
                "DIALOGRC={0}".format(_shell_quote(env["DIALOGRC"])))
        for suffix in self._lowlevel_exit_code_varnames:
            varname = "DIALOG_" + suffix
            assignments.append(
                "{0}={1}".format(varname, _shell_quote(env[varname])))

        quoted_args = [_shell_quote(arg) for arg in arglist]
        command_str = ' '.join(assignments + quoted_args)

        if self._debug_first_output:
            separator = ""
        else:
            separator = ("-" * 79) + "\n"
        record = "{separator}{cmd}\n\nArgs: {args!r}\n".format(
            separator=separator, cmd=command_str, args=arglist)

        self._debug_logfile.write(record)
        if self._debug_always_flush:
            self._debug_logfile.flush()

        self._debug_first_output = False
eval.py 文件源码 项目:plash 作者: ihucos 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def action(action_name=None, keep_comments=False, escape=True):
    """Decorator factory that registers a function in ``state['actions']``.

    The function is registered under ``action_name``, or under its own name
    with underscores replaced by dashes.  The registered wrapper

    * drops positional arguments starting with ``#`` unless
      ``keep_comments`` is true,
    * shell-quotes the remaining positional arguments when ``escape`` is
      true,
    * joins a non-string, non-``None`` result with newlines, so that the
      wrapped function may yield its output line by line.
    """
    def decorator(func):
        registered_name = action_name or func.__name__.replace('_', '-')

        def function_wrapper(*args, **kw):
            call_args = args
            if not keep_comments:
                call_args = [arg for arg in call_args if not arg.startswith('#')]
            if escape:
                call_args = [shlex.quote(arg) for arg in call_args]

            outcome = func(*call_args, **kw)

            # strings and None pass through; any other iterable is joined
            if outcome is None or isinstance(outcome, str):
                return outcome
            return '\n'.join(outcome)

        state['actions'][registered_name] = function_wrapper
        return function_wrapper

    return decorator
solution.py 文件源码 项目:ctf-write-ups 作者: mvisat 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def brute(i):
  """Try every character of ``charset`` at position ``i`` of the flag.

  For each candidate the program is restarted under gdb with the flag on
  stdin.  The first candidate that stops at a program counter beyond
  ``last_breakpoint`` is kept and ``last_breakpoint`` is updated.  The
  current flag is printed at the end.
  """
  global flag
  global last_breakpoint
  for candidate in charset:
    flag[i] = candidate
    run_cmd = 'r < <(echo {})'.format(shlex.quote(''.join(flag)))
    output = gdb.execute(run_cmd, True, True)
    # skip floating point exception
    while "SIGFPE" in output:
      output = gdb.execute('c', True, True)

    pc_line = gdb.execute('x $pc', True, True)
    pc = int(pc_line.split(":")[0], 16)
    if pc > last_breakpoint:
      last_breakpoint = pc
      break
  print(''.join(flag))
test_shlex.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testQuote(self):
        safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
        unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s
        unsafe = '"`$\\!' + unicode_sample

        self.assertEqual(shlex.quote(''), "''")
        self.assertEqual(shlex.quote(safeunquoted), safeunquoted)
        self.assertEqual(shlex.quote('test file name'), "'test file name'")
        for u in unsafe:
            self.assertEqual(shlex.quote('test%sname' % u),
                             "'test%sname'" % u)
        for u in unsafe:
            self.assertEqual(shlex.quote("test%s'name'" % u),
                             "'test%s'\"'\"'name'\"'\"''" % u)

# Allow this test to be used with old shlex.py
executors.py 文件源码 项目:Slivka 作者: warownia1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def submit(self, values, cwd):
        """Submit the command to the queue via ``qsub`` and return the job id.

        ``qsub`` is started in ``cwd`` with ``self.env`` and receives a small
        script on stdin that writes a ``started`` marker file, runs the
        shell-quoted command built from ``self.bin`` and the options for
        ``values``, and then writes a ``finished`` marker file.

        Returns group 1 of ``self.job_submission_regex`` matched against the
        stdout of ``qsub``.  When the output does not match, ``match`` yields
        ``None`` and the ``.group`` call raises ``AttributeError``.
        """
        qsub_argv = [
            'qsub', '-cwd', '-e', 'stderr.txt', '-o', 'stdout.txt'
        ] + self.qargs
        process = subprocess.Popen(
            qsub_argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=cwd,
            env=self.env,
            universal_newlines=True
        )
        command = ' '.join(map(shlex.quote, self.bin + self.get_options(values)))
        script = "echo > started;\n%s;\necho > finished;" % command
        stdout, _stderr = process.communicate(script)
        return self.job_submission_regex.match(stdout).group(1)
lftpbackend.py 文件源码 项目:alicloud-duplicity 作者: aliyun 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _list(self):
        """List the remote directory with lftp and return the entry names.

        lftp changes into ``self.remote_path`` and lists it; if that fails
        the directory is created first.  The command, its stderr and its
        stdout are written to the debug log.

        Returns the last whitespace-separated field of every non-blank
        output line.
        """
        # Do a long listing to avoid connection reset
        quoted_path = cmd_quote(self.remote_path)
        # failing to cd into the folder might be because it was not created already
        commandline = "lftp -c \"source %s; ( cd %s && ls ) || ( mkdir -p %s && cd %s && ls )\"" % (
            cmd_quote(self.tempname),
            quoted_path, quoted_path, quoted_path
        )
        log.Debug("CMD: %s" % commandline)
        _, l, e = self.subprocess_popen(commandline)
        log.Debug("STDERR:\n"
                  "%s" % (e))
        log.Debug("STDOUT:\n"
                  "%s" % (l))

        # Look for our files as the last element of a long list line.
        # Lines holding only whitespace split into no fields at all, so they
        # are skipped instead of failing on the [-1] lookup.
        return [fields[-1] for fields in (x.split() for x in l.split('\n')) if fields]
rm_duplicates.py 文件源码 项目:tools 作者: freedict 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def rm_doubled_senses(entry):
    """Remove senses of ``entry`` that duplicate an earlier sense.

    Two senses count as duplicates when the stripped texts of their
    ``<quote>`` elements are identical, in the same order.  Of each
    duplicated pair the later sense is removed from ``entry``.

    Returns True if ``entry`` has been altered, False otherwise.
    """
    senses = list(findall(entry, 'sense'))
    if len(senses) == 1:
        # nothing to compare; return a boolean as documented, not None
        return False
    # obtain a mapping from XML node -> words within `<quote>…</quote>`
    senses = {sense: tuple(q.text.strip() for q in tei_iter(sense, 'quote')
            if q.text) for sense in senses}
    changed = False
    # pair each sense with another and compare their content
    for (_, quotes1), (node2, quotes2) in itertools.combinations(senses.items(), 2):
        # tuple equality covers both the length and the element-wise check
        if quotes1 == quotes2:
            try:
                entry.remove(node2)
                changed = True
            except ValueError: # already removed as a duplicate of another sense
                pass
    return changed


问题


面经


文章

微信
公众号

扫码关注公众号