python类quote()的实例源码

tts.py 文件源码 项目:dingdang-robot 作者: wzpan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_speech(self, phrase):
        """Synthesize ``phrase`` through the peiyinge.com service.

        A signing request is POSTed first; the ``ts`` and ``sign`` values of
        its JSON reply are then placed in the query string of the synthesis
        URL together with ``self.vid`` and the URL-quoted phrase.

        Returns the path of a temporary ``.mp3`` file holding the response
        body. The file is created with ``delete=False``, so it is not
        removed automatically.
        """
        utf8_phrase = phrase.encode('utf8')
        sign_info = requests.post(
            'http://www.peiyinge.com/make/getSynthSign',
            data={'content': utf8_phrase}).json()
        quoted_phrase = urllib.quote(utf8_phrase)
        voice_url = ''.join([
            'http://proxy.peiyinge.com:17063/synth?ts=', sign_info['ts'],
            '&sign=', sign_info['sign'],
            '&vid=', self.vid,
            '&volume=&speed=0&content=', quoted_phrase,
        ])
        audio = requests.get(voice_url)
        with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as mp3:
            mp3.write(audio.content)
            return mp3.name
apk_operations.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _RunGdb(device, package_name, output_directory, target_cpu, extra_args,
            verbose):
  """Replaces the current process with the adb_gdb script.

  The script is expected next to this file. Its command line is assembled
  from the arguments, logged in shell-quoted form, and then handed to
  os.execv, so this function does not return on success.
  """
  script = os.path.dirname(__file__) + '/adb_gdb'
  argv = [
      script,
      '--package-name=%s' % package_name,
      '--output-directory=%s' % output_directory,
      '--adb=%s' % adb_wrapper.AdbWrapper.GetAdbPath(),
      '--device=%s' % device.serial,
      # The libs directory is keyed by the device serial, one per device.
      '--pull-libs-dir=/tmp/adb-gdb-libs-%s' % device.serial,
  ]
  # Forward this script's verbosity to adb_gdb.
  if verbose:
    argv += ['--verbose']
  if target_cpu:
    argv += ['--target-arch=%s' % _TargetCpuToTargetArch(target_cpu)]
  argv.extend(extra_args)
  printable = ' '.join(pipes.quote(arg) for arg in argv)
  logging.warning('Running: %s', printable)
  print(_Colorize('YELLOW', 'All subsequent output is from adb_gdb script.'))
  os.execv(script, argv)
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def upload(self, step, buildIdFile, tgzFile):
        """Return a shell snippet that uploads an artifact with curl.

        An empty string is returned when ``canUploadJenkins()`` is false.
        Otherwise the snippet derives the upload URL from the hex dump of
        ``buildIdFile``, checks with a HEAD request whether the artifact
        already exists, and only then uploads ``tgzFile``. Response codes
        2xx and 412 are not reported as failures; any other code prints a
        message and, unless ``_ignoreErrors()`` is true, exits with 1.

        ``step`` is not used here.
        """
        if not self.canUploadJenkins():
            # upload was not requested
            return ""

        template = """\
            # upload artifact
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {BUILDID}){GEN}"
            BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            if ! curl --output /dev/null --silent --head --fail "$BOB_UPLOAD_URL" ; then
                BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {RESULT} "$BOB_UPLOAD_URL" || true)
                if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                    echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
                fi
            fi"""
        fields = dict(
            URL=self.__url.geturl(),
            BUILDID=quote(buildIdFile),
            RESULT=quote(tgzFile),
            FAIL="" if self._ignoreErrors() else "; exit 1",
            GEN=ARCHIVE_GENERATION,
            SUFFIX=ARTIFACT_SUFFIX)
        # Substitute first, dedent afterwards.
        return "\n" + textwrap.dedent(template.format(**fields))
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def uploadJenkinsLiveBuildId(self, step, liveBuildId, buildId):
        """Return a shell snippet that uploads a live build-id with curl.

        An empty string is returned when ``canUploadJenkins()`` is false.
        Otherwise the snippet derives the upload URL from the hex dump of
        ``liveBuildId`` and uploads ``buildId`` to it. Response codes 2xx
        and 412 are not reported as failures; any other code prints a
        message and, unless ``_ignoreErrors()`` is true, exits with 1.

        ``step`` is not used here.
        """
        if not self.canUploadJenkins():
            # upload was not requested
            return ""

        template = """\
            # upload live build-id
            cd $WORKSPACE
            BOB_UPLOAD_BID="$(hexdump -ve '/1 "%02x"' {LIVEBUILDID}){GEN}"
            BOB_UPLOAD_URL="{URL}/${{BOB_UPLOAD_BID:0:2}}/${{BOB_UPLOAD_BID:2:2}}/${{BOB_UPLOAD_BID:4}}{SUFFIX}"
            BOB_UPLOAD_RSP=$(curl -sSgf -w '%{{http_code}}' -H 'If-None-Match: *' -T {BUILDID} "$BOB_UPLOAD_URL" || true)
            if [[ $BOB_UPLOAD_RSP != 2?? && $BOB_UPLOAD_RSP != 412 ]]; then
                echo "Upload failed with code $BOB_UPLOAD_RSP"{FAIL}
            fi
            """
        fields = dict(
            URL=self.__url.geturl(),
            LIVEBUILDID=quote(liveBuildId),
            BUILDID=quote(buildId),
            FAIL="" if self._ignoreErrors() else "; exit 1",
            GEN=ARCHIVE_GENERATION,
            SUFFIX=BUILDID_SUFFIX)
        # Substitute first, dedent afterwards.
        return "\n" + textwrap.dedent(template.format(**fields))
jenkins.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def jenkinsNamePersister(jenkins, wrapFmt, uuid):
    """Create a formatter that maps a step to its directory on Jenkins.

    The returned ``fmt(step, mode, props)`` accepts two modes:

    * ``'workspace'`` -- the name obtained from
      ``BobState().getJenkinsByNameDirectory``, with ``-<uuid>`` appended
      when ``uuid`` is truthy.
    * ``'exec'`` -- without a sandbox the shell-quoted workspace name below
      ``$PWD``; with a sandbox a path below ``/bob`` built from the step's
      variant id.

    Any other mode trips an assertion.
    """

    def persist(step, props):
        name = BobState().getJenkinsByNameDirectory(
            jenkins, wrapFmt(step, props), step.getVariantId())
        return (name + "-" + uuid) if uuid else name

    def fmt(step, mode, props):
        if mode == 'workspace':
            return persist(step, props)

        assert mode == 'exec'
        if step.getSandbox() is not None:
            return os.path.join("/bob", asHexStr(step.getVariantId()), "workspace")
        return os.path.join("$PWD", quote(persist(step, props)))

    return fmt
recipe-577162.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def do_osx_install(srcdir, targetdir):
    """Install into ``targetdir`` by running the ``install.sh`` found in ``srcdir``.

    An existing ``targetdir`` is removed first. The install script is
    located with ``find``, made executable and invoked as
    ``install.sh <srcdir> <targetdir>``. Afterwards every file named
    ``Chromium`` or ``Chromium Helper`` below ``targetdir`` is made
    executable.

    Both directories are shell-quoted before they are placed in a command
    line, so paths containing spaces or shell metacharacters are passed
    through literally (no shell expansion is applied to them).

    Failing commands do not raise; a non-zero exit status of the install or
    ``find`` commands is only printed.
    """
    if os.path.exists(targetdir):
        # The original message never interpolated the directory name.
        print('Target dir %s already exists! Removing...' % targetdir)
        shutil.rmtree(targetdir)

    quoted_src = pipes.quote(srcdir)
    quoted_target = pipes.quote(targetdir)

    install_script = os.popen('find ' + quoted_src + ' -iname install.sh').read().strip()
    print('DBG install_script: %s' % install_script)
    # Argument-list form: no shell quoting needed, and the call blocks until
    # chmod has finished.
    subprocess.call(['chmod', '+x', install_script])

    cmd_install = '%s %s %s' % (pipes.quote(install_script), quoted_src, quoted_target)
    print('DBG cmd: "%s"' % cmd_install)
    # Raw strings keep the backslashes that find/sh need without relying on
    # invalid escape sequences.
    cmd_chmod_chromium = r'find %s -name Chromium -exec chmod +x {} \;' % quoted_target
    cmd_chmod_chromium_helper = r'find %s -name Chromium\ Helper -exec chmod +x {} \;' % quoted_target

    for cmd in (cmd_install, cmd_chmod_chromium, cmd_chmod_chromium_helper):
        proc = subprocess.Popen(cmd, shell=True)
        proc.wait()
        if proc.returncode:
            print("returncode " + str(proc.returncode))
cfg_cross_gnu.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Copy environment variable ``name`` into ``conf.env`` and report it.

    Nothing happens when the variable is not set. Otherwise its value is
    split with ``Utils.to_list`` (an empty value is kept as ``['']``) and
    stored under ``wafname``, falling back to ``name``. The stored items are
    then reported through ``conf.msg`` in shell-quoted form; ``cross`` only
    changes the wording of that message.
    """
    raw = os.environ.get(name, None)
    if raw is None:
        return

    target = wafname or name
    # An empty string is stored as a single empty item rather than split.
    items = [raw] if raw == '' else Utils.to_list(raw)
    conf.env[target] = items

    label = ('cross-compilation %s' % target) if cross else target
    conf.msg('Will use %s' % label,
     " ".join(quote(item) for item in items))
cfg_cross_gnu.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def xcheck_envar(conf, name, wafname=None, cross=False):
    """Copy environment variable ``name`` into ``conf.env`` and report it.

    Nothing happens when the variable is not set. Otherwise its value is
    split with ``Utils.to_list`` (an empty value is kept as ``['']``) and
    stored under ``wafname``, falling back to ``name``. The stored items are
    then reported through ``conf.msg`` in shell-quoted form; ``cross`` only
    changes the wording of that message.
    """
    raw = os.environ.get(name, None)
    if raw is None:
        return

    target = wafname or name
    # An empty string is stored as a single empty item rather than split.
    items = [raw] if raw == '' else Utils.to_list(raw)
    conf.env[target] = items

    label = ('cross-compilation %s' % target) if cross else target
    conf.msg('Will use %s' % label,
     " ".join(quote(item) for item in items))
build_of.py 文件源码 项目:temporal-segment-networks 作者: yjxiong 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_optical_flow(vid_item, dev_id=0):
    """Run the ``extract_gpu`` tool on one video.

    ``vid_item`` is indexed as ``(video path, video id)``. Output goes to a
    directory named after the video's base name below the module-level
    ``out_path``; an ``OSError`` while creating it is ignored.

    The ``dev_id`` argument is overwritten: the device is derived from the
    current worker process' index modulo ``NUM_GPU``.

    The exit status of the tool is not inspected; the function prints a
    "done" line and always returns True.
    """
    vid_path, vid_id = vid_item[0], vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        pass

    worker_index = int(current_process()._identity[0]) - 1
    dev_id = worker_index % NUM_GPU
    image_path, flow_x_path, flow_y_path = [
        '{}/{}'.format(out_full_path, leaf) for leaf in ('img', 'flow_x', 'flow_y')]

    executable = os.path.join(df_path + 'build/extract_gpu')
    arguments = ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path),
        dev_id, out_format, new_size[0], new_size[1])

    os.system(executable + arguments)
    print('{} {} done'.format(vid_id, vid_name))
    sys.stdout.flush()
    return True
pipstrap.py 文件源码 项目:antenna 作者: mozilla-services 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def main():
    """Download the pinned packages and install them with pip.

    Every ``(url, digest)`` pair in ``PACKAGES`` is fetched into a fresh
    temporary directory through ``hashed_download`` and the results are
    installed with one ``pip install --no-index --no-deps -U`` call.

    Returns 0 on success. A ``HashError`` is printed and yields 1; in that
    case the temporary directory is left in place. Any other exception
    removes the directory and propagates.
    """
    temp = mkdtemp(prefix='pipstrap-')
    try:
        downloads = []
        for url, digest in PACKAGES:
            downloads.append(hashed_download(url, temp, digest))
        quoted_paths = ' '.join(quote(path) for path in downloads)
        check_output('pip install --no-index --no-deps -U ' + quoted_paths,
                     shell=True)
    except HashError as exc:
        print(exc)
        return 1
    except Exception:
        rmtree(temp)
        raise
    rmtree(temp)
    return 0
common.py 文件源码 项目:Qyoutube-dl 作者: lzambella 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _debug_cmd(self, args, exe=None):
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        try:
            import pipes
            shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
        except ImportError:
            shell_quote = repr
        self.to_screen('[debug] %s command line: %s' % (
            exe, shell_quote(str_args)))
build_of.py 文件源码 项目:Video-Classification-Action-Recognition 作者: qijiezhao 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run_optical_flow(vid_item, dev_id=0):
    """Run the ``extract_gpu`` tool on one video.

    ``vid_item`` is indexed as ``(video path, video id)``. Output goes to a
    directory named after the video's base name below the module-level
    ``out_path``; an ``OSError`` while creating it is ignored.

    The ``dev_id`` argument is overwritten: the device is derived from the
    current worker process' index modulo ``NUM_GPU``.

    The exit status of the tool is not inspected; the function prints a
    "done" line and always returns True.
    """
    vid_path, vid_id = vid_item[0], vid_item[1]
    vid_name = vid_path.split('/')[-1].split('.')[0]
    out_full_path = os.path.join(out_path, vid_name)
    try:
        os.mkdir(out_full_path)
    except OSError:
        pass

    worker_index = int(current_process()._identity[0]) - 1
    dev_id = worker_index % NUM_GPU
    image_path, flow_x_path, flow_y_path = [
        '{}/{}'.format(out_full_path, leaf) for leaf in ('img', 'flow_x', 'flow_y')]

    executable = os.path.join(df_path + 'build/extract_gpu')
    arguments = ' -f {} -x {} -y {} -i {} -b 20 -t 1 -d {} -s 1 -o {} -w {} -h {}'.format(
        quote(vid_path), quote(flow_x_path), quote(flow_y_path), quote(image_path),
        dev_id, out_format, new_size[0], new_size[1])

    os.system(executable + arguments)
    print('{} {} done'.format(vid_id, vid_name))
    sys.stdout.flush()
    return True
container.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _set_delayed(self):
        """Delayed change of iptables rules (postprocess method).

        After a pod is deleted or created the iptables rules have to be
        changed, but it is not known whether the operation has actually
        been performed. So a job is queued with ``at`` that runs the suid
        binary 'suidwrap' two minutes from now.

        The token is taken from ``self.token``; when that is missing, empty
        or the string 'None' it is queried from ``AUTH_TOKEN_PATH``.
        KeyError, TypeError and a failing shell command raised while
        scheduling are swallowed.
        """
        token = getattr(self, 'token', None)
        if not token or token == 'None':
            token = self.query.get(AUTH_TOKEN_PATH)['token']
        try:
            template = ('echo /usr/libexec/suidwrap "{0}" {1} '
                        '|at now + 2 minute > /dev/null 2>&1')
            command = template.format(token, quote(self.name))
            subprocess.check_call([command], shell=True)
        except (KeyError, TypeError, subprocess.CalledProcessError):
            return
common.py 文件源码 项目:optimalvibes 作者: littlemika 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _debug_cmd(self, args, exe=None):
        if not self.params.get('verbose', False):
            return

        str_args = [decodeArgument(a) for a in args]

        if exe is None:
            exe = os.path.basename(str_args[0])

        try:
            import pipes
            shell_quote = lambda args: ' '.join(map(pipes.quote, str_args))
        except ImportError:
            shell_quote = repr
        self.to_screen('[debug] %s command line: %s' % (
            exe, shell_quote(str_args)))
grep.py 文件源码 项目:sublimeplugins 作者: ktuan89 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def search(self, query):
        """Grep for ``query`` and hand the output lines to ``show_results``.

        A configured ``self.grep_command`` takes precedence and receives
        only the shell-quoted query. Otherwise the command comes from
        ``grepFormatStr()`` when results are shown in a view, or from
        ``quickGrepFormatStr()`` when they are not; both receive the grep
        path for the window and the shell-quoted query.
        """
        if self.grep_command is not None:
            command = self.grep_command.format(pipes.quote(query))
        else:
            # Without a view we need quick results.
            template = grepFormatStr if self.show_in_view else quickGrepFormatStr
            command = template().format(
                grepPath(self.window),
                pipes.quote(query)
            )

        sublime.status_message("grepping {0} ...".format(pipes.quote(query)))

        output, _ = run_bash_for_output(command)
        self.show_results(query, output.split('\n'))
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def files_in_archive(self, force_refresh=False):
        """Return the names of the files contained in the archive.

        The archive is listed with ``self.cmd_path --list`` and the result
        is cached in ``self._files_in_archive``. A non-empty cache is
        returned as is unless ``force_refresh`` is true.

        :arg force_refresh: list the archive again even if names are cached.
        :returns: list of file names, without entries found in
            ``self.excludes``.
        :raises UnarchiveError: when the listing command exits non-zero.
        """
        if self._files_in_archive and not force_refresh:
            return self._files_in_archive

        cmd = [self.cmd_path, '--list', '-C', self.dest]
        if self.zipflag:
            cmd.append(self.zipflag)
        if self.opts:
            cmd.extend(['--show-transformed-names'] + self.opts)
        if self.excludes:
            cmd.extend(['--exclude=' + quote(f) for f in self.excludes])
        cmd.extend(['-f', self.src])
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=dict(LANG='C', LC_ALL='C', LC_MESSAGES='C'))
        if rc != 0:
            raise UnarchiveError('Unable to list files in the archive')

        # Build a fresh list: appending to the cached one would duplicate
        # every name (and keep stale ones) on a forced refresh.
        files = []
        for filename in out.splitlines():
            # Compensate for locale-related problems in gtar output (octal
            # unicode representation) #11348. codecs.escape_decode is used
            # in place of the str-only decode('string_escape').
            filename = codecs.escape_decode(filename)[0]
            if filename and filename not in self.excludes:
                files.append(to_native(filename))
        self._files_in_archive = files
        return self._files_in_archive
unarchive.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unarchive(self):
        """Extract the archive into ``self.dest``.

        Builds the ``--extract`` command line from the instance settings
        (compression flag, extra options, owner/group, keep_newer and
        excludes), runs it through ``self.module.run_command`` and returns
        a dict with the keys ``cmd``, ``rc``, ``out`` and ``err``.
        """
        cmd = [self.cmd_path, '--extract', '-C', self.dest]
        if self.zipflag:
            cmd.append(self.zipflag)
        if self.opts:
            cmd += ['--show-transformed-names'] + self.opts
        for flag, key in (('--owner=', 'owner'), ('--group=', 'group')):
            if self.file_args[key]:
                cmd.append(flag + quote(self.file_args[key]))
        if self.module.params['keep_newer']:
            cmd.append('--keep-newer-files')
        if self.excludes:
            cmd += ['--exclude=' + quote(f) for f in self.excludes]
        cmd += ['-f', self.src]

        c_locale = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C')
        rc, out, err = self.module.run_command(cmd, cwd=self.dest, environ_update=c_locale)
        return dict(cmd=cmd, rc=rc, out=out, err=err)
cronvar.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _read_user_execute(self):
        """
        Returns the command line for reading a crontab

        Without ``self.user`` this is ``CRONCMD`` followed by ``-l``. With
        a user the form depends on ``platform.system()``: SunOS wraps the
        call in ``su``, AIX and HP-UX pass the user after ``-l``, and every
        other system uses ``-u <user>``. The user name is shell-quoted.
        """
        user_flag = ''
        if self.user:
            system = platform.system()
            if system == 'SunOS':
                return "su %s -c '%s -l'" % (pipes.quote(self.user), pipes.quote(CRONCMD))
            if system == 'AIX':
                return "%s -l %s" % (pipes.quote(CRONCMD), pipes.quote(self.user))
            if system == 'HP-UX':
                return "%s %s %s" % (CRONCMD , '-l', pipes.quote(self.user))
            user_flag = '-u %s' % pipes.quote(self.user)
        return "%s %s %s" % (CRONCMD , user_flag, '-l')
macports.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def query_package(module, port_path, name, state="present"):
    """ Returns whether a package is installed or not.

    With ``state="present"`` the output of ``port installed`` is grepped
    for ``name``; with ``state="active"`` the output of
    ``port installed <name>`` is grepped for ``active``. The result is True
    when the pipeline exits with 0, False otherwise. Any other ``state``
    yields None.
    """
    if state == "present":
        template = "%s installed | grep -q ^.*%s"
    elif state == "active":
        template = "%s installed %s | grep -q active"
    else:
        return None

    command = template % (pipes.quote(port_path), pipes.quote(name))
    rc, out, err = module.run_command(command, use_unsafe_shell=True)
    return rc == 0
__init__.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def build_module_command(self, env_string, shebang, cmd, arg_path=None, rm_tmp=None):
        """Assemble the shell command line that runs a module.

        The result is the stripped ``env_string``, the interpreter taken
        from ``shebang`` (without the leading ``#!``), the shell-quoted
        ``cmd`` and, if given, ``arg_path``, joined by single spaces. When
        ``rm_tmp`` is truthy an ``rm -rf`` of that path, with output
        redirected via ``self._SHELL_REDIRECT_ALLNULL``, is appended.
        """
        # don't quote the cmd if it's an empty string, because this will break pipelining mode
        if cmd.strip() != '':
            cmd = pipes.quote(cmd)

        interpreter = shebang.replace("#!", "").strip() if shebang else ""
        parts = [env_string.strip(), interpreter, cmd]
        if arg_path is not None:
            parts.append(arg_path)
        command = " ".join(parts)

        if not rm_tmp:
            return command
        return '%s; rm -rf "%s" %s' % (command, rm_tmp, self._SHELL_REDIRECT_ALLNULL)
libvirt_lxc.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to lxc

        The local file is opened in binary mode and fed to a ``dd``
        process started through ``_buffered_exec_command``.

        :arg in_path: path of the local source file
        :arg out_path: destination path; it is passed through
            ``_prefix_login_path`` and shell-quoted before use
        :raises AnsibleError: when the local file cannot be opened
            (IOError), when starting ``dd`` raises OSError, when the
            transfer raises, or when ``dd`` exits with a non-zero status
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.lxc)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    # This message used to name the chroot plugin.
                    raise AnsibleError("libvirt_lxc connection requires dd command in the container")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
libvirt_lxc.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from lxc to local

        A ``dd`` process started through ``_buffered_exec_command`` reads
        the remote file; its stdout is copied to ``out_path`` in chunks of
        ``BUFSIZE``.

        :arg in_path: source path; it is passed through
            ``_prefix_login_path`` and shell-quoted before use
        :arg out_path: path of the local destination file
        :raises AnsibleError: when starting ``dd`` raises OSError, when
            copying the data raises, or when ``dd`` exits with a non-zero
            status
        '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.lxc)

        in_path = pipes.quote(self._prefix_login_path(in_path))
        try:
            p = self._buffered_exec_command('dd if=%s bs=%s' % (in_path, BUFSIZE))
        except OSError:
            # This message used to name the chroot plugin.
            raise AnsibleError("libvirt_lxc connection requires dd command in the container")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as out_file:
            try:
                chunk = p.stdout.read(BUFSIZE)
                while chunk:
                    out_file.write(chunk)
                    chunk = p.stdout.read(BUFSIZE)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
chroot.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to chroot

        The local file is opened in binary mode and used as stdin of a
        ``dd`` process started through ``_buffered_exec_command``. Every
        failure is reported as an AnsibleError.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.chroot)

        # From here on out_path is the prefixed, shell-quoted destination.
        out_path = pipes.quote(self._prefix_login_path(out_path))
        dd_command = 'dd of=%s bs=%s' % (out_path, BUFSIZE)
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as source:
                try:
                    proc = self._buffered_exec_command(dd_command, stdin=source)
                except OSError:
                    raise AnsibleError("chroot connection requires dd command in the chroot")
                try:
                    stdout, stderr = proc.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if proc.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
jail.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to jail

        The local file is opened in binary mode and used as stdin of a
        ``dd`` process started through ``_buffered_exec_command``. Every
        failure is reported as an AnsibleError.
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.jail)

        # From here on out_path is the prefixed, shell-quoted destination.
        out_path = pipes.quote(self._prefix_login_path(out_path))
        dd_command = 'dd of=%s bs=%s' % (out_path, BUFSIZE)
        try:
            with open(to_bytes(in_path, errors='surrogate_or_strict'), 'rb') as source:
                try:
                    proc = self._buffered_exec_command(dd_command, stdin=source)
                except OSError:
                    raise AnsibleError("jail connection requires dd command in the jail")
                try:
                    stdout, stderr = proc.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if proc.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
jail.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def fetch_file(self, in_path, out_path):
        ''' fetch a file from jail to local

        A ``dd`` process started through ``_buffered_exec_command`` reads
        the source; its stdout is copied to the local file in chunks of
        ``BUFSIZE``. Every failure is reported as an AnsibleError.
        '''
        super(Connection, self).fetch_file(in_path, out_path)
        display.vvv("FETCH %s TO %s" % (in_path, out_path), host=self.jail)

        # From here on in_path is the prefixed, shell-quoted source.
        in_path = pipes.quote(self._prefix_login_path(in_path))
        dd_command = 'dd if=%s bs=%s' % (in_path, BUFSIZE)
        try:
            proc = self._buffered_exec_command(dd_command)
        except OSError:
            raise AnsibleError("jail connection requires dd command in the jail")

        with open(to_bytes(out_path, errors='surrogate_or_strict'), 'wb+') as sink:
            try:
                while True:
                    chunk = proc.stdout.read(BUFSIZE)
                    if not chunk:
                        break
                    sink.write(chunk)
            except:
                traceback.print_exc()
                raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
            stdout, stderr = proc.communicate()
            if proc.returncode != 0:
                raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
zone.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def put_file(self, in_path, out_path):
        ''' transfer a file from local to zone

        The local file is opened in binary mode and fed to a ``dd``
        process started through ``_buffered_exec_command``.

        :arg in_path: path of the local source file
        :arg out_path: destination path; it is passed through
            ``_prefix_login_path`` and shell-quoted before use
        :raises AnsibleError: when the local file cannot be opened
            (IOError), when starting ``dd`` raises OSError, when the
            transfer raises, or when ``dd`` exits with a non-zero status
        '''
        super(Connection, self).put_file(in_path, out_path)
        display.vvv("PUT %s TO %s" % (in_path, out_path), host=self.zone)

        out_path = pipes.quote(self._prefix_login_path(out_path))
        try:
            with open(in_path, 'rb') as in_file:
                try:
                    p = self._buffered_exec_command('dd of=%s bs=%s' % (out_path, BUFSIZE), stdin=in_file)
                except OSError:
                    # This message used to name the jail plugin.
                    raise AnsibleError("zone connection requires dd command in the zone")
                try:
                    stdout, stderr = p.communicate()
                except:
                    traceback.print_exc()
                    raise AnsibleError("failed to transfer file %s to %s" % (in_path, out_path))
                if p.returncode != 0:
                    raise AnsibleError("failed to transfer file %s to %s:\n%s\n%s" % (in_path, out_path, stdout, stderr))
        except IOError:
            raise AnsibleError("file or module does not exist at: %s" % in_path)
utils.py 文件源码 项目:build-calibre 作者: kovidgoyal 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run(*args, **kw):
    """Echo a command and run it in the build environment.

    The command is either a single string, which is split into words with
    ``split``, or the positional arguments themselves. Recognised keyword
    options: ``library_path`` (passed to ``current_env`` and ``run_shell``),
    ``cwd`` and ``no_check``.

    With ``no_check`` the exit status is returned. Without it, a non-zero
    status prints the failed command, opens a shell via ``run_shell`` and
    then raises ``SystemExit(1)``; on success nothing is returned. A
    missing program raises ``SystemExit`` with a message.
    """
    is_single_string = len(args) == 1 and isinstance(args[0], type(''))
    cmd = split(args[0]) if is_single_string else args
    print(' '.join(pipes.quote(x) for x in cmd))
    sys.stdout.flush()
    env = current_env(library_path=kw.get('library_path'))
    try:
        p = subprocess.Popen(cmd, env=env, cwd=kw.get('cwd'))
    except EnvironmentError as err:
        if err.errno != errno.ENOENT:
            raise
        raise SystemExit('Could not find the program: %s' % cmd[0])
    rc = p.wait()
    if kw.get('no_check'):
        return rc
    if rc == 0:
        return
    print('The following command failed, with return code:', rc)
    print(' '.join(pipes.quote(x) for x in cmd))
    print('Dropping you into a shell')
    sys.stdout.flush()
    run_shell(library_path=kw.get('library_path'))
    raise SystemExit(1)
git.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def gitpkgv_revision(self, ud, d, name):
        """
        Return a sortable revision number by counting commits in the history
        Based on gitpkgv.bbclass in meta-openembedded

        The commit count is cached in a file named ``oe-gitpkgv_<rev>``
        inside ``ud.localpath``. It is computed with ``git rev-list`` when
        that file is missing or empty, and read back from it otherwise.

        Returns a two-tuple. When a commit count is available it is
        ``(False, "<commits>+<first 7 characters of rev>")``; when
        ``ud.localpath`` does not exist or the count is empty it is
        ``(True, str(rev))``.
        """
        rev = self._build_revision(ud, d, name)
        localpath = ud.localpath
        rev_file = os.path.join(localpath, "oe-gitpkgv_" + rev)
        commits = None
        if os.path.exists(localpath):
            if not os.path.exists(rev_file) or not os.path.getsize(rev_file):
                from pipes import quote
                commits = bb.fetch2.runfetchcmd(
                        "git rev-list %s -- | wc -l" % (quote(rev)),
                        d, quiet=True).strip().lstrip('0')
                if commits:
                    # Close the cache file deterministically instead of
                    # leaving it to garbage collection.
                    with open(rev_file, "w") as cache:
                        cache.write("%d\n" % int(commits))
            else:
                with open(rev_file, "r") as cache:
                    commits = cache.readline(128).strip()
        if commits:
            return False, "%s+%s" % (commits, rev[:7])
        return True, str(rev)
pipstrap.py 文件源码 项目:certbot 作者: nikoloskii 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main():
    """Download the pinned packages and install them with pip.

    Every ``(url, digest)`` pair in ``PACKAGES`` is fetched into a fresh
    temporary directory through ``hashed_download`` and the results are
    installed with one ``pip install --no-index --no-deps -U`` call.

    Returns 0 on success. A ``HashError`` is printed and yields 1; in that
    case the temporary directory is left in place. Any other exception
    removes the directory and propagates.
    """
    temp = mkdtemp(prefix='pipstrap-')
    try:
        downloads = []
        for url, digest in PACKAGES:
            downloads.append(hashed_download(url, temp, digest))
        quoted_paths = ' '.join(quote(path) for path in downloads)
        check_output('pip install --no-index --no-deps -U ' + quoted_paths,
                     shell=True)
    except HashError as exc:
        print(exc)
        return 1
    except Exception:
        rmtree(temp)
        raise
    rmtree(temp)
    return 0
utils.py 文件源码 项目:unionfs_cleaner 作者: l3uddz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def opened_files(path, excludes):
    """Return the files below ``path`` that ``lsof`` reports as open.

    Lines of the ``lsof`` pipeline output are dropped when they are empty,
    at most two characters long, name a directory, consist only of digits,
    or are rejected by ``file_excluded(item, excludes)``.

    Returns the remaining names as a list, or None after logging when any
    exception occurs.
    """
    try:
        process = os.popen('lsof -wFn +D %s | tail -n +2 | cut -c2-' % cmd_quote(path))
        data = process.read()
        process.close()
        return [
            item for item in data.split('\n')
            if not (not item or len(item) <= 2 or os.path.isdir(item)
                    or item.isdigit() or file_excluded(item, excludes))
        ]
    except Exception:
        logger.exception("Exception checking %r: ", path)
        return None


问题


面经


文章

微信
公众号

扫码关注公众号