def test_event(self, container_factory, rabbit_config):
    """Dispatch an event with a near-future timestamp and check the reply.

    Starts a container for ``examples.retry.Service``, dispatches an
    ``event_type`` event from ``src_service`` whose payload carries a
    timestamp one second ahead of the current UTC time, waits for the
    ``handle_event`` entrypoint, and asserts that the time embedded in the
    handler's ``"Time is ..."`` result is not earlier than that timestamp.

    NOTE(review): presumably the service retries the event until the
    timestamp has passed -- confirm against ``examples.retry``.
    """
    from examples.retry import Service

    container = container_factory(Service, rabbit_config)
    container.start()

    # Use ``shift`` for a relative offset: passing plural units such as
    # ``seconds`` to ``replace`` was deprecated by arrow and is rejected
    # by current releases.
    timestamp = arrow.utcnow().shift(seconds=+1)

    dispatch = event_dispatcher(rabbit_config)
    with entrypoint_waiter(
        container, 'handle_event', callback=wait_for_result
    ) as result:
        payload = {'timestamp': timestamp.isoformat()}
        dispatch("src_service", "event_type", payload)

    # Read the result only after the waiter context has exited.
    res = result.get()

    # Fail with a readable message instead of an AttributeError on None
    # when the handler's result does not have the expected shape.
    match = re.match("Time is (.+)", res)
    assert match is not None, "unexpected handler result: {!r}".format(res)
    assert arrow.get(match.group(1)) >= timestamp
评论列表
文章目录