def test_get_current_job(self):
    """
    Ensure that functions using RQ's ``get_current_job`` don't fail
    when run from rqworker (the job id is not in the failed queue).

    The job is enqueued on the queue returned by ``get_queue()``, the
    ``rqworker`` management command is invoked with ``--burst``, and the
    job id is then checked against the ``failed`` queue that shares the
    same connection.
    """
    queue = get_queue()
    job = queue.enqueue(access_self)
    try:
        # ``--burst`` is passed so the command is expected to return once
        # the pending work has been processed instead of blocking forever.
        call_command('rqworker', '--burst')
        failed_queue = Queue(name='failed', connection=queue.connection)
        # assertNotIn reports the offending id and the container on failure,
        # unlike assertFalse(x in y) which only says "True is not false".
        self.assertNotIn(job.id, failed_queue.job_ids)
    finally:
        # Always remove the job, even when the worker call or the assertion
        # fails, so it does not leak into later tests.
        job.delete()
评论列表
文章目录