test_subprocess.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:ouroboros 作者: pybee 项目源码 文件源码
def test_close_fds(self):
        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

        fds = os.pipe()
        self.addCleanup(os.close, fds[0])
        self.addCleanup(os.close, fds[1])

        open_fds = set(fds)
        # add a bunch more fds
        for _ in range(9):
            fd = os.open(os.devnull, os.O_RDONLY)
            self.addCleanup(os.close, fd)
            open_fds.add(fd)

        for fd in open_fds:
            os.set_inheritable(fd, True)

        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=False)
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        self.assertEqual(remaining_fds & open_fds, open_fds,
                         "Some fds were closed")

        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=True)
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        self.assertFalse(remaining_fds & open_fds,
                         "Some fds were left open")
        self.assertIn(1, remaining_fds, "Subprocess failed")

        # Keep some of the fd's we opened open in the subprocess.
        # This tests _posixsubprocess.c's proper handling of fds_to_keep.
        fds_to_keep = set(open_fds.pop() for _ in range(8))
        p = subprocess.Popen([sys.executable, fd_status],
                             stdout=subprocess.PIPE, close_fds=True,
                             pass_fds=())
        output, ignored = p.communicate()
        remaining_fds = set(map(int, output.split(b',')))

        self.assertFalse(remaining_fds & fds_to_keep & open_fds,
                         "Some fds not in pass_fds were left open")
        self.assertIn(1, remaining_fds, "Subprocess failed")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号