def test_sensu_process_called_once_inside_window(
    self,
    create_message,
    registered_schema,
    producer,
    message_count
):
    """Publishing one message repeatedly triggers a single Sensu `process` call.

    `SensuTTLAlerter.process` is replaced with an autospecced mock that
    returns None, Sensu is enabled on the producer, and the same message
    (created with ``timeslot=1.0``) is published ``message_count`` times.
    The mock must have been called exactly once afterwards.

    NOTE(review): judging by the test name, publishing the same message is
    presumably what keeps every publish inside one alerting window --
    confirm against the producer implementation.
    """
    alerter_cls = data_pipeline.tools.sensu_ttl_alerter.SensuTTLAlerter
    patched_process = mock.patch.object(
        alerter_cls,
        'process',
        autospec=True,
        return_value=None
    )
    with patched_process as process_mock:
        producer.enable_sensu = True
        message = create_message(registered_schema, timeslot=1.0)

        # The very same message object is re-published on every iteration.
        for _ in range(message_count):
            producer.publish(message)

        # Checked once, after all publishes, while the patch is still active.
        assert process_mock.call_count == 1
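# NOTE(review): stray web-page navigation text ("Comment list" /
# "Article table of contents") was pasted here; it is not part of this module.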