test_aggregations.py source file

python

Project: invenio-stats Author: inveniosoftware
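import time

from elasticsearch_dsl import Index
from invenio_search import current_search_client
from invenio_stats.aggregations import StatAggregator


def test_aggregation_without_events(app, es_with_templates):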
    """Check that the aggregation doesn't crash if there are no events.

    This scenario happens when celery starts aggregating but no events
    have been created yet.
    """
    # Aggregate events
    StatAggregator(event='file-download',
                   aggregation_field='file_id',
                   aggregation_interval='day',
                   query_modifiers=[]).run()
    assert not Index(
        'stats-file-download', using=current_search_client
    ).exists()
    # Create the index but without any event. This happens when the events
    # have been indexed but are not yet searchable (before index refresh).
    Index('events-stats-file-download-2017',
          using=current_search_client).create()
    # Wait for the index to be available
    time.sleep(1)
    # Aggregate events
    StatAggregator(event='file-download',
                   aggregation_field='file_id',
                   aggregation_interval='day',
                   query_modifiers=[]).run()
    assert not Index(
        'stats-file-download', using=current_search_client
    ).exists()
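
The fixed `time.sleep(1)` in the test above waits for the new index to become available. A polling helper is one possible alternative; the sketch below is only an illustration, and `wait_until` and its `timeout` and `interval` parameters are hypothetical names that are not part of invenio-stats or elasticsearch_dsl.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.1):
    """Poll ``predicate`` until it returns a truthy value or ``timeout`` elapses.

    Hypothetical helper (not part of invenio-stats). Returns True if the
    predicate succeeded and False if the timeout was reached first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Possible usage inside the test, assuming the same imports as the test:
# wait_until(
#     lambda: Index('events-stats-file-download-2017',
#                   using=current_search_client).exists()
# )
```

Polling returns as soon as the condition holds, so the test does not always pay the full second, and the timeout still bounds the wait if the index never appears.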