python类PIPE的实例源码

command.py 文件源码 项目:heerkat 作者: Roche 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def cmd(command):
    """Run ``command`` and return a populated ``Result``.

    The command string is tokenised with ``shlex.split`` and executed
    without a shell. All three standard streams are pipes; nothing is
    written to the child's stdin, and stdout/stderr are captured in full.

    :param command: command line as a single string.
    :return: ``Result`` with ``exit_code``, ``stdout``, ``stderr`` and
        ``command`` set.
    """
    result = Result()

    p = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    stdout, stderr = p.communicate()

    result.exit_code = p.returncode
    result.stdout = stdout
    result.stderr = stderr
    result.command = command

    if p.returncode != 0:
        # Single-argument, parenthesised form: prints the same text under
        # the Python 2 print statement and the Python 3 print() function.
        print('Error executing command [%s]' % command)
        print('stderr: [%s]' % stderr)
        print('stdout: [%s]' % stdout)

    return result
common.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_command(args, wait=False):
    """Run ``args`` as a child process and return what it wrote to stdout.

    Both modes block until the child exits, because ``communicate()`` is
    always called; ``wait`` only selects whether stdout is captured.

    :param args: program and arguments, passed straight to
        ``subprocess.Popen``.
    :param wait: when true, stdout is piped and returned; otherwise the
        child inherits the parent's streams and ``None`` is returned.
    :return: the captured stdout data, or ``None`` when nothing was
        captured.
    """
    # Bound up front so the final return is valid even if the handler
    # below runs.
    result = None

    try:
        if wait:
            p = subprocess.Popen(
                args,
                stdout=subprocess.PIPE)
        else:
            p = subprocess.Popen(
                args,
                stdin=None, stdout=None, stderr=None, close_fds=True)

        # communicate() drains the pipe while waiting for exit. Calling
        # wait() first on a piped stdout can block forever once the child
        # fills the OS pipe buffer.
        result, _ = p.communicate()

    except subprocess.CalledProcessError as e:
        # Popen/communicate do not raise this themselves; the handler is
        # retained from the original code.
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))

    return result
compiler.py 文件源码 项目:pygrunt 作者: elementbound 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def preprocess_source(self, in_file, additional_args=None):
        """Run the command held in ``self._args`` and return its stdout.

        ``self._args`` is extended in place with the result of
        ``self._build_compiler_flags()`` and then with ``additional_args``,
        so arguments accumulate across calls on the same object.

        :param in_file: not used by this method.
        :param additional_args: optional iterable of extra arguments;
            ``None`` (the default) adds nothing.
        :return: the command's stdout as text when it exits with status 0,
            otherwise ``''``. On failure, any stderr text is reported via
            ``Style.error`` and printed.
        """
        import subprocess

        self._args.extend(self._build_compiler_flags())
        # None replaces the former mutable ``[]`` default.
        if additional_args is not None:
            self._args.extend(additional_args)

        result = subprocess.run(self._args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

        if result.returncode == 0:
            return result.stdout

        if result.stderr:
            Style.error('Preprocess failed: ')
            print(result.stderr)

        return ''
os.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def popen(cmd, mode="r", buffering=-1):
    """Start ``cmd`` through the shell and return a text stream piped to it.

    With ``mode="r"`` the stream wraps the child's stdout; with
    ``mode="w"`` it wraps the child's stdin. The stream is handed to
    ``_wrap_close`` together with the process object.

    Raises ``TypeError`` when ``cmd`` is not a ``str`` and ``ValueError``
    for any other mode or for a ``buffering`` of ``0`` or ``None``.
    """
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    reading = mode == "r"
    # Exactly one end of the child is piped, chosen by the mode.
    piped_end = {("stdout" if reading else "stdin"): subprocess.PIPE}
    proc = subprocess.Popen(cmd, shell=True, bufsize=buffering, **piped_end)
    raw_stream = proc.stdout if reading else proc.stdin
    return _wrap_close(io.TextIOWrapper(raw_stream), proc)

# Helper for popen() -- a proxy for a file whose close waits for the process
facenet.py 文件源码 项目:facerecognition 作者: guoxiaolu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _git_output(cmd, cwd):
    """Return the stripped stdout of ``cmd`` run in ``cwd``, as text."""
    try:
        gitproc = Popen(cmd, stdout=PIPE, cwd=cwd)
        stdout, _ = gitproc.communicate()
    except OSError as e:
        # The command could not be started (e.g. git is not installed or
        # cwd is unusable): record the reason rather than aborting.
        return ' '.join(cmd) + ': ' + str(e.strerror)
    if not isinstance(stdout, str):
        # Python 3 hands back bytes; formatting them with %s would write
        # the b'...' repr into the file.
        stdout = stdout.decode('utf-8', 'replace')
    return stdout.strip()


def store_revision_info(src_path, output_dir, arg_string):
    """Write ``revision_info.txt`` describing the code revision in use.

    The file is created in ``output_dir`` and contains ``arg_string``, the
    output of ``git rev-parse HEAD`` and the output of ``git diff HEAD``,
    both run with ``src_path`` as the working directory.

    :param src_path: directory in which the git commands are run.
    :param output_dir: directory that receives ``revision_info.txt``.
    :param arg_string: text recorded on the ``arguments:`` line.
    """
    # Get git hash
    git_hash = _git_output(['git', 'rev-parse', 'HEAD'], src_path)

    # Get local changes
    git_diff = _git_output(['git', 'diff', 'HEAD'], src_path)

    # Store a text file in the log directory
    rev_info_filename = os.path.join(output_dir, 'revision_info.txt')
    with open(rev_info_filename, "w") as text_file:
        text_file.write('arguments: %s\n--------------------\n' % arg_string)
        text_file.write('git hash: %s\n--------------------\n' % git_hash)
        text_file.write('%s' % git_diff)
common.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 53 收藏 0 点赞 0 评论 0
def run_command(command, wait=False):
    """Run ``command`` through the shell and return what it wrote to stdout.

    Both modes block until the child exits, because ``communicate()`` is
    always called; ``wait`` only selects whether stdout is captured.

    :param command: shell command line as a single string.
    :param wait: when true, stdout is piped and returned; otherwise the
        child inherits the parent's streams and ``None`` is returned.
    :return: the captured stdout data, or ``None`` when nothing was
        captured.
    """
    # Bound up front so the final return is valid even if the handler
    # below runs.
    result = None

    try:
        if wait:
            p = subprocess.Popen(
                [command],
                stdout=subprocess.PIPE,
                shell=True)
        else:
            p = subprocess.Popen(
                [command],
                shell=True,
                stdin=None, stdout=None, stderr=None, close_fds=True)

        # communicate() drains the pipe while waiting for exit. Calling
        # wait() first on a piped stdout can block forever once the child
        # fills the OS pipe buffer.
        result, _ = p.communicate()

    except subprocess.CalledProcessError as e:
        # Popen/communicate do not raise this themselves; the handler is
        # retained from the original code.
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))

    return result
qqmathbook.py 文件源码 项目:qqmbr 作者: ischurov 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def mathjax(s):
    """Pipe ``s`` through the program configured as ``app.config['mjpage']``.

    The input is first written to ``temp.log`` in the current directory.
    It is then sent UTF-8 encoded to the child's stdin, and the child's
    stdout is parsed as HTML.

    :param s: text to process.
    :return: ``(style, body)``, where ``style`` is the string form of the
        document's ``<style>`` element and ``body`` is the concatenated
        string form of the children of ``<body>``.
    """
    # Keep a copy of the most recent input on disk.
    with open("temp.log", "w") as log_file:
        log_file.write(s)

    font_url = ("https://cdnjs.cloudflare.com/ajax/libs/"
                "mathjax/2.7.0/fonts/HTML-CSS")
    argv = [
        app.config['mjpage'],
        '--dollars',
        '--output', "CommonHTML",
        '--fontURL', font_url,
    ]
    renderer = Popen(argv, stdout=PIPE, stdin=PIPE, stderr=PIPE)

    raw_out, raw_err = renderer.communicate(input=s.encode('utf-8'))
    out = raw_out.decode('utf-8')
    err = raw_err.decode('utf-8')  # decoded but not used further

    soup = BeautifulSoup(out, 'html.parser')
    style = str(soup.style)
    body = "".join(map(str, soup.body.children))

    return style, body
pbimport.py 文件源码 项目:protofuzz 作者: trailofbits 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _compile_proto(full_path, dest):
    """Compile one protobuf file into Python code under ``dest``.

    The compiler located by ``find_protoc()`` is run with the file's own
    directory as ``--proto_path`` and is given five seconds to finish.

    :return: ``True`` on success, ``False`` if the compiler timed out
        (it is killed first).
    :raises BadProtobuf: if the compiler exits with a non-zero status;
        the message carries its stderr and stdout.
    """
    source_dir = os.path.dirname(full_path)
    command = [
        find_protoc(),
        '--python_out={}'.format(dest),
        '--proto_path={}'.format(source_dir),
        full_path,
    ]
    compiler = subprocess.Popen(command,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    try:
        stdout_data, stderr_data = compiler.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        compiler.kill()
        # Collect whatever output is left so the killed child is reaped.
        compiler.communicate()
        return False

    if compiler.returncode == 0:
        return True

    raise BadProtobuf(
        'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
            full_path,
            stderr_data.decode('utf-8'),
            stdout_data.decode('utf-8')))
vrmilter.py 文件源码 项目:sipxecs-voicemail-transcription 作者: andrewsauder 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def exec_cmd(cmd, **kwds):
    """
    Run ``cmd`` as a sub-process and collect its result.

    Only the ``stdin`` keyword is read from ``kwds``: when it is not
    ``None`` its value is fed to the child's standard input through a pipe.

    Returns a ``(returncode, stdout, stderr)`` tuple.
    """
    payload = kwds.get('stdin', None)
    # A pipe is only attached to stdin when there is something to send.
    stdin_target = subprocess.PIPE if payload is not None else None
    child = subprocess.Popen(
        cmd,
        stdin=stdin_target,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    captured_out, captured_err = child.communicate(payload)
    return (child.returncode, captured_out, captured_err)


#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
nettools.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def del_addr(linkname, address):
        """Delete ``address`` from device ``linkname`` with ``ip address del``.

        Returns ``[True, <linkname as str>]`` when the command succeeds, or
        ``[False, <message>]`` when it exits with a non-zero status; the
        message includes the command's combined stdout/stderr output.
        """
        command = ['ip', 'address', 'del', address, 'dev', str(linkname)]
        try:
            subprocess.run(command,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           shell=False,
                           check=True)
        except subprocess.CalledProcessError as suberror:
            failure_output = suberror.stdout.decode('utf-8')
            return [False, "delete address failed : %s" % failure_output]
        return [True, str(linkname)]


# ovs-vsctl list-br
# ovs-vsctl br-exists <Bridge>
# ovs-vsctl add-br <Bridge>
# ovs-vsctl del-br <Bridge>
# ovs-vsctl list-ports <Bridge>
# ovs-vsctl del-port <Bridge> <Port>
# ovs-vsctl add-port <Bridge> <Port> -- set interface <Port> type=gre options:remote_ip=<RemoteIP>
# ovs-vsctl add-port <Bridge> <Port> tag=<ID> -- set interface <Port> type=internal
# ovs-vsctl port-to-br <Port>
# ovs-vsctl set Port <Port> tag=<ID>
# ovs-vsctl clear Port <Port> tag
utils.py 文件源码 项目:epubcheck 作者: titusz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def epubcheck_help():
    """Return the text printed by running the epubcheck jar with ``-h``.

    The jar (``c.EPUBCHECK``) is launched with ``c.JAVA`` and
    ``-Duser.language=en``; stderr is discarded.

    :return unicode: decoded stdout of the command
    """
    java_command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']

    with open(os.devnull, "w") as discard:
        java = subprocess.Popen(
            java_command,
            stdout=subprocess.PIPE,
            stderr=discard,
        )
        help_bytes, _ = java.communicate()

    return help_bytes.decode()
install_venv_common.py 文件源码 项目:networking-huawei 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def run_command_with_code(self, cmd, redirect_output=True,
                              check_exit_code=True):
        """Run ``cmd`` with ``self.root`` as its working directory.

        When ``redirect_output`` is true the command's stdout is captured;
        otherwise it is inherited and the captured value is ``None``.
        If ``check_exit_code`` is true and the command exits with a
        non-zero status, ``self.die`` is called with a failure message.

        Returns an ``(output, returncode)`` tuple.
        """
        stdout_target = subprocess.PIPE if redirect_output else None

        child = subprocess.Popen(cmd, cwd=self.root, stdout=stdout_target)
        captured, _ = child.communicate()
        exit_code = child.returncode

        if check_exit_code and exit_code != 0:
            self.die('Command "%s" failed.\n%s', ' '.join(cmd), captured)
        return (captured, exit_code)
windows.py 文件源码 项目:NeoVintageous 作者: NeoVintageous 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def filter_region(view, txt, command):
    """Pipe ``txt`` through ``command`` via a temporary batch script.

    ``txt`` is written UTF-8 encoded to a temporary ``.txt`` file, and a
    temporary ``.bat`` script is generated that ``type``s that file into
    ``command``. The script's stdout (or its stderr when stdout is empty)
    is decoded with the code page returned by ``get_oem_cp()``, ``\\r\\n``
    is normalised to ``\\n``, the last character is dropped and the result
    is stripped.

    :param view: not used by this function.
    :param txt: text to feed to the command.
    :param command: command line placed after the pipe in the script.
    :return: the filtered text.
    """
    # Only paths of files that were actually created are recorded, so the
    # cleanup below never touches an unbound name if an early step fails.
    temp_paths = []
    try:
        with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as contents:
            temp_paths.append(contents.name)
            contents.write(txt.encode('utf-8'))

        with tempfile.NamedTemporaryFile(suffix='.bat', delete=False) as script:
            temp_paths.append(script.name)
            script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))

        p = subprocess.Popen([script.name],
                             stdout=PIPE,
                             stderr=PIPE,
                             startupinfo=get_startup_info())

        out, err = p.communicate()
        return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
    finally:
        # Script first, then contents, as before.
        for path in reversed(temp_paths):
            os.remove(path)
test_13_TestSB.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_loadSADFile_startorder(self):
        """Load the ticket_462_w waveform and compare its components' PIDs.

        The system's maximum PID is read from ``/proc/sys/kernel/pid_max``
        (defaulting to 32768). ``ticket_462_ac_1`` is expected to have the
        lower PID, unless its PID is above ``maxpid - 1``, in which case
        the comparison is reversed.
        """
        maxpid=32768
        try:
            out=subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'], stdout=subprocess.PIPE)
            res=out.communicate()
            maxpid=int(res[0].strip())
        except (OSError, ValueError):
            # 'cat' could not be run or produced no number: keep the default.
            pass
        retval = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
        self.assertEqual(retval, True)
        comp_ac = sb.getComponent('ticket_462_ac_1')
        self.assertNotEqual(comp_ac, None)
        comp = sb.getComponent('ticket_462_1')
        self.assertNotEqual(comp, None)
        # NOTE(review): the else branch presumably covers PID wrap-around - confirm.
        if comp_ac._pid <= maxpid-1:
            isless= comp_ac._pid < comp._pid
        else:
            isless=comp._pid < comp_ac._pid
        self.assertTrue(isless)
test_01_nodeBooter.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_UserOrGroupNoDaemon(self):
        """Check nodeBooter's message when --user/--group are given without
        the daemon option.

        nodeBooter is started with ``--user`` and ``--group`` together, with
        ``--group`` alone and with ``--user`` alone; each run must report
        'If either group or user are specified, daemon must be set' on
        stderr.
        """
        domainName = scatest.getTestDomainName()
        # Test that we don't already have a bound domain
        try:
            domMgr = self._root.resolve(scatest.getDomainMgrURI())
            self.assertEqual(domMgr, None)
        except CosNaming.NamingContext.NotFound:
            pass # This exception is expected

        expected = 'If either group or user are specified, daemon must be set'
        base_args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist"]
        option_sets = (
            ['--user','domuser','--group','somegroup'],
            ['--group','somegroup'],
            ['--user','domuser'],
        )
        for options in option_sets:
            nb = Popen(base_args + options, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
            # communicate() drains both pipes and reaps the child; reading
            # only stderr could block once the stdout pipe fills up.
            _, stderr_data = nb.communicate()
            self.assertNotEqual(stderr_data.find(expected),-1)
test_01_nodeBooter.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_BadUserOrBadGroup(self):
        """Check nodeBooter's message for ``--user=``/``--group=`` syntax.

        nodeBooter is started once with ``--user=domuser`` and once with
        ``--group=somegroup``; each run must report
        'Separator must be a space' on stderr.
        """
        domainName = scatest.getTestDomainName()
        # Test that we don't already have a bound domain
        try:
            domMgr = self._root.resolve(scatest.getDomainMgrURI())
            self.assertEqual(domMgr, None)
        except CosNaming.NamingContext.NotFound:
            pass # This exception is expected

        expected = 'Separator must be a space'
        base_args = ["../../control/framework/nodeBooter","-D","-debug", "9","--nopersist"]
        for option in ('--user=domuser', '--group=somegroup'):
            nb = Popen(base_args + [option], cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
            # communicate() drains both pipes and reaps the child; reading
            # only stderr could block once the stdout pipe fills up.
            _, stderr_data = nb.communicate()
            self.assertNotEqual(stderr_data.find(expected),-1)
render.py 文件源码 项目:eClaire 作者: kogan 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def print_card(pdf, printer_name):
    """
    Send the PDF to the printer by piping it into ``lpr``.

    :param pdf: data written to ``lpr``'s standard input
    :param printer_name: value passed to ``lpr -P``
    :raises PrintingError: if ``lpr`` exits with a non-zero status
    """
    lpr_command = ['lpr', '-P', printer_name]
    spooler = subprocess.Popen(lpr_command, stdin=subprocess.PIPE)
    spooler.communicate(pdf)

    exit_status = spooler.returncode
    if exit_status == 0:
        return
    raise PrintingError('Return code {}'.format(exit_status))
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """ Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Currently disabled: the function raises ``ValueError`` immediately, so
    the streaming code below it never runs.
    """
    raise ValueError('Unsupported method at the moment')

    # --- unreachable while the guard above is in place ---
    sink = open(os.devnull, 'w')

    convert_cmd = [
        LOUVAIN_CONVERT_BINPATH,
        '-i', '/dev/stdin',
        '-o', bin_filename,
        '-w', weight_filename,
    ]
    proc = subprocess.Popen(convert_cmd, stdin=subprocess.PIPE, stdout=sink, stderr=sink)

    # Stream text triplets to 'convert'
    triplets = itertools.izip(matrix.row, matrix.col, matrix.data)
    for triplet in triplets:
        proc.stdin.write('%d\t%d\t%f\n' % triplet)

    proc.stdin.close()
    proc.wait()
    sink.close()
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """ Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Each ``(row, col)`` pair of ``matrix`` is written to the utility's
    stdin as a tab-separated line; the utility writes its result to
    ``bin_filename`` and its own stdout/stderr are discarded. The call
    returns once the utility has exited; its exit status is not checked.
    """
    # The context manager closes the devnull handle even when starting the
    # process or writing to it raises.
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                               '-i', '/dev/stdin',
                               '-o', bin_filename,
                             ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)

        # Stream text pairs to 'convert'
        for ij in itertools.izip(matrix.row, matrix.col):
            proc.stdin.write('%d\t%d\n' % ij)

        proc.stdin.close()
        proc.wait()
__init__.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def read_fastq(self):
        """Yield a ``FastqRow`` for every four-line record in ``self.file``.

        A file whose name ends in ``gz`` is read through
        ``gunzip --stdout``; anything else is opened directly. Each record
        is read as header, sequence, one skipped line and quality. When
        ``self.rc`` is true the sequence is passed through
        ``tk_seq.get_rev_comp`` and the quality string is reversed.

        Iteration ends at end of input; an incomplete trailing record is
        dropped without error.
        """
        proc = None
        if self.file[-2:] == "gz":
            proc = subprocess.Popen(["gunzip", "--stdout", self.file], stdout=subprocess.PIPE)
            reader = proc.stdout
        else:
            reader = open(self.file, "r")

        try:
            while True:
                try:
                    header = next(reader).strip()
                    seq = next(reader).strip()
                    next(reader) # incr line
                    qual = next(reader).strip()
                except StopIteration:
                    # End of input. Returning explicitly keeps StopIteration
                    # from escaping the generator (an error under PEP 479).
                    return

                if self.rc:
                    seq = tk_seq.get_rev_comp(seq)
                    qual = qual[::-1]

                yield FastqRow(header, seq, qual)
        finally:
            # Runs on exhaustion and when the consumer closes the generator.
            reader.close()
            if proc is not None:
                proc.wait()
sync.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def add_to_vcs(self, summary):
        """Stage created and removed files in git after confirmation.

        ``git add -A`` is run on ``app_settings.SYNC_DIRECTORY`` only when
        ``self.git_add`` is set, ``summary`` contains
        ``SyncStatus.DELETED`` or ``SyncStatus.ADDED``, ``self.dry_run`` is
        false and ``self.confirm`` accepts. Otherwise nothing happens.

        Raises the exception built by ``self.error`` when git exits with a
        non-zero status.
        """
        if not self.git_add:
            return
        if not (SyncStatus.DELETED in summary or SyncStatus.ADDED in summary):
            return
        if self.dry_run:
            return
        if not self.confirm(
            question=(
                'Do you want to add created and removed files to GIT?'
            )
        ):
            return

        process = subprocess.Popen(
            ['git', '-C', app_settings.SYNC_DIRECTORY,
             'add', '-A', app_settings.SYNC_DIRECTORY],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        process.communicate()
        # The exit status is the failure signal; git can write warnings to
        # stderr on a successful run.
        if process.returncode != 0:
            raise self.error('Adding file changes to GIT failed!')
conv2mp4-server.py 文件源码 项目:conv2mp4-py 作者: Kameecoding 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_audio_streams(self):
        """Probe ``self.input_video`` with FFPROBE and extract audio streams.

        FFPROBE is run with ``-of json -show_streams``; its stderr is
        discarded and its stdout is parsed as JSON. The parsed data is
        passed to ``get_audio_streams``.
        NOTE(review): that inner call presumably resolves to a module-level
        function of the same name - confirm.

        If the FFPROBE executable cannot be found, an error is logged and
        the program exits. Any other ``OSError`` from starting it is
        re-raised.
        """
        import errno

        with open(os.devnull, 'w') as DEV_NULL:
            #Get file info and Parse it
            try:
                proc = subprocess.Popen([
                    FFPROBE,
                    '-i', self.input_video,
                    '-of', 'json',
                    '-show_streams'
                ], stdout=subprocess.PIPE, stderr=DEV_NULL)
            except OSError as e:
                # errno is used directly; os.errno is not a public API.
                if e.errno == errno.ENOENT:
                    Logger.error("FFPROBE not found, install on your system to use this script")
                    sys.exit(0)
                # Other launch failures used to fall through to an unbound
                # 'proc'; surface the real error instead.
                raise
            # communicate() reads all output and reaps the child.
            output, _ = proc.communicate()

            return get_audio_streams(json.loads(output))
build_docs.py 文件源码 项目:wiki-to-doc 作者: serra 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def build_mkdocs(self):
        """
        Run ``mkdocs build -q`` from within ``MKDOCS_DIR``.

        The process working directory is changed to ``MKDOCS_DIR`` and is
        not restored. ``Error`` is raised if MkDocs writes anything to
        stderr.
        """
        # MkDocs is invoked from inside its project directory
        os.chdir(MKDOCS_DIR)

        builder = subprocess.Popen(
            ["mkdocs", "build", "-q"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        _, stderr_text = builder.communicate()

        if not stderr_text:
            return
        raise Error("Could not build MkDocs !\n%s" %
                    stderr_text)
goglib_get_games_list.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def goglib_get_games_list():
    """Save the output of ``lgogdownloader --list-details`` to a file.

    When the command exits with status 0, every output line that does not
    contain 'Getting game info' is written to
    ``$HOME/.games_nebula/config/games_list``.

    :return: 0 on success, 1 if the command exits with a non-zero status.
    """
    proc = subprocess.Popen(['lgogdownloader', '--exclude', \
    '1,2,4,8,16,32', '--list-details'],stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    # Output is already consumed; this waits for exit and sets returncode.
    proc.communicate()

    if proc.returncode != 0:
        return 1

    file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'

    # The with-block guarantees the list is flushed and the handle closed.
    with open(file_path, 'w') as games_list_file:
        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)

    return 0
launcher_wine.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def win64_available(self):
        """Probe for a 64-bit wine binary and update the arch selector.

        The executable named ``<wine_bin>64`` (``wine_bin`` taken from
        ``self.get_wine_bin_path()``) is started with its output discarded.
        An exit status of 1 is treated as "available": the arch combobox is
        shown and ``True`` is returned. Any other status, or any failure to
        run the probe, hides the combobox, sets ``self.winearch`` to
        ``'win32'`` and returns ``False``.
        """
        wine_bin, \
        wineserver_bin, \
        wine_lib = self.get_wine_bin_path()

        try:
            # The with-block closes the devnull handle even if Popen raises.
            with open(os.devnull, 'w') as dev_null:
                proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null, \
                    stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
                proc.communicate()
            if proc.returncode == 1:
                self.combobox_winearch.set_visible(True)
                return True
        except Exception:
            # Best-effort probe: any failure means "not available" and is
            # handled by the fallback below. KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            pass

        self.combobox_winearch.set_visible(False)
        self.winearch = 'win32'
        return False
fabric.py 文件源码 项目:err-fabric 作者: netquity 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def execute_task(host, tasks):
        """Run Fabric tasks against a single host.

        Fabric is launched as ``PYTHON2_PATH FABRIC_PATH`` with
        ``--abort-on-prompts``, the given host and ``FABFILE_PATH``,
        followed by each entry of ``tasks``. A non-zero exit status raises
        ``subprocess.CalledProcessError`` (``check=True``).

        Return: CompletedProcess instance; ``stdout`` holds the combined
        stdout/stderr text and ``stderr`` is None.
        """
        # TODO: add support for groups, multiple hosts
        # TODO: add support for providing input data from team files
        fab_command = [
            PYTHON2_PATH,
            FABRIC_PATH,
            '--abort-on-prompts',
            '--hosts=%s' % host,
            '--fabfile=%s' % FABFILE_PATH,
        ]
        fab_command.extend(tasks)

        completed = subprocess.run(
            fab_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merged into stdout
            universal_newlines=True,
            check=True,
        )
        return completed
forgot_password.py 文件源码 项目:PimuxBot 作者: Finn10111 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def index():
    """Render the password-recovery page, or complete a recovery.

    Without a ``code`` query argument the plain index template is rendered.
    With one, the matching ``RecoveryEmail`` row is looked up: if none
    exists an error page is rendered; otherwise the code is cleared and
    committed, a random 10-character alphanumeric password is set for the
    row's JID through ``prosodyctl passwd``, the password is mailed to the
    row's address and a success page is rendered.
    """
    unlock_code = request.args.get('code')
    if not unlock_code:
        return render_template('index.html')

    # unlock, new password
    recovery = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
    if not recovery:
        return render_template('error.html', message='link invalid')

    jid = recovery.jid
    # Clear the code before doing anything else.
    recovery.password_code = None
    s.merge(recovery)
    s.commit()

    # set new password and send email
    email_address = recovery.email
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    password = ''.join(rng.choice(alphabet) for _ in range(10))

    prosody = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    # The password is supplied twice on stdin, one line each.
    typed_input = bytes("%s\n%s\n" % (password, password), encoding='utf8')
    prosody.communicate(typed_input)

    sendMail(email_address, 'new password', password)
    return render_template('success.html', message='password was sent')
cli.py 文件源码 项目:pocket-cli 作者: rakanalh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def output_articles(articles):
    """Show ``articles`` in the ``less`` pager.

    Prints 'No articles found' and returns when the list is empty.
    Otherwise each article is formatted with ``format_article`` and written
    to the pager's stdin. An article whose ``reading_time`` is not
    positive has that field replaced in place with ``'Unknown'``.
    ``KeyboardInterrupt`` and ``ValueError`` raised while paging are
    ignored.
    """
    if len(articles) == 0:
        print('No articles found')
        return

    try:
        pager = subprocess.Popen(['less'],
                                 stdin=subprocess.PIPE,
                                 stdout=sys.stdout)
        feed = pager.stdin
        for entry in articles:
            if int(entry['reading_time']) <= 0:
                entry['reading_time'] = 'Unknown'
            rendered = format_article(entry, line=True)

            # On Python 3 the text is converted to UTF-8 bytes for the pipe.
            feed.write(bytearray(rendered, 'utf-8') if six.PY3 else rendered)

        feed.close()
        pager.wait()
    except (KeyboardInterrupt, ValueError):
        pass
pjf_executor.py 文件源码 项目:PyJFuzz 作者: mseclab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def spawn(self, cmd, stdin_content="", stdin=False, shell=False, timeout=2):
        """
        Start ``cmd`` as a subprocess with all three standard streams piped.

        ``cmd`` must be exactly a ``list``, ``stdin_content`` a ``str`` and
        ``stdin`` a ``bool``. After launch, ``self.finish_read`` is called
        with ``timeout``, ``stdin_content`` and ``stdin``; ``self.close`` is
        called if the process has already exited. A ``KeyboardInterrupt``
        during launch or reading makes the method return quietly.

        Raises ``PJFProcessExecutionError`` on ``OSError`` and
        ``PJFBaseException`` for any other exception, including the type
        checks above.
        """
        try:
            expected_types = ((cmd, list), (stdin_content, str), (stdin, bool))
            for value, wanted in expected_types:
                if type(value) != wanted:
                    raise PJFInvalidType(type(value), wanted)
            self._in = stdin_content
            try:
                self.process = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, shell=shell)
                self.finish_read(timeout, stdin_content, stdin)
                already_exited = self.process.poll() is not None
                if already_exited:
                    self.close()
            except KeyboardInterrupt:
                return
        except OSError:
            raise PJFProcessExecutionError("Binary <%s> does not exist" % cmd[0])
        except Exception as e:
            detail = e.message if hasattr(e, "message") else str(e)
            raise PJFBaseException(detail)
s3deploy.py 文件源码 项目:foremast 作者: gogoair 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _upload_artifacts_to_version(self):
        """Recursively upload directory contents to S3.

        When ``self.s3props`` has ``content_metadata``,
        ``self.content_metadata_uploads()`` is tried first. If nothing was
        uploaded that way, ``aws s3 sync`` copies ``self.artifact_path`` to
        ``self.s3_version_uri`` using profile ``self.env``.

        Raises ``S3ArtifactNotFound`` when ``self.artifact_path`` is unset
        or the directory is empty, and ``subprocess.CalledProcessError``
        when the sync command fails (``check=True``).
        """
        # The path is checked before it is listed; os.listdir must not be
        # handed an empty or missing path.
        if not self.artifact_path or not os.listdir(self.artifact_path):
            raise S3ArtifactNotFound

        uploaded = False
        if self.s3props.get("content_metadata"):
            LOG.info("Uploading in multiple parts to set metadata")
            uploaded = self.content_metadata_uploads()

        if not uploaded:
            # An argument list without a shell: paths containing spaces or
            # shell metacharacters are passed through verbatim.
            cmd = [
                'aws', 's3', 'sync',
                str(self.artifact_path),
                str(self.s3_version_uri),
                '--delete',
                '--exact-timestamps',
                '--profile', str(self.env),
            ]
            result = subprocess.run(cmd, check=True, stdout=subprocess.PIPE)
            LOG.debug("Upload Command Ouput: %s", result.stdout)

        LOG.info("Uploaded artifacts to %s bucket", self.bucket)


问题


面经


文章

微信
公众号

扫码关注公众号