def test_closefd(self):
if not HAS_REDUCTION:
raise unittest.SkipTest('requires fd pickling')
reader, writer = multiprocessing.Pipe()
fd = self.get_high_socket_fd()
try:
p = multiprocessing.Process(target=self._test_closefds,
args=(writer, fd))
p.start()
writer.close()
e = reader.recv()
p.join(timeout=5)
finally:
self.close(fd)
writer.close()
reader.close()
if multiprocessing.get_start_method() == 'fork':
self.assertIs(e, None)
else:
WSAENOTSOCK = 10038
self.assertIsInstance(e, OSError)
self.assertTrue(e.errno == errno.EBADF or
e.winerror == WSAENOTSOCK, e)
#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
评论列表
文章目录