def test_multiple_readers_block_writer(self):
with self.lock.for_read():
before_read = multiprocessing.Barrier(3)
after_read = multiprocessing.Barrier(2)
def test_reader():
self.assert_readers(1)
with self.lock.for_read():
before_read.wait()
value = self.reader_count()
after_read.wait()
return value
def test_writer():
before_read.wait()
with self.lock.for_write():
self.assert_writer()
return 'written'
reader = self.async(test_reader)
writer = self.async(test_writer)
# Wait for the write to be blocked by multiple readers.
before_read.wait()
self.assert_readers(2)
after_read.wait()
self.assertEqual(2, self.get_result(reader))
self.assertEqual('written', self.get_result(writer))
self.assert_unlocked()
评论列表
文章目录