test_registry.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:trellio 作者: artificilabs 项目源码 文件源码
def test_xsubscribe(service_a1, service_d1, registry):
    """Check that ``get_subscribers`` reports a service after ``_xsubscribe``.

    Both services are registered with the registry, then ``_xsubscribe`` is
    called with ``service_d1``'s params.  The registry is then queried with
    ``service_a1``'s name/version and the endpoint of ``service_d1``'s first
    event, and the first payload passed to ``protocol.send`` is validated
    against ``service_d1``.

    The arguments are presumably pytest fixtures defined elsewhere in the
    test suite (verify against the conftest / module fixtures).
    """
    # Each registration gets its own throwaway protocol mock, in the same
    # order as before: service_a1 first, then service_d1.
    for service in (service_a1, service_d1):
        registry.register_service(
            packet={'params': service}, registry_protocol=mock.Mock())
    registry._xsubscribe(packet={'params': service_d1})

    reply_protocol = mock.Mock()
    first_event = service_d1['events'][0]
    lookup_packet = {
        'params': {
            'name': service_a1['name'],
            'version': service_a1['version'],
            'endpoint': first_event['endpoint'],
        },
        'request_id': str(uuid.uuid4()),
    }
    registry.get_subscribers(packet=lookup_packet, protocol=reply_protocol)

    # call_args_list[0] is the first send() call; [0] selects its positional
    # args, and the payload is the first of those.
    first_send_args = reply_protocol.send.call_args_list[0][0]
    assert subscriber_returned_successfully(first_send_args[0], service_d1)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号