def test_writer_blocks_multiple_readers(self):
    """Check that a held write lock keeps several readers out at once.

    Two reader tasks are started while this test holds the write lock.
    Each task checks the writer state, meets this test at a barrier and
    then requests the read lock. Once the write lock has been released,
    both tasks must hold the read lock together, so each is expected to
    report a reader count of 2, and the lock must end up unlocked.
    """
    with self.lock.for_write():
        # Three parties: the two reader tasks plus this test.
        before_read = multiprocessing.Barrier(3)
        # Two parties each: only the reader tasks meet at these barriers.
        during_read = multiprocessing.Barrier(2)
        after_read = multiprocessing.Barrier(2)

        def reader():
            # Runs while the outer write lock is still held.
            self.assert_writer()
            before_read.wait()
            with self.lock.for_read():
                # Both tasks must be inside the read lock before either
                # samples the count ...
                during_read.wait()
                value = self.reader_count()
                # ... and neither may leave before the other has sampled.
                after_read.wait()
                return value

        # `async` is a reserved keyword from Python 3.7 on, so the spelling
        # `self.async(...)` no longer parses; look the helper up by name.
        # Presumably it schedules the callable concurrently and returns a
        # handle accepted by `self.get_result` -- verify against the helper.
        start_task = getattr(self, "async")
        r1 = start_task(reader)
        r2 = start_task(reader)

        # Wait until we can confirm that all readers are locked out
        before_read.wait()
        self.assert_writer()

    # NOTE(review): block structure was reconstructed from flattened source.
    # The results are collected after the write lock is released, because
    # the readers cannot pass `during_read` before that -- confirm.
    self.assertEqual(2, self.get_result(r1))
    self.assertEqual(2, self.get_result(r2))
    self.assert_unlocked()
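# NOTE(review): stray non-code text, apparently web-page navigation labels
# ("Comment list", "Table of contents"); kept as a comment so the module parses.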