def test_consume_feed_task_chaining():
""" Consume test rss feed through connected fetcher and parser and publisher tasks.
Make sure the parser have created the RssNotification instances, and
all the messages were sent.
It should be run from under the shell/script.
"""
from celery import chain
from instanotifier.fetcher.tasks import fetch
from instanotifier.parser.tasks import parse
from instanotifier.fetcher.rss.utils import _rss_file_path
with TestFeedSourceAutoCleanupContext() as context:
original_notification_count = RssNotification.objects.count()
print 'Original notifications count: %s' % (original_notification_count)
task_flow = chain(fetch.s(_rss_file_path()), parse.s(), publish.s(context.feedsource_pk))
task_flow.delay().get()
actual_notification_count = RssNotification.objects.count()
print 'Actual notifications count: %s' % (actual_notification_count)
assert (actual_notification_count > original_notification_count)
评论列表
文章目录