drain.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:arq 作者: samuelcolvin 项目源码 文件源码
def finish(self, timeout=None):
        """
        Cancel all pending tasks and optionally re-enqueue jobs which haven't finished after the timeout.

        :param timeout: how long to wait for tasks to finish, defaults to ``shutdown_delay``
        """
        timeout = timeout or self.shutdown_delay
        self.running = False
        cancelled_tasks = 0
        if self.pending_tasks:
            with await self._finish_lock:
                work_logger.info('drain waiting %0.1fs for %d tasks to finish', timeout, len(self.pending_tasks))
                _, pending = await asyncio.wait(self.pending_tasks, timeout=timeout, loop=self.loop)
                if pending:
                    pipe = self.redis.pipeline()
                    for task in pending:
                        if task.re_enqueue:
                            pipe.rpush(task.job.raw_queue, task.job.raw_data)
                        task.cancel()
                        cancelled_tasks += 1
                    if pipe._results:
                        await pipe.execute()
                self.pending_tasks = set()
        return cancelled_tasks
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号