test_multiprocessing.py 文件源码

python
阅读 37 收藏 0 点赞 0 评论 0

项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码
def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号