tests.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:instanotifier 作者: AlexanderKaluzhny 项目源码 文件源码
def test_consume_feed():
    """Consume the test RSS feed through the chained fetcher and parser tasks.

    Verifies that the parser created RssNotification instances: the number
    of primary keys returned by the task chain must equal the growth of
    ``RssNotification.objects.count()`` across the run.

    It should be run from a shell/script. The call blocks on the chain's
    result, so presumably a running Celery worker (or eager mode) is
    required -- TODO confirm against the project's Celery settings.

    Raises:
        AssertionError: if the number of saved pks differs from the change
            in the notification count.
    """

    from celery import chain
    from instanotifier.fetcher.tasks import fetch
    from instanotifier.fetcher.rss.utils import _rss_file_path

    original_notification_count = RssNotification.objects.count()
    # Parenthesised single-argument print behaves the same on Python 2
    # (print statement) and Python 3 (print function).
    print('Original notifications count: %s' % original_notification_count)

    # In a chain, the result of `fetch` is passed on to the `parse` signature.
    task_flow = chain(fetch.s(_rss_file_path()), tasks.parse.s())
    # Block until the whole chain finishes; the value is the last task's result.
    saved_pks = task_flow.delay().get()

    actual_notification_count = RssNotification.objects.count()
    print('Actual notifications count: %s' % actual_notification_count)

    print('Number of saved notifications: %s' % len(saved_pks))
    assert len(saved_pks) == actual_notification_count - original_notification_count
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号