def test_fd_transfer(self):
    """Check that an open file handle can be handed to a child process.

    The parent opens ``test_support.TESTFN`` for writing and sends the
    underlying handle over a duplex pipe with ``reduction.send_handle``.
    The child process runs ``self._writefd`` with the other end of the pipe
    and the payload ``b"foo"``; once the child has exited, the file is
    expected to contain exactly ``b"foo"``.

    The test is skipped unless ``self.TYPE`` is ``'processes'``.
    """
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")

    # Remove the scratch file once the test is over (pass or fail) so it
    # does not leak into later tests.
    # NOTE(review): assumes test_support is the stdlib test-support module,
    # which provides unlink() -- confirm against the file's imports.
    self.addCleanup(test_support.unlink, test_support.TESTFN)

    conn, child_conn = self.Pipe(duplex=True)
    p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
    p.daemon = True
    p.start()

    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        if msvcrt:
            # When msvcrt is available, send the OS file handle instead of
            # the C runtime file descriptor.
            fd = msvcrt.get_osfhandle(fd)
        # Send while the file is still open so the handle is valid.
        reduction.send_handle(conn, fd, p.pid)

    # Wait for the child to exit before inspecting the file contents.
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"foo")
test_multiprocessing.py 文件源码
python
阅读 25
收藏 0
点赞 0
评论 0
评论列表
文章目录