utils.py source file

python

Project: gitmate-2    Author: GitMateIO
    def scheduler(cls,
                  interval: (crontab, float),
                  *args,
                  queue: Enum = TaskQueue.SHORT,
                  **kwargs):  # pragma: no cover
        """
        Registers the decorated function as a periodic task. The task should
        not accept any arguments.

        :param interval:    Periodic interval in seconds as float or crontab
                            object specifying task trigger time. See
                            http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
        :param queue:       Queue to use for the scheduled task.
        :param args:        Arguments to pass to scheduled task.
        :param kwargs:      Keyword arguments to pass to scheduled task.
        """
        def _wrapper(function: Callable):
            task = celery.task(function,
                               base=ExceptionLoggerTask,
                               queue=queue.value)
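            # Register the task with the periodic scheduler; args and kwargs
            # are forwarded to the task each time it is triggered.
            celery.add_periodic_task(interval, task.s(), args, kwargs)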
            return function
        return _wrapper
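
The method above is a decorator factory: it takes the schedule and any task arguments, and returns a wrapper that registers the decorated function before handing it back unchanged. The sketch below illustrates that pattern with the standard library only, so it runs without Celery. `FakeScheduler`, `fake_celery`, the `TaskQueue` members, and the `ping` function are hypothetical stand-ins introduced for illustration; they are not taken from the gitmate-2 source, and the stand-in only records registrations instead of scheduling anything.

```python
from enum import Enum
from typing import Any, Callable, Dict, List


class TaskQueue(Enum):
    # Hypothetical queue names; the project's real TaskQueue enum is not shown.
    SHORT = 'short'
    LONG = 'long'


class FakeScheduler:
    """Stand-in for the Celery app: it only records what was registered."""

    def __init__(self) -> None:
        self.entries: List[Dict[str, Any]] = []

    def add_periodic_task(self, interval, func, args=(), kwargs=None,
                          queue=None) -> None:
        # Store the registration so it can be inspected later.
        self.entries.append({
            'interval': interval,
            'func': func,
            'args': tuple(args),
            'kwargs': dict(kwargs or {}),
            'queue': queue,
        })


fake_celery = FakeScheduler()


def scheduler(interval: float, *args,
              queue: TaskQueue = TaskQueue.SHORT, **kwargs):
    """Return a decorator that registers the function as a periodic task."""
    def _wrapper(function: Callable) -> Callable:
        fake_celery.add_periodic_task(interval, function, args, kwargs,
                                      queue=queue.value)
        # The original function is returned unchanged, so it stays callable.
        return function
    return _wrapper


@scheduler(30.0, 'hello', queue=TaskQueue.LONG, retries=2)
def ping(message, retries=0):
    return f'{message} (retries={retries})'
```

Because the wrapper returns `function` rather than the task object, `ping` can still be called directly like any other function, while the registration in `fake_celery.entries` carries the interval, queue name, and the arguments that would be supplied on each scheduled run.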