def scheduled_responder(cls,
plugin: str,
interval: (crontab, float),
queue: Enum = TaskQueue.SHORT,
**kwargs):
"""
Registers the decorated function as responder and register
`run_plugin_for_all_repos` as periodic task with plugin name and
a responder event as arguments.
:param plugin: Name of plugin with which responder will be registered.
:param interval: Periodic interval in seconds as float or crontab
object specifying task trigger time.
See http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
:param queue: Queue to use for the scheduled_responder's tasks.
:param kwargs: Keyword arguments to pass to `run_plugin_for_all_repos`.
>>> from gitmate_hooks.utils import ResponderRegistrar
>>> @ResponderRegistrar.scheduled_responder('test', 10.0)
... def test_responder(igitt_repo):
... print('Hello, World!')
This will register a `test.test_responder` responder and schedule
`run_plugin_for_all_repos` with arguments `('test',
'test.test_responder')` with 10 seconds interval.
"""
def _wrapper(function: Callable):
action = '{}.{}'.format(plugin, function.__name__)
periodic_task_args = (plugin, action)
function = cls.responder(plugin, action)(function)
task = celery.task(run_plugin_for_all_repos,
base=ExceptionLoggerTask,
queue=queue.value)
celery.add_periodic_task(
interval, task.s(), periodic_task_args, kwargs)
return function
return _wrapper
评论列表
文章目录