def test_register_receivers(base_app, event_entrypoints):
    """Check that configured event builders run for every sent signal.

    A signal and two chained event builders are registered under the
    ``event_0`` entry of ``STATS_EVENTS``. The signal is then sent twice and
    the events consumed from ``event_0`` are compared with the expected
    output of the two builders.
    """
    def event_builder1(event, sender_app, signal_param, *args, **kwargs):
        # Copy the value carried by the signal into the event.
        event.update({'event_param1': signal_param})
        return event

    def event_builder2(event, sender_app, signal_param, *args, **kwargs):
        # Reads the key written by event_builder1, so it has to run after it.
        event.update({'event_param2': event['event_param1'] + 1})
        return event

    try:
        namespace = Namespace()
        signal = namespace.signal('my-signal')

        stats_events = {
            'event_0': {
                'signal': signal,
                'event_builders': [event_builder1, event_builder2],
            },
        }
        base_app.config.update({'STATS_EVENTS': stats_events})

        InvenioStats(base_app)
        current_queues.declare()

        for _ in range(2):
            signal.send(base_app, signal_param=42)

        consumed = list(current_stats.consume('event_0'))

        # Each of the two sent signals should have produced one event, and
        # every event should have gone through both event builders.
        expected_event = {'event_param1': 42, 'event_param2': 43}
        assert consumed == [expected_event, expected_event]
    finally:
        current_queues.delete()