def test_aggregation_without_events(app, es_with_templates):
    """Verify that aggregating with no events neither crashes nor indexes.

    Two situations are exercised:

    1. No event index exists at all, e.g. celery starts aggregating
       before any event has been created.
    2. The event index exists but holds no event, e.g. events have been
       indexed but are not searchable yet (before the index refresh).

    In both cases the ``stats-file-download`` index must not exist once
    the aggregation has run.
    """

    def _aggregate_file_downloads():
        # Build a fresh aggregator for every run, exactly like a new
        # aggregation task would.
        aggregator = StatAggregator(
            event='file-download',
            aggregation_field='file_id',
            aggregation_interval='day',
            query_modifiers=[],
        )
        aggregator.run()

    def _aggregation_index_exists():
        # Look the index up again on each call instead of caching it.
        aggregation_index = Index(
            'stats-file-download',
            using=current_search_client,
        )
        return aggregation_index.exists()

    # Case 1: there is no event index yet.
    _aggregate_file_downloads()
    assert not _aggregation_index_exists()

    # Case 2: the event index exists but contains no event.
    empty_events_index = Index(
        'events-stats-file-download-2017',
        using=current_search_client,
    )
    empty_events_index.create()
    # Give the freshly created index time to become available.
    time.sleep(1)

    _aggregate_file_downloads()
    assert not _aggregation_index_exists()
test_aggregations.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录