def test_wait_for_write(self):
    """Check that a reader calling ``wait_for_write()`` lets a writer through.

    While the lock is held for reading, a writer task is started through the
    ``async`` helper.  The reader then calls ``self.lock.wait_for_write()``
    repeatedly until the writer signals, via ``event``, that it has entered
    its write section.  The test asserts that:

    * exactly one reader is registered before and after every wait,
    * the writer task returns ``'written'``,
    * the lock is unlocked once the read section is left, and
    * no more than two waits were needed.
    """
    event = multiprocessing.Event()
    wait_count = 0

    def write_task():
        # Executed by the async helper: take the write lock, confirm the
        # writer state, then signal the waiting reader from inside the
        # write section.
        with self.lock.for_write():
            self.assert_writer()
            event.set()
            return 'written'

    with self.lock.for_read():
        # ``async`` is a reserved keyword since Python 3.7, so the literal
        # attribute access ``self.async`` no longer parses.  getattr() calls
        # the very same helper and works on old and new interpreters alike.
        writer = getattr(self, 'async')(write_task)
        while not event.is_set():
            self.assert_readers(1)
            wait_count += 1
            # NOTE(review): presumably this gives pending writers a chance
            # to run and returns with the read lock held again -- confirm
            # against the lock implementation.
            self.lock.wait_for_write()
            self.assert_readers(1)
        self.assertEqual('written', self.get_result(writer))
    self.assert_unlocked()
    # The writer should get through within a couple of waits.
    self.assertLessEqual(wait_count, 2)
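# Comment list (web-page residue, not part of the test code)
# Article table of contents (web-page residue, not part of the test code)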