tests.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:instanotifier 作者: AlexanderKaluzhny 项目源码 文件源码
def test_consume_feed_task_chaining():
    """Run the test RSS feed through the chained fetch, parse and publish tasks.

    The chain is submitted with ``delay()`` and waited on with ``get()``.
    The test then asserts that the number of ``RssNotification`` rows grew,
    i.e. that the parser created new notification instances. Publishing is
    only checked indirectly, by the chain completing.

    It is meant to be run from a shell/script.
    """

    from celery import chain
    from instanotifier.fetcher.tasks import fetch
    from instanotifier.parser.tasks import parse
    from instanotifier.fetcher.rss.utils import _rss_file_path

    # NOTE(review): `publish`, `RssNotification` and
    # `TestFeedSourceAutoCleanupContext` are not imported here; presumably
    # they come from module-level imports -- confirm.
    with TestFeedSourceAutoCleanupContext() as context:
        original_notification_count = RssNotification.objects.count()
        # Single-argument call form prints the same text on Python 2 and 3.
        print('Original notifications count: %s' % (original_notification_count))

        # fetch -> parse -> publish; each task's result feeds the next one.
        task_flow = chain(
            fetch.s(_rss_file_path()),
            parse.s(),
            publish.s(context.feedsource_pk),
        )
        # Wait for the whole chain before counting again.
        task_flow.delay().get()

        actual_notification_count = RssNotification.objects.count()
        print('Actual notifications count: %s' % (actual_notification_count))

        assert actual_notification_count > original_notification_count, (
            'Expected new RssNotification rows: before=%s, after=%s'
            % (original_notification_count, actual_notification_count)
        )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号