def test_job_no_result(self):
"""Execute a Git backend job that will not produce any results"""
args = {
'uri': 'http://example.com/',
'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt'),
'from_date': datetime.datetime(2020, 1, 1, 1, 1, 1)
}
q = rq.Queue('queue', async=False)
job = q.enqueue(execute_perceval_job,
backend='git', backend_args=args,
qitems='items', task_id='mytask')
result = job.return_value
self.assertEqual(result.job_id, job.get_id())
self.assertEqual(result.task_id, 'mytask')
self.assertEqual(result.backend, 'git')
self.assertEqual(result.last_uuid, None)
self.assertEqual(result.max_date, None)
self.assertEqual(result.nitems, 0)
self.assertEqual(result.offset, None)
self.assertEqual(result.nresumed, 0)
commits = self.conn.lrange('items', 0, -1)
commits = [pickle.loads(c) for c in commits]
self.assertListEqual(commits, [])
评论列表
文章目录