pkg5unittest.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:solaris-ips 作者: oracle 项目源码 文件源码
def ptyPopen(args, executable=None, env=None, shell=False):
                """Less featureful but inspired by subprocess.Popen.
                Runs subprocess in a pty"""
                #
                # Note: In theory the right answer here is to subclass Popen,
                # but we found that in practice we'd have to reimplement most
                # of that class, because its handling of file descriptors is
                # too brittle in its _execute_child() code.
                #
                def __drain(masterf, outlist):
                        # Use a list as a way to pass by reference
                        while True:
                                chunksz = 1024
                                termdata = masterf.read(chunksz)
                                outlist.append(termdata)
                                if len(termdata) < chunksz:
                                        # assume we hit EOF
                                        break

                # This is the arg handling protocol from Popen
                if isinstance(args, six.string_types):
                        args = [args]
                else:
                        args = list(args)

                if shell:
                        args = ["/bin/sh", "-c"] + args
                        if executable:
                                args[0] = executable

                if executable is None:
                        executable = args[0]

                pid,fd = pty.fork()
                if pid == 0:
                        try:
                                # Child
                                if env is None:
                                        os.execvp(executable, args)
                                else:
                                        os.execvpe(executable, args, env)
                        except:
                                traceback.print_exc()
                                os._exit(99)
                else:
                        masterf = os.fdopen(fd, "rb")
                        outlist = []
                        t = threading.Thread(target=__drain,
                            args=(masterf, outlist))
                        t.start()
                        waitedpid, retcode = os.waitpid(pid, 0)
                        retcode = retcode >> 8
                        t.join()
                return retcode, b"".join(outlist)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号