test_dispatch.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码
def test_register(self, m_init, m_wrap_consumer):
        consumes = {mock.sentinel.key_fn1: mock.sentinel.key1,
                    mock.sentinel.key_fn2: mock.sentinel.key2,
                    mock.sentinel.key_fn3: mock.sentinel.key3}
        m_dispatcher = mock.Mock()
        m_consumer = mock.Mock()
        m_consumer.consumes = consumes
        m_wrap_consumer.return_value = mock.sentinel.handler
        m_init.return_value = None
        pipeline = _TestEventPipeline()
        pipeline._dispatcher = m_dispatcher

        pipeline.register(m_consumer)

        m_wrap_consumer.assert_called_once_with(m_consumer)
        m_dispatcher.register.assert_has_calls([
            mock.call(key_fn, key, mock.sentinel.handler)
            for key_fn, key in consumes.items()], any_order=True)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号