def test_consume_feed():
""" Consume test rss feed through connected fetcher and parser tasks.
And make sure the parser have created the RssNotification instances.
It should be run from under the shell/script.
"""
from celery import chain
from instanotifier.fetcher.tasks import fetch
from instanotifier.fetcher.rss.utils import _rss_file_path
original_notification_count = RssNotification.objects.count()
print 'Original notifications count: %s' % (original_notification_count)
task_flow = chain(fetch.s(_rss_file_path()), tasks.parse.s())
saved_pks = task_flow.delay().get()
actual_notification_count = RssNotification.objects.count()
print 'Actual notifications count: %s' % (actual_notification_count)
print 'Number of saved notifications: %s' % (len(saved_pks))
assert (len(saved_pks) == actual_notification_count - original_notification_count)
评论列表
文章目录