def finish(self, timeout=None):
"""
Cancel all pending tasks and optionally re-enqueue jobs which haven't finished after the timeout.
:param timeout: how long to wait for tasks to finish, defaults to ``shutdown_delay``
"""
timeout = timeout or self.shutdown_delay
self.running = False
cancelled_tasks = 0
if self.pending_tasks:
with await self._finish_lock:
work_logger.info('drain waiting %0.1fs for %d tasks to finish', timeout, len(self.pending_tasks))
_, pending = await asyncio.wait(self.pending_tasks, timeout=timeout, loop=self.loop)
if pending:
pipe = self.redis.pipeline()
for task in pending:
if task.re_enqueue:
pipe.rpush(task.job.raw_queue, task.job.raw_data)
task.cancel()
cancelled_tasks += 1
if pipe._results:
await pipe.execute()
self.pending_tasks = set()
return cancelled_tasks
评论列表
文章目录