def scheduler(cls,
              interval: (crontab, float),
              *args,
              queue: Enum = TaskQueue.SHORT,
              **kwargs):  # pragma: no cover
    """
    Build a decorator that schedules the decorated function periodically.

    The decorated function is turned into a celery task (using
    ``ExceptionLoggerTask`` as its base and ``queue.value`` as its queue)
    and that task is registered via ``celery.add_periodic_task``. The
    decorator returns the original function, not the task object.

    :param interval: Period in seconds as a float, or a crontab object
        describing when the task fires. See
        http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
    :param args: Positional arguments to pass to the scheduled task.
    :param queue: Enum member whose ``value`` is the queue to use for
        the scheduled task.
    :param kwargs: Keyword arguments to pass to the scheduled task.
    :return: Decorator that registers its argument and returns it as is.
    """
    def _register(function: Callable):
        periodic_task = celery.task(
            function,
            base=ExceptionLoggerTask,
            queue=queue.value,
        )
        # The signature itself is created empty; ``args`` and ``kwargs``
        # are handed over separately to ``add_periodic_task``.
        signature = periodic_task.s()
        celery.add_periodic_task(interval, signature, args, kwargs)
        return function

    return _register
评论列表
文章目录