def test_wait_timeout(self):
    """Check that ``wait()`` honours its timeout and returns early on data.

    Phase 1: wait on both ends of an idle pipe with a timeout of
    ``expected`` seconds; the result must be empty and the elapsed time
    must fall between half and twice the requested timeout.

    Phase 2: send one message through ``b`` and wait again with a long
    timeout; ``wait()`` must return ``[a]`` in well under the timeout.
    """
    from multiprocessing.connection import wait

    expected = 5
    a, b = multiprocessing.Pipe()

    # Close both pipe ends even if an assertion fails, so the test does
    # not leak open connections into the rest of the run.
    with a, b:
        # Use the monotonic clock for elapsed-time measurement: unlike
        # time.time(), it cannot jump when the system clock is adjusted.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)
评论列表
文章目录