def test_foreign_queues_are_ignored(self):
    u'''
    Check that only this CKAN instance's queues are listed.

    Jobs are enqueued on ``q1`` and ``q2`` for the current instance, on
    ``q2`` again while ``ckan.site_id`` is temporarily set to another
    site, and on a plain RQ queue ``q4`` created directly through
    ``rq``. After stripping the prefix from every queue returned by
    ``jobs.get_all_queues()``, the resulting set of names must be
    exactly ``q1`` and ``q2``.
    '''
    expected_names = {u'q1', u'q2'}

    # Queues owned by the CKAN instance under test.
    for own_queue in (u'q1', u'q2'):
        self.enqueue(queue=own_queue)

    # Same short name as a local queue, but enqueued while the config
    # identifies a different CKAN site.
    with changed_config(u'ckan.site_id', u'some-other-ckan-instance'):
        self.enqueue(queue=u'q2')

    # A queue created straight through RQ, bypassing self.enqueue.
    rq.Queue(u'q4').enqueue_call(jobs.test_job)

    found_names = set()
    for queue in jobs.get_all_queues():
        found_names.add(jobs.remove_queue_name_prefix(queue.name))

    assert_equal(found_names, expected_names)
评论列表
文章目录