def __init__(self, *,
             redis: Redis,
             max_concurrent_tasks: int = 50,
             shutdown_delay: float = 6,
             timeout_seconds: int = 60,
             burst_mode: bool = True,
             raise_task_exception: bool = False,
             semaphore_timeout: float = 60) -> None:
    """
    Store the configuration and create the asyncio primitives and counters used by this object.

    :param redis: redis pool to get connection from to pop items from list, also used to optionally
        re-enqueue pending jobs on termination
    :param max_concurrent_tasks: maximum number of jobs which can be executed at the same time by the event loop
    :param shutdown_delay: number of seconds to wait for tasks to finish; values below 0.1 are raised to 0.1
    :param timeout_seconds: maximum duration of a job, after that the job will be cancelled by the event loop
    :param burst_mode: break the iter loop as soon as no more jobs are available by adding a sentinel quit queue
    :param raise_task_exception: whether or not to raise an exception which occurs in a processed task
    :param semaphore_timeout: stored as ``self.semaphore_timeout``; presumably the number of seconds to wait
        when acquiring the task semaphore (it is not used in this method -- confirm against its callers)
    """
    # Function-scope import so this method does not rely on a module-level ``import sys``.
    import sys

    self.redis = redis
    # NOTE(review): reaches into private attributes of the redis object to reuse its event loop.
    self.loop = redis._pool_or_conn._loop

    # The ``loop`` argument of asyncio's synchronisation primitives was removed in Python 3.10
    # and passing it raises TypeError there.  It is still passed on older interpreters so that
    # behaviour is unchanged where the argument is accepted.
    if sys.version_info < (3, 10):
        loop_kwargs = {'loop': self.loop}
    else:
        loop_kwargs = {}

    self.max_concurrent_tasks = max_concurrent_tasks
    self.task_semaphore = asyncio.Semaphore(value=max_concurrent_tasks, **loop_kwargs)
    # Enforce a minimum delay of 0.1 seconds, even if the caller passes zero or a negative value.
    self.shutdown_delay = max(shutdown_delay, 0.1)
    self.timeout_seconds = timeout_seconds
    self.burst_mode = burst_mode
    self.raise_task_exception = raise_task_exception
    self.semaphore_timeout = semaphore_timeout

    self.pending_tasks: Set[asyncio.futures.Future] = set()
    # Starts out as None; nothing in this method assigns an exception to it.
    self.task_exception: Exception = None

    # Job counters all start at zero.
    self.jobs_complete = 0
    self.jobs_failed = 0
    self.jobs_timed_out = 0

    self.running = False
    self._finish_lock = asyncio.Lock(**loop_kwargs)
评论列表
文章目录