_test_multiprocessing.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:ouroboros 作者: pybee 项目源码 文件源码
def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            p.join(timeout=5)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)

#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号