test_lock.py 文件源码

python
阅读 33 收藏 0 点赞 0 评论 0

项目:deb-python-fasteners 作者: openstack 项目源码 文件源码
def test_double_reader_abort(self):
        lock = fasteners.ReaderWriterLock()
        activated = collections.deque()

        def double_bad_reader():
            with lock.read_lock():
                with lock.read_lock():
                    raise RuntimeError("Broken")

        def happy_writer():
            with lock.write_lock():
                activated.append(lock.owner)

        with futures.ThreadPoolExecutor(max_workers=20) as e:
            for i in range(0, 20):
                if i % 2 == 0:
                    e.submit(double_bad_reader)
                else:
                    e.submit(happy_writer)

        self.assertEqual(10, len([a for a in activated if a == 'w']))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号