def test_xsubscribe(service_a1, service_d1, registry):
    """Check that a service passed to ``_xsubscribe`` is returned as a subscriber.

    Registers ``service_a1`` and ``service_d1`` with the registry, passes
    ``service_d1`` to ``registry._xsubscribe``, then calls
    ``registry.get_subscribers`` with the name and version of ``service_a1``
    and the endpoint of the first event listed by ``service_d1``. The first
    positional argument of the first ``send`` call made on the reply protocol
    must satisfy ``subscriber_returned_successfully`` for ``service_d1``.
    """
    # Both services are registered the same way, each with its own mock protocol.
    for service in (service_a1, service_d1):
        registry.register_service(
            packet={'params': service},
            registry_protocol=mock.Mock(),
        )

    registry._xsubscribe(packet={'params': service_d1})

    reply_protocol = mock.Mock()
    query = {
        'name': service_a1['name'],
        'version': service_a1['version'],
        'endpoint': service_d1['events'][0]['endpoint'],
    }
    request = {'params': query, 'request_id': str(uuid.uuid4())}
    registry.get_subscribers(packet=request, protocol=reply_protocol)

    # call_args_list[0] is the first send() call; [0][0] picks its first
    # positional argument.
    first_send_call = reply_protocol.send.call_args_list[0]
    sent_packet = first_send_call[0][0]
    assert subscriber_returned_successfully(sent_packet, service_d1)
评论列表
文章目录