test_subprocess.py 文件源码

python
阅读 44 收藏 0 点赞 0 评论 0

项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码
def test_pass_fds(self):
    """Check that ``pass_fds`` keeps exactly the requested fd open in a child.

    Five pipes are created and both ends of each are made inheritable.
    For every resulting descriptor, the ``fd_status.py`` helper is run
    with ``close_fds=True`` and that single descriptor in ``pass_fds``.
    The helper's stdout is parsed as a comma-separated list of integers
    (presumably the descriptors open in the child -- confirm against
    ``fd_status.py``).  The passed descriptor must appear in that list
    and none of the other pipe descriptors may.

    The test also checks that combining ``close_fds=False`` with
    ``pass_fds`` emits a ``RuntimeWarning`` mentioning
    'overriding close_fds' while the child still exits with status 0.
    """
    status_script = support.findfile("fd_status.py",
                                     subdir="subprocessdata")

    # Build a pool of inheritable descriptors; each one is closed again
    # through the test-case cleanup hooks.
    inheritable_fds = set()
    for _ in range(5):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        os.set_inheritable(read_end, True)
        os.set_inheritable(write_end, True)
        inheritable_fds.update((read_end, write_end))

    for fd in inheritable_fds:
        child = subprocess.Popen(
            [sys.executable, status_script],
            stdout=subprocess.PIPE,
            close_fds=True,
            pass_fds=(fd, ),
        )
        stdout_data, _ = child.communicate()

        fds_in_child = {int(token) for token in stdout_data.split(b',')}
        must_be_closed = inheritable_fds - {fd}

        self.assertIn(fd, fds_in_child, "fd to be passed not passed")
        self.assertFalse(fds_in_child & must_be_closed,
                         "fd to be closed passed")

        # pass_fds overrides close_fds with a warning.
        with self.assertWarns(RuntimeWarning) as context:
            exit_status = subprocess.call(
                [sys.executable, "-c", "import sys; sys.exit(0)"],
                close_fds=False,
                pass_fds=(fd, ),
            )
            self.assertFalse(exit_status)
        self.assertIn('overriding close_fds', str(context.warning))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号