def scheduler(cls,
interval: (crontab, float),
*args,
queue: Enum = TaskQueue.SHORT,
**kwargs): # pragma: no cover
"""
Registers the decorated function as a periodic task. The task should
not accept any arguments.
:param interval: Periodic interval in seconds as float or crontab
object specifying task trigger time. See
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
:param queue: Queue to use for the scheduled task.
:param args: Arguments to pass to scheduled task.
:param kwargs: Keyword arguments to pass to scheduled task.
"""
def _wrapper(function: Callable):
task = celery.task(function,
base=ExceptionLoggerTask,
queue=queue.value)
celery.add_periodic_task(interval, task.s(), args, kwargs)
return function
return _wrapper
python类crontab()的实例源码
def decode_schedule(obj):
if obj is None:
return None
_type = obj['__type__']
value = obj['__value__']
if _type == 'datetime':
return decode_datetime(value)
elif _type == 'crontab':
return crontab(*value.split('\t'))
elif _type == 'solar':
return solar(**value)
elif _type == 'schedule':
return schedule(**value)
else:
raise NotImplementedError(
'Cannot deserialize schedule %(type)s type' % {
'type': _type
}
)
def install_default_entries(self, data):
entries = {}
if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
# Add backend clean up
entries.setdefault(
'celery.backend_cleanup', {
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('0', '4', '*')
}
)
self.update_from_dict(entries)
def schedule(self):
return schedules.crontab(
minute=self.minute,
hour=self.hour,
day_of_week=self.day_of_week,
day_of_month=self.day_of_month,
month_of_year=self.month_of_year
)
def schedule(self):
if self.crontab:
return self.crontab.schedule
if self.interval:
return self.interval.schedule
def scheduled_responder(cls,
plugin: str,
interval: (crontab, float),
queue: Enum = TaskQueue.SHORT,
**kwargs):
"""
Registers the decorated function as responder and register
`run_plugin_for_all_repos` as periodic task with plugin name and
a responder event as arguments.
:param plugin: Name of plugin with which responder will be registered.
:param interval: Periodic interval in seconds as float or crontab
object specifying task trigger time.
See http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab
:param queue: Queue to use for the scheduled_responder's tasks.
:param kwargs: Keyword arguments to pass to `run_plugin_for_all_repos`.
>>> from gitmate_hooks.utils import ResponderRegistrar
>>> @ResponderRegistrar.scheduled_responder('test', 10.0)
... def test_responder(igitt_repo):
... print('Hello, World!')
This will register a `test.test_responder` responder and schedule
`run_plugin_for_all_repos` with arguments `('test',
'test.test_responder')` with 10 seconds interval.
"""
def _wrapper(function: Callable):
action = '{}.{}'.format(plugin, function.__name__)
periodic_task_args = (plugin, action)
function = cls.responder(plugin, action)(function)
task = celery.task(run_plugin_for_all_repos,
base=ExceptionLoggerTask,
queue=queue.value)
celery.add_periodic_task(
interval, task.s(), periodic_task_args, kwargs)
return function
return _wrapper
def schedule(self):
return schedules.crontab(minute=self.minute,
hour=self.hour,
day_of_week=self.day_of_week,
day_of_month=self.day_of_month,
month_of_year=self.month_of_year)
def __str__(self):
fmt = '{0.name}: {0.crontab}'
return fmt.format(self)
def schedule(self):
if self.crontab:
return self.crontab.schedule
if self.interval:
return self.interval.schedule
def install_default_entries(self, data):
entries = {}
if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
entries.setdefault(
'celery.backend_cleanup', {
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('*/5', '*', '*'),
'options': {'expires': 12 * 3600},
},
)
self.update_from_dict(entries)
def install_default_entries(self, data):
entries = {}
if self.app.conf.result_expires:
entries.setdefault(
'celery.backend_cleanup', {
'task': 'celery.backend_cleanup',
'schedule': schedules.crontab('0', '4', '*'),
'options': {'expires': 12 * 3600},
},
)
self.update_from_dict(entries)
def schedule(self):
return schedules.crontab(minute=self.minute,
hour=self.hour,
day_of_week=self.day_of_week,
day_of_month=self.day_of_month,
month_of_year=self.month_of_year,
nowfun=lambda: make_aware(now()))
def validate_unique(self, *args, **kwargs):
super(PeriodicTask, self).validate_unique(*args, **kwargs)
if not self.interval and not self.crontab and not self.solar:
raise ValidationError({
'interval': [
'One of interval, crontab, or solar must be set.'
]
})
if self.interval and self.crontab and self.solar:
raise ValidationError({
'crontab': [
'Only one of interval, crontab, or solar must be set'
]
})
def __str__(self):
fmt = '{0.name}: {{no schedule}}'
if self.interval:
fmt = '{0.name}: {0.interval}'
if self.crontab:
fmt = '{0.name}: {0.crontab}'
if self.solar:
fmt = '{0.name}: {0.solar}'
return fmt.format(self)
def schedule(self):
if self.interval:
return self.interval.schedule
if self.crontab:
return self.crontab.schedule
if self.solar:
return self.solar.schedule
def test_worker_schedule():
# timedelta
conf = WorkerConfiguration({
'worker': {
'broker_url': 'redis://',
'celery_result_backend': 'redis://',
'celerybeat_schedule': {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 'timedelta(seconds=30)',
},
},
}
})
assert conf.worker_schedule == {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': datetime.timedelta(seconds=30),
'args': (),
},
}
assert conf.worker_config['CELERYBEAT_SCHEDULE'] == conf.worker_schedule
# crontab
conf2 = WorkerConfiguration({
'worker': {
'broker_url': 'redis://',
'celery_result_backend': 'redis://',
'celerybeat_schedule': {
'add-every-minute': {
'task': 'tasks.add',
'schedule': "crontab(minute='*')",
'args': [16, 16],
},
},
}
})
assert conf2.worker_schedule == {
'add-every-minute': {
'task': 'tasks.add',
'schedule': crontab(minute='*'),
'args': (16, 16),
},
}
assert conf2.worker_config['CELERYBEAT_SCHEDULE'] == conf2.worker_schedule
# import path
conf3 = WorkerConfiguration({
'worker': {
'broker_url': 'redis://',
'celery_result_backend': 'redis://',
'celerybeat_schedule': {
'add-every-minute': {
'task': 'tasks.add',
'schedule': "celery.schedules:crontab(minute='*')",
'args': [16, 16],
},
},
}
})
assert conf3.worker_schedule == conf2.worker_schedule
assert conf3.worker_config['CELERYBEAT_SCHEDULE'] == conf3.worker_schedule
def encode_schedule(value):
if value is None:
return None
elif isinstance(value, datetime):
return {
'__type__': 'datetime',
'__value__': encode_datetime(value)
}
elif isinstance(value, crontab):
return {
'__type__': 'crontab',
'__value__': '%(minute)s\t%(hour)s\t%(day_of_week)s\t'
'%(day_of_month)s\t%(month_of_year)s' % {
'minute': value._orig_minute,
'hour': value._orig_hour,
'day_of_week': value._orig_day_of_week,
'day_of_month': value._orig_day_of_month,
'month_of_year': value._orig_month_of_year,
}
}
elif isinstance(value, solar):
return {
'__type__': 'solar',
'__value__': {
'event': value.event,
'lat': value.lat,
'lon': value.lon
}
}
elif isinstance(value, schedule):
return {
'__type__': 'schedule',
'__value__': {
'run_every': value.run_every.total_seconds(),
'relative': bool(value.relative),
}
}
else:
raise NotImplementedError(
'Cannot serialize schedule %(type)s type' % {
'type': type(value).__name__
}
)