def test_event(self):
    """Check is_set(), wait(), set() and clear() on ``self.Event()``.

    The test runs in three phases:

    1. A new event is unset: ``wait(timeout)`` returns False, and the
       elapsed time is compared against the requested timeout.
    2. After ``set()``: ``is_set()`` is True and ``wait()`` returns True,
       with the elapsed time compared against 0.0.
    3. After ``clear()``: a child process running ``self._test_event`` is
       started and the parent blocks in ``wait()`` until it returns True.

    The child process is always joined before the test returns so that it
    does not outlive the test.
    """
    event = self.Event()
    # TimingWrapper records how long each call took in ``wait.elapsed``.
    wait = TimingWrapper(event.wait)

    # Phase 1: the event starts out unset.
    self.assertEqual(event.is_set(), False)

    # wait() is expected to return the flag value (False on timeout), not
    # None, for both the threading and the multiprocessing flavour.
    self.assertEqual(wait(0.0), False)
    self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    self.assertEqual(wait(TIMEOUT1), False)
    self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    # Phase 2: once set, waiting succeeds whether or not a timeout is given.
    event.set()

    self.assertEqual(event.is_set(), True)
    self.assertEqual(wait(), True)
    self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    self.assertEqual(wait(TIMEOUT1), True)
    self.assertTimingAlmostEqual(wait.elapsed, 0.0)
    # Waiting must not consume the flag.
    self.assertEqual(event.is_set(), True)

    # Phase 3: clear the flag and let another process set it again.
    event.clear()
    self.assertEqual(event.is_set(), False)

    # NOTE(review): self._test_event is not visible here; presumably it
    # sets the event after a short delay -- confirm against its definition.
    p = self.Process(target=self._test_event, args=(event,))
    p.daemon = True
    p.start()
    try:
        # No timeout: blocks until the child sets the event.
        self.assertEqual(wait(), True)
    finally:
        # Reap the child so it is not left running after the test.
        p.join()
#
#
#
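# NOTE(review): web-page residue, not part of this module: "Comment list"
# NOTE(review): web-page residue, not part of this module: "Table of contents"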