def test_writer_blocks_reader(self):
    """Check that a reader only gets the lock after the writer releases it.

    The write lock is taken first, then a reader task is started through
    the ``async`` helper while the write lock is still held.  The task
    signals an event just before it requests the read lock; once that
    signal is seen the write lock is confirmed to still be held, and only
    after it is released is the reader's result (``'read'``) collected.
    """
    with self.lock.for_write():
        event = multiprocessing.Event()

        def test():
            # The write lock taken by the caller must still be held here.
            self.assert_writer()
            # Caller will block until this event is released.
            event.set()
            with self.lock.for_read():
                self.assert_readers(1)
                return 'read'

        # ``async`` is a reserved keyword since Python 3.7, so the attribute
        # form ``self.async`` no longer parses; look the helper up by name.
        # NOTE(review): the helper is defined outside this view and presumably
        # runs ``test`` concurrently -- it needs a non-keyword name as well.
        r = getattr(self, 'async')(test)
        # Wait until the reader task has signalled that it is about to
        # request the read lock.
        event.wait()
        self.assert_writer()
    # Fetch the result only after the write lock is released: the reader
    # cannot return while it is still waiting for the read lock.
    self.assertEqual('read', self.get_result(r))
    self.assert_unlocked()
# Comment list
# Table of contents