python类quote()的实例源码

utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def rclone_move_command(local, remote, transfers, checkers, bwlimit, excludes, chunk_size, dry_run):
    """Build the ``rclone move`` command line as a single string.

    Args:
        local: Source path; passed through ``cmd_quote``.
        remote: Destination; passed through ``cmd_quote``.
        transfers: Integer value for ``--transfers``.
        checkers: Integer value for ``--checkers``.
        bwlimit: Value for ``--bwlimit``; the flag is omitted when this is
            empty or ``None``.
        excludes: Iterable of patterns, each emitted as its own
            ``--exclude`` flag. ``None`` is treated as "no excludes".
        chunk_size: Value for ``--drive-chunk-size``.
        dry_run: When truthy, ``--dry-run`` is appended.

    Returns:
        The assembled command string.
    """
    parts = [
        'rclone move %s %s' % (cmd_quote(local), cmd_quote(remote)),
        '--delete-after',
        '--no-traverse',
        '--stats=60s',
        '-v',
        '--transfers=%d' % transfers,
        '--checkers=%d' % checkers,
        '--drive-chunk-size=%s' % chunk_size,
    ]
    # Quote user-supplied values the same way as the paths above. Wrapping
    # them in literal double quotes breaks as soon as a value contains a
    # double quote, "$" or a backtick.
    if bwlimit:
        parts.append('--bwlimit=%s' % cmd_quote(bwlimit))
    parts.extend('--exclude=%s' % cmd_quote(item) for item in excludes or ())
    if dry_run:
        parts.append('--dry-run')
    return ' '.join(parts)
utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_empty_directories(config, force_dry_run=False):
    """Run ``find ... -type d -empty`` over each configured folder.

    Nothing is run while ``opened_files`` reports open files for
    ``config['local_folder']``; that case is only logged. Otherwise every
    existing folder listed in ``config['rclone_remove_empty_on_upload']``
    (mapping folder -> mindepth) is scanned. ``-delete`` is added to the
    ``find`` command only when neither ``config['dry_run']`` nor
    ``force_dry_run`` is set.
    """
    open_files = opened_files(config['local_folder'], config['lsof_excludes'])
    if len(open_files):
        logger.debug("Skipped removing empty directories because %d files are currently open: %r", len(open_files),
                     open_files)
        return

    cleared_any = False
    for folder, min_depth in config['rclone_remove_empty_on_upload'].items():
        if not os.path.exists(folder):
            continue
        cleared_any = True
        logger.debug("Removing empty directories from %r with mindepth %r", folder, min_depth)
        find_cmd = 'find %s -mindepth %d -type d -empty' % (cmd_quote(folder), min_depth)
        if not (config['dry_run'] or force_dry_run):
            find_cmd += ' -delete'
        run_command(find_cmd)

    if cleared_any:
        logger.debug("Finished clearing empty directories")


############################################################
# CONFIG STUFF
############################################################
test_activate.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_activate(monkeypatch):
    """activate() yields PROJECT_DIR and REDIS_URL exports for a redis service."""
    can_connect_args = _monkeypatch_can_connect_to_socket_to_succeed(monkeypatch)

    project_files = {
        DEFAULT_PROJECT_FILENAME: (
            "\n"
            "services:\n"
            "  REDIS_URL: redis\n"
            "    "
        ),
    }

    def activate_redis_url(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert can_connect_args['port'] == 6379
        assert result is not None
        if platform.system() == 'Windows':
            # PATH exports are dropped on Windows before comparing.
            result = list(filter(lambda line: not line.startswith("export PATH"), result))
            print("activate changed PATH on Windows and ideally it would not.")
        if len(result) > 2:
            # Dump extra context when more lines than expected come back.
            import os
            print("os.environ=" + repr(os.environ))
            print("result=" + repr(result))
        expected = ['export PROJECT_DIR=' + quote(dirname), 'export REDIS_URL=redis://localhost:6379']
        assert expected == result

    with_directory_contents_completing_project_file(project_files, activate_redis_url)
test_activate.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_activate_quoting(monkeypatch):
    """activate() shell-quotes a variable value containing ``$`` and ``!``."""
    project_files = {
        DEFAULT_PROJECT_FILENAME: (
            "\n"
            "variables:\n"
            "  FOO: {}\n"
            "    "
        ),
        DEFAULT_LOCAL_STATE_FILENAME: (
            "\n"
            "variables:\n"
            "  FOO: $! boo\n"
        ),
    }

    def activate_foo(dirname):
        project_dir_disable_dedicated_env(dirname)
        result = activate(dirname, UI_MODE_TEXT_ASSUME_YES_DEVELOPMENT, conda_environment=None, command_name=None)
        assert result is not None
        if platform.system() == 'Windows':
            # PATH exports are dropped on Windows before comparing.
            result = list(filter(lambda line: not line.startswith("export PATH"), result))
            print("activate changed PATH on Windows and ideally it would not.")
        expected = ["export FOO='$! boo'", 'export PROJECT_DIR=' + quote(dirname)]
        assert expected == result

    with_directory_contents_completing_project_file(project_files, activate_foo)
activate.py 文件源码 项目:kapsel 作者: conda 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def activate(dirname, ui_mode, conda_environment, command_name):
    """Prepare the project and build the ``export`` lines to be sourced.

    Only variables that are missing from ``os.environ`` or whose value
    differs from it are exported. Values are shell-quoted.

    Future direction: should also activate the proper conda env.

    Returns:
        None when preparation failed, otherwise a list of
        ``export KEY=value`` lines ordered by key.
    """
    project = load_project(dirname)
    result = prepare_with_ui_mode_printing_errors(
        project,
        ui_mode=ui_mode,
        env_spec_name=conda_environment,
        command_name=command_name,
    )
    if result.failed:
        return None

    prepared = result.environ
    # Sorted keys keep the output order deterministic for tests.
    return [
        "export {key}={value}".format(key=name, value=quote(prepared[name]))
        for name in sorted(prepared.keys())
        if name not in os.environ or os.environ[name] != prepared[name]
    ]
common.py 文件源码 项目:kodi-plugin.video.ted-talks-chinese 作者: daineseh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _debug_cmd(self, args, exe=None):
        """Print the command line described by *args* when verbose mode is on.

        Does nothing unless ``self.params['verbose']`` is truthy.

        Args:
            args: Command arguments; each one is passed through
                ``decodeArgument`` before being displayed.
            exe: Label for the message. Defaults to the basename of the
                first argument.
        """
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        # Prefer shlex.quote: the ``pipes`` module is gone in Python 3.13, so
        # relying on it alone silently degrades to repr() there. ``pipes`` is
        # kept as the fallback for interpreters without shlex.quote.
        try:
            from shlex import quote as quote_arg
        except ImportError:
            try:
                from pipes import quote as quote_arg
            except ImportError:
                quote_arg = None

        if quote_arg is None:
            cmd_line = repr(str_args)
        else:
            cmd_line = ' '.join(quote_arg(a) for a in str_args)
        self.to_screen('[debug] %s command line: %s' % (exe, cmd_line))
festival.py 文件源码 项目:jessy 作者: jessy-project 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def is_available(cls):
        """Report whether festival can be used.

        Requires the parent check to pass and both the ``text2wave`` and
        ``festival`` executables to be found. ``festival --pipe`` is then run
        with empty stdin, and the result is False when its combined output
        contains 'No default voice found'.
        """
        prerequisites_met = (super(cls, cls).is_available() and
                             diagnose.check_executable('text2wave') and
                             diagnose.check_executable('festival'))
        if not prerequisites_met:
            return False

        log = logging.getLogger(__name__)
        festival_cmd = ['festival', '--pipe']
        with tempfile.SpooledTemporaryFile() as captured:
            with tempfile.SpooledTemporaryFile() as empty_stdin:
                log.debug('Executing %s',
                          ' '.join(map(pipes.quote, festival_cmd)))
                subprocess.call(festival_cmd, stdin=empty_stdin,
                                stdout=captured, stderr=captured)
                captured.seek(0)
                output = captured.read().strip()
                if output:
                    log.debug("Output was: '%s'", output)
                return 'No default voice found' not in output
festival.py 文件源码 项目:jessy 作者: jessy-project 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def say(self, phrase, *args):
        """Feed *phrase* to ``text2wave`` on stdin and play the resulting WAV.

        The audio is written to a named temporary ``.wav`` file that is
        passed to ``self.play`` while it is still open. Anything the command
        writes to stderr is logged at debug level.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        text2wave_cmd = ['text2wave']
        printable_cmd = ' '.join(map(pipes.quote, text2wave_cmd))
        with tempfile.NamedTemporaryFile(suffix='.wav') as wav_file:
            with tempfile.SpooledTemporaryFile() as text_in:
                text_in.write(phrase)
                # Rewind so the child process reads the phrase from the start.
                text_in.seek(0)
                with tempfile.SpooledTemporaryFile() as stderr_buf:
                    self._logger.debug('Executing %s', printable_cmd)
                    subprocess.call(text2wave_cmd, stdin=text_in,
                                    stdout=wav_file, stderr=stderr_buf)
                    stderr_buf.seek(0)
                    captured = stderr_buf.read()
                    if captured:
                        self._logger.debug("Output was: '%s'", captured)
            self.play(wav_file.name)
flite.py 文件源码 项目:jessy 作者: jessy-project 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def say(self, phrase, *args):
        """Synthesize *phrase* with ``flite`` into a temporary WAV and play it.

        ``-voice`` is passed only when ``self.voice`` is set. The command's
        combined stdout/stderr is logged at debug level. The temporary WAV
        file is removed even when running the command or playback raises.

        Args:
            phrase: Text handed to ``flite -t``.
            *args: Accepted but not used here.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        cmd = ['flite']
        if self.voice:
            cmd.extend(['-voice', self.voice])
        cmd.extend(['-t', phrase])
        # delete=False: the file must outlive this ``with`` so flite can
        # write to it by name; it is removed explicitly below.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd.append(fname)
            with tempfile.SpooledTemporaryFile() as out_f:
                self._logger.debug('Executing %s',
                                   ' '.join([pipes.quote(arg)
                                             for arg in cmd]))
                subprocess.call(cmd, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read().strip()
            if output:
                self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            # Clean up on every path so a failed run leaves no file behind.
            os.remove(fname)
espeak.py 文件源码 项目:jessy 作者: jessy-project 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def say(self, phrase, *args):
        """Synthesize *phrase* with ``espeak`` into a temporary WAV and play it.

        Voice, pitch and speed come from ``self.voice``,
        ``self.pitch_adjustment`` and ``self.words_per_minute``. The command's
        combined stdout/stderr is logged at debug level. The temporary WAV
        file is removed even when running the command or playback raises.

        Args:
            phrase: Text to speak; passed as the final command argument.
            *args: Accepted but not used here.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        # delete=False: the file must outlive this ``with`` so espeak can
        # write to it by name; it is removed explicitly below.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd = ['espeak', '-v', self.voice,
                             '-p', self.pitch_adjustment,
                             '-s', self.words_per_minute,
                             '-w', fname,
                             phrase]
            # Settings may be numbers; the argument list needs strings.
            cmd = [str(x) for x in cmd]
            self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                         for arg in cmd]))
            with tempfile.TemporaryFile() as out_f:
                subprocess.call(cmd, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            # Clean up on every path so a failed run leaves no file behind.
            os.remove(fname)
pico.py 文件源码 项目:jessy 作者: jessy-project 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def say(self, phrase, *args):
        """Synthesize *phrase* with ``pico2wave`` into a temporary WAV and play it.

        The command's combined stdout/stderr is logged at debug level. The
        temporary WAV file is removed even when running the command or
        playback raises.

        Args:
            phrase: Text to speak; passed as the final command argument.
            *args: Accepted but not used here.

        Raises:
            ValueError: If ``self.language`` is not in ``self.languages``.
        """
        self._logger.debug("Saying '%s' with '%s'", phrase, self.SLUG)
        # Validate before creating the temp file so a rejected language
        # leaves nothing behind, and format the message here: exceptions do
        # not interpolate logging-style arguments.
        if self.language not in self.languages:
            raise ValueError("Language '%s' not supported by '%s'"
                             % (self.language, self.SLUG))
        # delete=False: the file must outlive this ``with`` so pico2wave can
        # write to it by name; it is removed explicitly below.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            fname = f.name
        try:
            cmd = ['pico2wave', '--wave', fname]
            cmd.extend(['-l', self.language])
            cmd.append(phrase)
            self._logger.debug('Executing %s', ' '.join([pipes.quote(arg)
                                                         for arg in cmd]))
            with tempfile.TemporaryFile() as out_f:
                subprocess.call(cmd, stdout=out_f, stderr=out_f)
                out_f.seek(0)
                output = out_f.read()
                if output:
                    self._logger.debug("Output was: '%s'", output)
            self.play(fname)
        finally:
            # Clean up on every path so a failed run leaves no file behind.
            os.remove(fname)
ImageShow.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_command(self, file, **options):
            """Return the shell command that opens *file* in Preview.app.

            The command runs in a backgrounded subshell and deletes the file
            after a 20 second pause. ``options`` is accepted but unused.
            """
            # on darwin "open" returns immediately, so the removal is delayed
            # to give the app time to load the temp file
            fields = {
                "opener": "open -a /Applications/Preview.app",
                "path": quote(file),
            }
            return "(%(opener)s %(path)s; sleep 20; rm -f %(path)s)&" % fields
ImageShow.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def show_file(self, file, **options):
            """Display *file* with the viewer command, then delete it.

            The viewer command comes from ``self.get_command_ex``. It and the
            ``rm -f`` run in a backgrounded subshell via ``os.system``.
            Always returns 1.
            """
            viewer_cmd, _executable = self.get_command_ex(file, **options)
            target = quote(file)
            os.system("(%s %s; rm -f %s)&" % (viewer_cmd, target, target))
            return 1

    # implementations
ImageShow.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_command_ex(self, file, title=None, **options):
            """Return ``(command, executable)`` for the ``xv`` viewer.

            When *title* is truthy it is shell-quoted and added to the
            command as ``-name``. *file* and *options* are not used.
            """
            # note: xv is pretty outdated.  most modern systems have
            # imagemagick's display command instead.
            executable = "xv"
            if not title:
                return executable, executable
            return "%s -name %s" % (executable, quote(title)), executable
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __uploadJenkins(self, step, buildIdFile, resultFile, suffix):
        """Return the shell snippet that uploads an artifact.

        Copying straight to the final location would not be atomic. The
        script instead copies the artifact to a temporary file at the
        repository root and hard-links that file to the final location. A
        failing link is usually a concurrent upload, so the script then only
        checks that the artifact is readable to tell that apart from other
        fatal errors.

        Returns an empty string when ``canUploadJenkins()`` is false. When
        ``_ignoreErrors()`` is true, a failing upload only echoes a message.
        """
        if not self.canUploadJenkins():
            return ""

        if self._ignoreErrors():
            fixup = " || echo Upload failed: $?"
        else:
            fixup = ""

        template = """\
            # upload artifact
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_UPLOAD_FILE="{DIR}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            if [[ ! -e ${{BOB_UPLOAD_FILE}} ]] ; then
                (
                    set -eE
                    T="$(mktemp -p {DIR})"
                    trap 'rm -f $T' EXIT
                    cp {RESULT} "$T"
                    mkdir -p "${{BOB_UPLOAD_FILE%/*}}"
                    if ! ln -T "$T" "$BOB_UPLOAD_FILE" ; then
                        [[ -r "$BOB_UPLOAD_FILE" ]] || exit 2
                    fi
                ){FIXUP}
            fi"""
        # Substitute first, dedent second, exactly as the one-expression form.
        script = template.format(
            DIR=self.__basePath,
            BUILDID=quote(buildIdFile),
            RESULT=quote(resultFile),
            FIXUP=fixup,
            GEN=ARCHIVE_GENERATION,
            SUFFIX=suffix,
        )
        return "\n" + textwrap.dedent(script)
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download(self, step, buildIdFile, tgzFile):
        """Return the shell snippet that copies an artifact out of the archive.

        The snippet does nothing when the target file already exists, and a
        failing ``cp`` only echoes a message. Returns an empty string when
        ``canDownloadJenkins()`` is false.
        """
        if not self.canDownloadJenkins():
            return ""

        template = """\
            if [[ ! -e {RESULT} ]] ; then
                BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
                BOB_DOWNLOAD_FILE="{DIR}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
                cp "$BOB_DOWNLOAD_FILE" {RESULT} || echo Download failed: $?
            fi
            """
        # Substitute first, dedent second, exactly as the one-expression form.
        script = template.format(
            DIR=self.__basePath,
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX,
        )
        return "\n" + textwrap.dedent(script)
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download(self, step, buildIdFile, tgzFile):
        """Return the shell snippet that fetches an artifact with ``curl``.

        The snippet does nothing when the target file already exists, and a
        failing ``curl`` only echoes a message. Returns an empty string when
        ``canDownloadJenkins()`` is false.
        """
        # only download if requested
        if not self.canDownloadJenkins():
            return ""

        template = """\
            if [[ ! -e {RESULT} ]] ; then
                BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
                BOB_DOWNLOAD_URL="{URL}/${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
                curl -sSg --fail -o {RESULT} "$BOB_DOWNLOAD_URL" || echo Download failed: $?
            fi
            """
        # Substitute first, dedent second, exactly as the one-expression form.
        script = template.format(
            URL=self.__url.geturl(),
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX,
        )
        return "\n" + textwrap.dedent(script)
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def download(self, step, buildIdFile, tgzFile):
        """Return the shell snippet that runs the configured download command.

        The snippet sets ``BOB_LOCAL_ARTIFACT`` and ``BOB_REMOTE_ARTIFACT``
        before the command, and does nothing when the target file already
        exists. Returns an empty string when ``canDownloadJenkins()`` is
        false.
        """
        # only download if requested
        if not self.canDownloadJenkins():
            return ""

        template = """
if [[ ! -e {RESULT} ]] ; then
    BOB_DOWNLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
    BOB_LOCAL_ARTIFACT={RESULT}
    BOB_REMOTE_ARTIFACT="${{BOB_DOWNLOAD_BID:0:2}}/${{BOB_DOWNLOAD_BID:2:2}}/${{BOB_DOWNLOAD_BID:4}}{SUFFIX}"
    {CMD}
fi
"""
        return template.format(
            CMD=self.__downloadCmd,
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX,
        )
input.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __getitem__(self, item):
            """Resolve an include expression to the text that replaces it.

            The first character of *item* is the mode and the rest is a glob
            pattern relative to ``self.baseDir``. All matching files are
            loaded in sorted order and concatenated, and the SHA-1 of the
            result is appended to ``self.incDigests``.

            Mode ``<`` adds prolog lines that decode the content from base64
            into a temporary file, and returns a reference to the variable
            holding that file's name. Mode ``'`` returns the content decoded
            as UTF-8 and shell-quoted.

            Raises ParseError when nothing matches or loading hits an OSError.
            """
            mode, item = item[0], item[1:]
            try:
                paths = sorted(glob(os.path.join(self.baseDir, item)))
                if not paths:
                    raise ParseError("No files matched in include pattern '{}'!"
                        .format(item))
                chunks = [self.fileLoader(path) for path in paths]
            except OSError as e:
                raise ParseError("Error including '"+item+"': " + str(e))
            content = b''.join(chunks)

            self.incDigests.append(asHexStr(hashlib.sha1(content).digest()))

            if mode != '<':
                assert mode == "'"
                return quote(content.decode('utf8'))

            var = "_{}{}".format(self.varBase, self.count)
            self.count += 1
            # Here-document header, then the payload, then its terminator.
            self.prolog.extend([
                "{VAR}=$(mktemp)".format(VAR=var),
                "_BOB_TMP_CLEANUP+=( ${VAR} )".format(VAR=var),
                "base64 -d > ${VAR} <<EOF".format(VAR=var)])
            encoded = b64encode(content).decode("ascii")
            self.prolog.extend(sliceString(encoded, 76))
            self.prolog.append("EOF")
            return "${" + var + "}"


问题


面经


文章

微信
公众号

扫码关注公众号