test_event_bus.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:voltha 作者: opencord 项目源码 文件源码
def test_deferred_queue_receiver(self):

        ebc = EventBus()

        queue = DeferredQueue()

        ebc.subscribe('', lambda _, msg: queue.put(msg))

        for i in xrange(10):
            ebc.publish('', i)

        self.assertEqual(len(queue.pending), 10)
        for i in xrange(10):
            msg = yield queue.get()
            self.assertEqual(msg, i)
        self.assertEqual(len(queue.pending), 0)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号