python类Popen()的实例源码

command.py 文件源码 项目:heerkat 作者: Roche 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def cmd(command):
    """Run ``command`` without a shell and collect its outcome.

    The command string is tokenised with ``shlex.split``; stdin, stdout and
    stderr of the child are all pipes.  When the exit code is non-zero the
    command and both captured streams are printed.

    :param command: the command line as a single string.
    :return: a ``Result`` with ``exit_code``, ``stdout``, ``stderr`` and
        ``command`` filled in.
    """
    result = Result()

    proc = Popen(shlex.split(command), stdin=PIPE, stdout=PIPE, stderr=PIPE)
    out, err = proc.communicate()

    result.exit_code = proc.returncode
    result.stdout = out
    result.stderr = err
    result.command = command

    failed = proc.returncode != 0
    if failed:
        # Single parenthesised argument: prints the same text as the
        # statement form.
        print('Error executing command [%s]' % command)
        print('stderr: [%s]' % err)
        print('stdout: [%s]' % out)

    return result
common.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run_command(args, wait=False):
    """Run ``args`` as a child process and return what it wrote to stdout.

    The call blocks until the child exits in both modes, because
    ``communicate()`` is always invoked.

    :param args: program and arguments, handed unchanged to
        ``subprocess.Popen``.
    :param wait: when true, the child's stdout is captured through a pipe;
        when false, the child is started with ``stdin``/``stdout``/``stderr``
        set to ``None`` and ``close_fds=True`` and nothing is captured.
    :return: the captured stdout when ``wait`` is true, otherwise ``None``.
    """
    # Bound up front so the final ``return`` is valid on the error path too.
    result = None

    try:
        if wait:
            p = subprocess.Popen(args, stdout=subprocess.PIPE)
        else:
            p = subprocess.Popen(
                args,
                stdin=None, stdout=None, stderr=None, close_fds=True)

        # communicate() drains the pipe while waiting.  Calling p.wait()
        # first (as this function used to) deadlocks as soon as the child
        # writes more than the OS pipe buffer can hold.
        (result, _) = p.communicate()

    except subprocess.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))

    return result
facenet.py 文件源码 项目:facerecognition 作者: guoxiaolu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def store_revision_info(src_path, output_dir, arg_string):
    """Write the run arguments and git revision details to a text file.

    Runs ``git rev-parse HEAD`` and ``git diff HEAD`` with ``src_path`` as
    the working directory, then writes ``revision_info.txt`` inside
    ``output_dir`` containing ``arg_string``, the hash output and the diff
    output.

    :param src_path: directory in which the git commands are run.
    :param output_dir: directory that receives ``revision_info.txt``.
    :param arg_string: text recorded on the ``arguments:`` line.
    """
    # Commit currently checked out
    hash_proc = Popen(['git', 'rev-parse', 'HEAD'], stdout=PIPE, cwd=src_path)
    git_hash = hash_proc.communicate()[0].strip()

    # Uncommitted changes relative to that commit
    diff_proc = Popen(['git', 'diff', 'HEAD'], stdout=PIPE, cwd=src_path)
    git_diff = diff_proc.communicate()[0].strip()

    report_path = os.path.join(output_dir, 'revision_info.txt')
    with open(report_path, "w") as report:
        report.write('arguments: %s\n--------------------\n' % arg_string)
        report.write('git hash: %s\n--------------------\n' % git_hash)
        report.write('%s' % git_diff)
common.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_command(command, wait=False):
    """Run ``command`` through the shell and return what it wrote to stdout.

    The call blocks until the child exits in both modes, because
    ``communicate()`` is always invoked.

    :param command: shell command line as a single string; it is passed to
        ``subprocess.Popen`` as a one-element list with ``shell=True``.
    :param wait: when true, the child's stdout is captured through a pipe;
        when false, the child is started with ``stdin``/``stdout``/``stderr``
        set to ``None`` and ``close_fds=True`` and nothing is captured.
    :return: the captured stdout when ``wait`` is true, otherwise ``None``.
    """
    # Bound up front so the final ``return`` is valid on the error path too.
    result = None

    try:
        if wait:
            p = subprocess.Popen(
                [command],
                stdout=subprocess.PIPE,
                shell=True)
        else:
            p = subprocess.Popen(
                [command],
                shell=True,
                stdin=None, stdout=None, stderr=None, close_fds=True)

        # communicate() drains the pipe while waiting.  Calling p.wait()
        # first (as this function used to) deadlocks as soon as the child
        # writes more than the OS pipe buffer can hold.
        (result, _) = p.communicate()

    except subprocess.CalledProcessError as e:
        sys.stderr.write(
            "common::run_command() : [ERROR]: output = %s, error code = %s\n"
            % (e.output, e.returncode))

    return result
process.py 文件源码 项目:stack-updater 作者: allatrack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def call(*popenargs, **kwargs):
    """Run a command, wait for it, and forward its output to ``logger``.

    Arguments are passed on to the ``Popen`` constructor, except that
    ``stdout`` and ``stderr`` are always overridden with pipes.  Each line
    of captured stdout is logged with ``logger.info`` and each line of
    captured stderr with ``logger.error``.

    :return: the exit code of the command.
    """
    from subprocess import Popen, PIPE

    kwargs.update(stdout=PIPE, stderr=PIPE)

    proc = Popen(*popenargs, **kwargs)
    captured_out, captured_err = proc.communicate()

    # stdout lines first, then stderr lines; an empty stream logs nothing.
    for text, emit in ((captured_out, logger.info),
                       (captured_err, logger.error)):
        if text:
            for line in text.strip().split("\n"):
                emit(line)

    return proc.returncode
qqmathbook.py 文件源码 项目:qqmbr 作者: ischurov 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def mathjax(s):
    """Pipe ``s`` through the configured ``mjpage`` program and split the result.

    ``s`` is first written to ``temp.log`` in the current directory, then
    sent UTF-8 encoded to the executable named by ``app.config['mjpage']``.
    The program's stdout is parsed with BeautifulSoup's ``html.parser``.

    :param s: text to convert.
    :return: ``(style, body)`` where ``style`` is ``str(soup.style)`` and
        ``body`` is the concatenated string form of the children of the
        parsed ``<body>``.
    """
    with open("temp.log", "w") as log_file:
        log_file.write(s)

    command = [
        app.config['mjpage'],
        '--dollars',
        '--output', "CommonHTML",
        '--fontURL',
        ("https://cdnjs.cloudflare.com/ajax/libs/"
         "mathjax/2.7.0/fonts/HTML-CSS"),
    ]
    proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE)

    raw_out, raw_err = proc.communicate(input=s.encode('utf-8'))
    out = raw_out.decode('utf-8')
    err = raw_err.decode('utf-8')  # decoded but not used further

    soup = BeautifulSoup(out, 'html.parser')
    style = str(soup.style)
    body = "".join(map(str, soup.body.children))

    return style, body
pbimport.py 文件源码 项目:protofuzz 作者: trailofbits 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _compile_proto(full_path, dest):
    """Compile one protobuf file to Python with the ``protoc`` found by ``find_protoc()``.

    The directory containing ``full_path`` is used as ``--proto_path`` and
    ``dest`` as ``--python_out``.  The compiler gets five seconds to finish.

    :return: ``True`` on success, ``False`` when the compiler timed out
        (it is killed first).
    :raises BadProtobuf: when the compiler exits with a non-zero status; the
        message carries its decoded stderr and stdout.
    """
    source_dir = os.path.dirname(full_path)
    command = [
        find_protoc(),
        '--python_out={}'.format(dest),
        '--proto_path={}'.format(source_dir),
        full_path,
    ]
    proc = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    try:
        outs, errs = proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.communicate()  # collect the killed child
        return False

    if proc.returncode == 0:
        return True

    raise BadProtobuf(
        'Failed compiling "{}": \n\nstderr: {}\nstdout: {}'.format(
            full_path, errs.decode('utf-8'), outs.decode('utf-8')))
sensor21-server.py 文件源码 项目:sensor21 作者: 21dotco 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def run(daemon):
    """Start the sensor21 server in the background or in the foreground.

    With ``daemon`` true: if ``./sensor21.pid`` exists, the pid stored in it
    is read, the file is removed and that process is asked to terminate
    (best effort).  A new ``python3 sensor21-server.py`` child is then
    spawned and its pid written to the pid file.

    With ``daemon`` false: ``app.run`` is called on host ``::`` port 5002.

    :param daemon: truthy to spawn a background child, falsy to serve here.
    :raises ValueError: if spawning raises ``subprocess.CalledProcessError``.
    """
    if not daemon:
        print("Server running...")
        app.run(host='::', port=5002)
        return

    pid_file = './sensor21.pid'
    if os.path.isfile(pid_file):
        # Context managers so the pid file handle is closed deterministically.
        with open(pid_file) as fh:
            pid = int(fh.read())
        os.remove(pid_file)
        try:
            psutil.Process(pid).terminate()
        except Exception:
            # Best effort only: failure to terminate the recorded process
            # must not prevent starting the new one.  Unlike a bare
            # ``except:``, this lets KeyboardInterrupt/SystemExit through.
            pass

    try:
        p = subprocess.Popen(['python3', 'sensor21-server.py'])
        with open(pid_file, 'w') as fh:
            fh.write(str(p.pid))
    except subprocess.CalledProcessError:
        raise ValueError("error starting sensor21-server.py daemon")
vrmilter.py 文件源码 项目:sipxecs-voicemail-transcription 作者: andrewsauder 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def exec_cmd(cmd, **kwds):
    """
    Run ``cmd`` as a sub-process and capture its output.

    The only keyword consulted is ``stdin``: when given (and not ``None``)
    it is fed to the child's standard input through a pipe; otherwise the
    child's stdin is left untouched.

    Returns ``(returncode, stdout, stderr)``.
    """
    payload = kwds.get('stdin')
    child = subprocess.Popen(
        cmd,
        stdin=None if payload is None else subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    out, err = child.communicate(payload)
    return (child.returncode, out, err)


#=======================================================================
# Extend the Milter Class (where the email is captured)
#=======================================================================
utils.py 文件源码 项目:epubcheck 作者: titusz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def epubcheck_help():
    """Return the command-line help text printed by epubcheck.jar.

    Runs ``c.JAVA -Duser.language=en -jar c.EPUBCHECK -h`` with stderr sent
    to the null device and decodes the captured stdout with the default
    codec.

    :return unicode: help text from epubcheck.jar
    """
    with open(os.devnull, "w") as sink:
        command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
        proc = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=sink)
        help_bytes, _ = proc.communicate()

    return help_bytes.decode()
index.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_verify_command(self, signature_filename, data_filename,
                       keystore=None):
    """
    Build the gpg command line that verifies a detached signature.

    :param signature_filename: The pathname to the file containing the
                               signature.
    :param data_filename: The pathname to the file containing the
                          signed data.
    :param keystore: Directory holding the keys used for verification.
                     When ``None``, ``self.gpg_home`` is used; if the
                     resulting value is falsy no ``--homedir`` option is
                     added.
    :return: The command as a list suitable for
             :class:`subprocess.Popen`.
    """
    cmd = [self.gpg, '--status-fd', '2', '--no-tty']

    homedir = self.gpg_home if keystore is None else keystore
    if homedir:
        cmd += ['--homedir', homedir]

    cmd += ['--verify', signature_filename, data_filename]
    logger.debug('invoking: %s', ' '.join(cmd))
    return cmd
install_venv_common.py 文件源码 项目:networking-huawei 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_command_with_code(self, cmd, redirect_output=True,
                          check_exit_code=True):
    """Run ``cmd`` as a child process with ``self.root`` as working directory.

    :param cmd: argument list for ``subprocess.Popen``.
    :param redirect_output: capture the child's stdout when true; otherwise
        stdout is not redirected and the returned output is ``None``.
    :param check_exit_code: when true, a non-zero exit status is reported
        through ``self.die``.
    :return: ``(output, returncode)``.
    """
    stdout = subprocess.PIPE if redirect_output else None

    proc = subprocess.Popen(cmd, cwd=self.root, stdout=stdout)
    output, _ = proc.communicate()
    status = proc.returncode

    if check_exit_code and status != 0:
        self.die('Command "%s" failed.\n%s', ' '.join(cmd), output)
    return (output, status)
test_13_TestSB.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_loadSADFile_startorder(self):
    """Load the ticket_462_w waveform and compare the pids of its two components.

    The kernel's ``pid_max`` is read via ``cat`` (falling back to 32768 on
    any failure).  ``ticket_462_ac_1`` is expected to have the lower pid
    unless its pid exceeds ``maxpid - 1``, in which case the comparison is
    reversed.
    """
    maxpid = 32768
    try:
        reader = subprocess.Popen(['cat', '/proc/sys/kernel/pid_max'],
                                  stdout=subprocess.PIPE)
        maxpid = int(reader.communicate()[0].strip())
    except:
        pass

    loaded = sb.loadSADFile('sdr/dom/waveforms/ticket_462_w/ticket_462_w.sad.xml')
    self.assertEquals(loaded, True)

    comp_ac = sb.getComponent('ticket_462_ac_1')
    self.assertNotEquals(comp_ac, None)
    comp = sb.getComponent('ticket_462_1')
    self.assertNotEquals(comp, None)

    if comp_ac._pid <= maxpid - 1:
        started_in_order = comp_ac._pid < comp._pid
    else:
        started_in_order = comp._pid < comp_ac._pid
    self.assertTrue(started_in_order)
test_01_nodeBooter.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_UserOrGroupNoDaemon(self):
    """Run nodeBooter with --user and/or --group and check the complaint on stderr.

    For each of the three option combinations the child's stderr must
    contain 'If either group or user are specified, daemon must be set'.
    """
    domainName = scatest.getTestDomainName()
    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass # This exception is expected

    option_sets = (
        ['--user', 'domuser', '--group', 'somegroup'],
        ['--group', 'somegroup'],
        ['--user', 'domuser'],
    )
    for options in option_sets:
        args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist"] + options
        nb = Popen(args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        self.assertNotEqual(nb.stderr.read().find('If either group or user are specified, daemon must be set'), -1)
test_01_nodeBooter.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_BadUserOrBadGroup(self):
    """Run nodeBooter with ``--user=`` / ``--group=`` and check the complaint on stderr.

    For each of the two ``option=value`` spellings the child's stderr must
    contain 'Separator must be a space'.
    """
    domainName = scatest.getTestDomainName()
    # Test that we don't already have a bound domain
    try:
        domMgr = self._root.resolve(scatest.getDomainMgrURI())
        self.assertEqual(domMgr, None)
    except CosNaming.NamingContext.NotFound:
        pass # This exception is expected

    for option in ('--user=domuser', '--group=somegroup'):
        args = ["../../control/framework/nodeBooter", "-D", "-debug", "9", "--nopersist", option]
        nb = Popen(args, cwd=scatest.getSdrPath(), stderr=PIPE, stdout=PIPE)
        self.assertNotEqual(nb.stderr.read().find('Separator must be a space'), -1)
io_helpers.py 文件源码 项目:core-framework 作者: RedhawkSDR 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def plot(self):
    """Launch ``<eclipsePath>/bin/plotter.sh`` for the connected port.

    The command line is assembled from ``self._eclipsePath``,
    ``self._usesPortName``, ``self._dataType`` and
    ``self.usesPortIORString``, split with ``shlex`` and started in its own
    process group with stdout sent to ``/dev/null``.  The child is recorded
    in ``self._processes`` keyed by its pid.

    :raises AssertionError: if the IOR string, port name or data type has
        not been set, or if launching the plotter fails for any reason.
    """
    if _domainless._DEBUG == True:
        print("Plot:plot()")

    # Error checking before launching plot
    if self.usesPortIORString is None:
        raise AssertionError("Plot:plot() ERROR - usesPortIORString not set ... must call connect() on this object from another component")
    if self._usesPortName is None:
        raise AssertionError("Plot:plot() ERROR - usesPortName not set ... must call connect() on this object from another component")
    if self._dataType is None:
        raise AssertionError("Plot:plot() ERROR - dataType not set ... must call connect() on this object from another component")

    plotCommand = str(self._eclipsePath) + "/bin/plotter.sh -portname " + str(self._usesPortName) + " -repid " + str(self._dataType) + " -ior " + str(self.usesPortIORString)
    if _domainless._DEBUG == True:
        print("Plot:plotCommand " + str(plotCommand))
    args = _shlex.split(plotCommand)
    if _domainless._DEBUG == True:
        print("Plot:args " + str(args))
    try:
        # The child gets its own copy of the descriptor, so the parent's
        # handle can be closed as soon as the process has been spawned;
        # leaving it open leaked one descriptor per plot() call.
        with open('/dev/null', 'w') as dev_null:
            sub_process = _subprocess.Popen(args, stdout=dev_null, preexec_fn=_os.setpgrp)
        pid = sub_process.pid
        self._processes[pid] = sub_process
    except Exception as e:
        raise AssertionError("Plot:plot() Failed to launch plotting due to %s" % (e))
render.py 文件源码 项目:eClaire 作者: kogan 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def print_card(pdf, printer_name):
    """
    Send a PDF to a printer by piping it into ``lpr -P <printer_name>``.

    :param pdf: Binary PDF buffer written to ``lpr``'s standard input
    :param printer_name: Name of the printer on the system (ie. CUPS name)
    :raises PrintingError: when ``lpr`` exits with a non-zero status
    """
    lpr = subprocess.Popen(['lpr', '-P', printer_name],
                           stdin=subprocess.PIPE)
    lpr.communicate(pdf)

    status = lpr.returncode
    if status != 0:
        raise PrintingError('Return code {}'.format(status))
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """ Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Currently disabled: the function raises ``ValueError`` unconditionally,
    so everything after the ``raise`` is unreachable.
    """
    raise ValueError('Unsupported method at the moment')

    # --- unreachable while the raise above is in place ---
    sink = open(os.devnull, 'w')

    convert_cmd = [LOUVAIN_CONVERT_BINPATH,
                   '-i', '/dev/stdin',
                   '-o', bin_filename,
                   '-w', weight_filename]
    proc = subprocess.Popen(convert_cmd, stdin=subprocess.PIPE,
                            stdout=sink, stderr=sink)

    # One "row<TAB>col<TAB>weight" line per stored entry
    for triplet in itertools.izip(matrix.row, matrix.col, matrix.data):
        proc.stdin.write('%d\t%d\t%f\n' % triplet)

    proc.stdin.close()
    proc.wait()
    sink.close()
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """ Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Each ``(row, col)`` pair of ``matrix`` is written as a tab-separated
    line to the standard input of ``LOUVAIN_CONVERT_BINPATH``, which is told
    to read ``/dev/stdin`` and write ``bin_filename``.  The utility's own
    stdout and stderr are discarded; its exit status is not inspected.
    """
    sink = open(os.devnull, 'w')

    convert_cmd = [LOUVAIN_CONVERT_BINPATH,
                   '-i', '/dev/stdin',
                   '-o', bin_filename]
    proc = subprocess.Popen(convert_cmd, stdin=subprocess.PIPE,
                            stdout=sink, stderr=sink)

    # One "row<TAB>col" line per stored entry
    for pair in itertools.izip(matrix.row, matrix.col):
        proc.stdin.write('%d\t%d\n' % pair)

    proc.stdin.close()
    proc.wait()
    sink.close()
__init__.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read_fastq(self):
    """Yield one ``FastqRow(header, seq, qual)`` per four-line record of ``self.file``.

    Files whose name ends in ``gz`` are streamed through
    ``gunzip --stdout``; anything else is opened directly for reading.
    When ``self.rc`` is set, the sequence is passed through
    ``tk_seq.get_rev_comp`` and the quality string is reversed.

    Iteration stops at end of input; an incomplete trailing record is
    dropped.  The input (and the gunzip child, if any) is released when the
    generator finishes or is closed.
    """
    proc = None
    if self.file[-2:] == "gz":
        proc = subprocess.Popen(["gunzip", "--stdout", self.file], stdout=subprocess.PIPE)
        reader = proc.stdout
    else:
        reader = open(self.file, "r")

    try:
        while True:
            try:
                header = next(reader).strip()
                seq = next(reader).strip()
                next(reader)  # skip the separator line
                qual = next(reader).strip()
            except StopIteration:
                # End of input.  Returning explicitly (rather than letting
                # StopIteration escape the generator) keeps this working
                # under PEP 479.
                return

            if self.rc:
                seq = tk_seq.get_rev_comp(seq)
                qual = qual[::-1]

            yield FastqRow(header, seq, qual)
    finally:
        # The close() after the infinite loop was unreachable; do the
        # cleanup here so it runs on exhaustion and on early close alike.
        reader.close()
        if proc is not None:
            proc.wait()
acbs_parser.py 文件源码 项目:acbs 作者: AOSC-Dev 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse_ab3_defines(defines_file):  # , pkg_name):
    """Evaluate an autobuild ``defines`` file and return its variables.

    The file's text is prefixed with an ``ARCH=`` assignment (from
    ``get_arch_name()``), suffixed with the output of ``gen_laundry_list``
    for ``PKGNAME``, ``PKGDEP`` and ``BUILDDEP``, and executed by the shell.
    The shell's output is then parsed as an INI section.

    :param defines_file: path of the defines file.
    :return: a dict mapping upper-cased option names to their values, or
        ``False`` when the file cannot be read or the script fails.
    """
    try:
        # ``with`` closes the handle even when read() raises.
        with open(defines_file, 'rt') as fp:
            abd_cont = fp.read()
    except Exception:
        print('[E] Failed to load autobuild defines file! Do you have read permission?')
        return False
    script = "ARCH={}\n".format(
        get_arch_name()) + abd_cont + gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
    try:
        # NOTE: the defines file is executed verbatim by the shell, so it
        # must come from a trusted source.
        # Better to be replaced by subprocess.Popen
        abd_out = subprocess.check_output(script, shell=True)
    except Exception:
        print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
        return False
    abd_fp = io.StringIO('[wrap]\n' + abd_out.decode('utf-8'))
    abd_config = RawConfigParser()
    abd_config.read_file(abd_fp)
    wrap = abd_config['wrap']
    return {key.upper(): wrap[key] for key in wrap}
train.py 文件源码 项目:dl-classification 作者: matthieuo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def manage_test_process(queue, f, cpopen):
    """Start the next queued test command unless one is still running.

    :param queue: list of pending commands; the last entry is popped and
        launched.
    :param f: passed as both ``stdout`` and ``stderr`` of the new process.
    :param cpopen: the previously started process, or ``None``.
    :return: ``cpopen`` when the queue is empty or ``cpopen`` has not
        finished yet; otherwise the newly started ``Popen``.
    """
    if not queue:
        return cpopen

    still_running = cpopen is not None and cpopen.poll() is None
    if still_running:
        return cpopen  # not finish

    print("OK create new process")

    child_env = os.environ.copy()
    child_env["CUDA_VISIBLE_DEVICES"] = ""  # test  on CPU

    next_command = queue.pop()
    return subprocess.Popen(next_command, stdout=f, stderr=f, env=child_env)
trader.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def runitt():
    """Run one timed zenbot trade cycle for the coin named by the global ``variable``.

    Steps, in order: create/truncate the file named by the global ``pid``;
    start ``./zenbot.sh trade`` in the background; sleep for one hour; start
    ``sudo killall node`` in the background; run the ``./zenbot.sh sell``
    command twice, waiting for each; finally remove the ``pid`` file.

    NOTE: the coin name is interpolated into shell command strings run with
    ``shell=True``; it must come from a trusted source.
    """
    open(pid, 'w').close()
    global variable
    variable = str(variable)
    variablestr = str(variable)
    print('Starting Trade of: ' + variablestr)
    process0 = './zenbot.sh trade poloniex.' + variablestr
    subprocess.Popen(process0, shell=True)
    time.sleep(3600)
    # The closing quote was missing here, which made the whole module a
    # syntax error.
    print('Starting node kill process')
    process1 = 'sudo killall node'
    subprocess.Popen(process1, shell=True)
    print('Starting final sell Of:' + variablestr + ' In Case of Error')
    process2 = './zenbot.sh sell --order_adjust_time=1000000000 --sell_pct=100 --markup_pct=0  poloniex.' + variablestr
    proc2 = subprocess.Popen(process2, shell=True)
    proc2.communicate()
    # The sell is issued a second time, exactly as before.
    print('Starting final sell Of:' + variablestr + ' In Case of Error')
    process3 = './zenbot.sh sell --order_adjust_time=1000000000 --sell_pct=100 --markup_pct=0  poloniex.' + variablestr
    proc3 = subprocess.Popen(process3, shell=True)
    proc3.communicate()
    os.remove(pid)
    print('Done running loop, process file deleted. Waiting for another coin...')
# From now on, any update received will be passed to 'update_handler' NOTE... Later, Zenbot will be modified to cancel on order adjust.
sync.py 文件源码 项目:django-powerpages 作者: Open-E-WEB 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def add_to_vcs(self, summary):
    """Stage created and removed files with ``git add -A`` after confirmation.

    Nothing happens unless all of the following hold, checked in this
    order: ``self.git_add`` is truthy, ``summary`` contains
    ``SyncStatus.DELETED`` or ``SyncStatus.ADDED``, ``self.dry_run`` is
    falsy, and ``self.confirm`` answers truthy.

    :param summary: container tested for ``SyncStatus`` members.
    :raises: the object returned by ``self.error(...)`` when git exits with
        a non-zero status.
    """
    if not self.git_add:
        return
    if SyncStatus.DELETED not in summary and SyncStatus.ADDED not in summary:
        return
    if self.dry_run:
        return
    if not self.confirm(
        question=(
            'Do you want to add created and removed files to GIT?'
        )
    ):
        return

    proc = subprocess.Popen(
        ['git', '-C', app_settings.SYNC_DIRECTORY,
         'add', '-A', app_settings.SYNC_DIRECTORY],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    proc.communicate()
    # Judge by exit status: git also writes non-fatal warnings (e.g. line
    # ending conversion notices) to stderr on a successful add.
    if proc.returncode != 0:
        raise self.error('Adding file changes to GIT failed!')
conv2mp4-server.py 文件源码 项目:conv2mp4-py 作者: Kameecoding 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_audio_streams(self):
    """Probe ``self.input_video`` with ffprobe and return its audio streams.

    Runs ``FFPROBE -i <input> -of json -show_streams`` with stderr
    discarded, parses the JSON written to stdout and hands it to the
    module-level ``get_audio_streams`` function (presumably a separate
    helper sharing this method's name; verify against the full file).

    If the ffprobe executable does not exist, an error is logged and the
    program exits with status 0.  Any other ``OSError`` from starting the
    process is re-raised.
    """
    import errno

    with open(os.devnull, 'w') as DEV_NULL:
        #Get file info and Parse it
        try:
            proc = subprocess.Popen([
                FFPROBE,
                '-i', self.input_video,
                '-of', 'json',
                '-show_streams'
            ], stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            # ``os.errno`` was never a documented alias and is gone on
            # modern Python; use the errno module directly.
            if e.errno == errno.ENOENT:
                Logger.error("FFPROBE not found, install on your system to use this script")
                sys.exit(0)
            # Previously swallowed, which then failed on an unbound ``proc``.
            raise
        # communicate() reads stdout to EOF and reaps the child.
        output = proc.communicate()[0]

    return get_audio_streams(json.loads(output))
launcher.py 文件源码 项目:wpw-sdk-python 作者: WPTechInnovation 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def launch(self, cfg, path, flags):
    """Start the platform-specific ``rpc-agent`` binary as a child process.

    The host OS (``platform.system().lower()``) and a 64/32-bit label are
    passed to ``self.validateConfig``.  If validation succeeds, the agent
    binary is looked up under ``$WPW_HOME/bin`` when ``WPW_HOME`` is set,
    otherwise under ``<path>/wpwithinpy/iot-core-component/bin``; its file
    name ends with the value of ``self.detectHostArchitecture()``.  The
    child's stdout and stderr are discarded.

    :param cfg: configuration handed to ``self.validateConfig``.
    :param path: base directory used when ``WPW_HOME`` is not set.
    :param flags: extra command-line arguments appended to the agent command.
    :return: the ``subprocess.Popen`` object, or ``None`` when validation fails.
    """
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    hostArchitecture = 'x64' if sys.maxsize > 2 ** 32 else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))

    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return None

    wpw_home = os.environ.get("WPW_HOME")
    if wpw_home is not None:
        agent_dir = wpw_home + '/bin'
    else:
        agent_dir = path + '/wpwithinpy/iot-core-component/bin'
    cmd = [agent_dir + '/rpc-agent-' + platform.system().lower() + '-' + self.detectHostArchitecture()]
    cmd.extend(flags)

    # The child keeps its own copy of the descriptor; close ours right after
    # spawning instead of leaking one handle per launch.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
    return proc
build_docs.py 文件源码 项目:wiki-to-doc 作者: serra 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def build_mkdocs(self):
    """
    Change into ``MKDOCS_DIR`` and run ``mkdocs build -q`` there.

    Raises ``Error`` with the captured stderr text when MkDocs wrote
    anything to stderr.
    """
    # The build must run from the MkDocs project directory
    os.chdir(MKDOCS_DIR)

    mkdocs_process = subprocess.Popen(
        ["mkdocs", "build", "-q"],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    _, build_errors = mkdocs_process.communicate()

    if not build_errors:
        return
    raise Error("Could not build MkDocs !\n%s" %
                build_errors)
goglib_get_games_list.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def goglib_get_games_list():
    """Save lgogdownloader's detailed game listing to the games_nebula config.

    Runs ``lgogdownloader --exclude 1,2,4,8,16,32 --list-details`` and, when
    it exits with status 0, writes every output line that does not contain
    ``'Getting game info'`` to ``$HOME/.games_nebula/config/games_list``.

    :return: ``0`` when the list was written, ``1`` when lgogdownloader
        exited with a non-zero status.
    """
    proc = subprocess.Popen(
        ['lgogdownloader', '--exclude', '1,2,4,8,16,32', '--list-details'],
        stdout=subprocess.PIPE)
    games_detailed_list = proc.stdout.readlines()
    # stdout is already drained; this waits for exit and sets returncode.
    proc.communicate()

    if proc.returncode != 0:
        return 1

    file_path = os.getenv('HOME') + '/.games_nebula/config/games_list'

    # ``with`` guarantees the list is flushed and the handle closed; the
    # file used to be left open.
    with open(file_path, 'w') as games_list_file:
        for line in games_detailed_list:
            if 'Getting game info' not in line:
                games_list_file.write(line)

    return 0
launcher_wine.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def win64_available(self):
    """Probe for a usable ``<wine_bin>64`` executable and update the UI to match.

    The executable is run without arguments and with its output discarded.
    An exit status of exactly 1 is treated as "available" (presumably the
    status wine64 returns when started with no program; verify).  In that
    case the architecture combobox is shown.  In every other case,
    including any failure to start the executable, the combobox is hidden
    and ``self.winearch`` is forced to ``'win32'``.

    :return: ``True`` when the 64-bit binary is considered available,
        otherwise ``False``.
    """
    wine_bin, wineserver_bin, wine_lib = self.get_wine_bin_path()

    try:
        # ``with`` closes the null-device handle even when Popen raises,
        # which previously leaked it.
        with open(os.devnull, 'w') as dev_null:
            proc = subprocess.Popen(
                [wine_bin + '64'], stdout=dev_null,
                stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        if proc.returncode == 1:
            self.combobox_winearch.set_visible(True)
            return True
    except Exception:
        # Any failure means "no win64"; handled by the fallback below.
        # Unlike a bare ``except:``, KeyboardInterrupt/SystemExit propagate.
        pass

    self.combobox_winearch.set_visible(False)
    self.winearch = 'win32'
    return False
csvrotator.py 文件源码 项目:rpi-can-logger 作者: JonnoFTW 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def writerow(self, row):
    """
    Append one row to the current CSV file, rotating it when it grows too large.

    The row is formatted through ``self._out_writer`` into ``self._buffer``,
    copied to ``self._out_csv`` and flushed.  Once ``self._bytes_written``
    exceeds ``self.max_bytes`` the file is closed, ``_make_csv_writer()`` is
    called, and a ``7z`` process is started in the background (not waited
    for).

    :param row: the row to write
    :return: the formatted text of the row
    """
    self._bytes_written += self._out_writer.writerow(row)
    row_txt = self._buffer.getvalue()
    self._out_csv.write(row_txt)
    self._reset_buffer()
    self._out_csv.flush()

    if self._bytes_written <= self.max_bytes:
        return row_txt

    # Rotation
    self._out_csv.close()
    self._make_csv_writer()
    # NOTE(review): the name is read after _make_csv_writer() has run; if
    # that call rebinds self._out_csv, this is the new file's path — confirm.
    out_name = str(Path(self._out_csv.name).absolute())
    archive_cmd = ['7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64', '-md=16m',
                   out_name + '.7z', out_name]
    subprocess.Popen(archive_cmd)

    return row_txt


问题


面经


文章

微信
公众号

扫码关注公众号