test_ringbuffer.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:ringbuffer 作者: bslatkin 项目源码 文件源码
def test_multiple_readers_block_writer(self):
    """Check that a writer only gets the lock after two readers release it.

    This test holds a read lock itself, starts a second reader and a
    writer through the fixture's ``async`` helper, and uses two barriers
    to sequence them:

    * ``before_read`` (3 parties: this test, the reader, the writer) is
      passed once the second reader is inside its read lock and the
      writer is about to request the write lock.
    * ``after_read`` (2 parties: this test, the reader) keeps the second
      reader inside its read lock until this test has checked the reader
      count.
    """
    # ``async`` became a reserved keyword in Python 3.7, so the attribute
    # can no longer be written as ``self.async``; look it up by name
    # instead. NOTE(review): the helper is defined outside this block --
    # presumably it runs the callable concurrently and returns a handle
    # accepted by ``self.get_result``; consider renaming it at its
    # definition.
    run_async = getattr(self, 'async')

    with self.lock.for_read():
        before_read = multiprocessing.Barrier(3)
        after_read = multiprocessing.Barrier(2)

        def test_reader():
            # Only the outer read lock of this test is held at this point.
            self.assert_readers(1)

            with self.lock.for_read():
                before_read.wait()
                value = self.reader_count()
                after_read.wait()
                return value

        def test_writer():
            before_read.wait()

            with self.lock.for_write():
                self.assert_writer()
                return 'written'

        reader = run_async(test_reader)
        writer = run_async(test_writer)

        # Wait for the write to be blocked by multiple readers.
        before_read.wait()
        self.assert_readers(2)
        after_read.wait()

    # Results are collected only after this test's read lock is released.
    self.assertEqual(2, self.get_result(reader))
    self.assertEqual('written', self.get_result(writer))
    self.assert_unlocked()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号