pools_test.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_putting_to_queue(self):
        timer = eventlet.Timeout(0.1)
        try:
            size = 2
            self.pool = IntPool(min_size=0, max_size=size)
            queue = Queue()
            results = []

            def just_put(pool_item, index):
                self.pool.put(pool_item)
                queue.put(index)
            for index in six.moves.range(size + 1):
                pool_item = self.pool.get()
                eventlet.spawn(just_put, pool_item, index)

            for _ in six.moves.range(size + 1):
                x = queue.get()
                results.append(x)
            self.assertEqual(sorted(results), list(six.moves.range(size + 1)))
        finally:
            timer.cancel()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号