def test_wait_for_write__writer_already_waiting_for_reader(self):
    """Exercise ``wait_for_write()`` while a writer is queued behind a reader.

    The test takes the read lock, starts a background task that tries to
    take the write lock, pauses so that task can reach the lock, and then
    calls ``self.lock.wait_for_write()``.  Afterwards it runs
    ``assert_readers(1)``, checks that the background task returned
    ``'written'``, and finally runs ``assert_unlocked()``.

    NOTE(review): the nesting below was reconstructed from a copy whose
    indentation had been lost -- confirm it against the upstream test suite.
    """
    event = multiprocessing.Event()
    with self.lock.for_read():
        def test():
            # Signal the main flow that the background task is running,
            # just before it attempts to take the write lock.
            event.set()
            with self.lock.for_write():
                self.assert_writer()
                # NOTE(review): the event is already set at this point, so
                # this second set() looks redundant; kept as in the original.
                event.set()
                return 'written'

        # ``async`` has been a reserved keyword since Python 3.7, so the
        # attribute syntax ``self.async`` no longer parses; look the helper
        # up by name instead (same call, valid on old and new interpreters).
        writer = getattr(self, 'async')(test)
        event.wait()
        # Force a context switch so the writer is waiting
        time.sleep(0.1)
        self.lock.wait_for_write()
        self.assert_readers(1)
    # NOTE(review): assumed to run after the read lock has been released,
    # since assert_unlocked() cannot hold inside the ``with`` block above.
    self.assertEqual('written', self.get_result(writer))
    self.assert_unlocked()
# Comment list
# Article table of contents