queue_test.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_channel_waiters(self):
        c = eventlet.Queue(0)
        w1 = eventlet.spawn(c.get)
        w2 = eventlet.spawn(c.get)
        w3 = eventlet.spawn(c.get)
        eventlet.sleep(0)
        self.assertEqual(c.getting(), 3)
        s1 = eventlet.spawn(c.put, 1)
        s2 = eventlet.spawn(c.put, 2)
        s3 = eventlet.spawn(c.put, 3)

        s1.wait()
        s2.wait()
        s3.wait()
        self.assertEqual(c.getting(), 0)
        # NOTE: we don't guarantee that waiters are served in order
        results = sorted([w1.wait(), w2.wait(), w3.wait()])
        self.assertEqual(results, [1, 2, 3])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号