def test_job_caching_not_supported(self):
    """Check if it fails when caching is not supported.

    The job is enqueued twice for the 'git' backend, once passing
    ``cache_path`` and once passing ``fetch_from_cache=True``. Each
    attempt is expected to raise ``AttributeError``.
    """
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log.txt'),
    }

    # `async` is a reserved word since Python 3.7, so the former
    # `async=False` spelling no longer parses; RQ accepts `is_async`
    # instead. A non-async queue executes the job in-process when it is
    # enqueued, which is what lets the assertions below wrap `enqueue()`.
    q = rq.Queue('queue', is_async=False)

    # Asking for a cache directory must be rejected.
    with self.assertRaises(AttributeError):
        q.enqueue(execute_perceval_job,
                  backend='git', backend_args=args,
                  qitems='items', task_id='mytask',
                  cache_path=self.tmp_path)

    # Asking to fetch from the cache must be rejected as well.
    with self.assertRaises(AttributeError):
        q.enqueue(execute_perceval_job,
                  backend='git', backend_args=args,
                  qitems='items', task_id='mytask',
                  fetch_from_cache=True)
评论列表
文章目录