def test_reader_blocks_writer(self):
    """Check that a writer only gets the lock after the reader lets go.

    The test body holds the read lock, starts a concurrent writer via the
    class's ``async`` helper, and synchronizes with it on a two-party
    barrier just before the writer requests the write lock.  While the
    read lock is still held the reader count must stay at 1; once it is
    released the writer is expected to finish and return ``'written'``,
    leaving the lock unlocked.
    """
    with self.lock.for_read():
        # Two parties: this test body and the concurrent writer.
        before_write = multiprocessing.Barrier(2)

        def writer_task():
            # The outer test body still holds the read lock at this point.
            self.assert_readers(1)

            before_write.wait()

            with self.lock.for_write():
                self.assert_writer()
                return 'written'

        # ``async`` is a reserved keyword since Python 3.7, so the helper
        # cannot be spelled ``self.async(...)`` any more; fetch it by name.
        # NOTE(review): the helper itself is defined outside this block --
        # confirm its definition is still reachable under this name.
        writer = getattr(self, 'async')(writer_task)

        # Rendezvous with the writer right before it asks for the write
        # lock; the read lock is still held here, so exactly one reader
        # must be registered.
        before_write.wait()
        self.assert_readers(1)

    # The read lock has been released: collecting the result inside the
    # ``with`` block would wait on a writer that cannot proceed.
    self.assertEqual('written', self.get_result(writer))
    self.assert_unlocked()
# (Web-page residue, not part of the source: "Comment list" / "Article table of contents".)