def setup_schedule(self):
    """Populate the schedule with the default and the configured entries.

    The current ``self.schedule`` is handed to
    ``install_default_entries`` first; afterwards the entries found in
    the app's ``CELERYBEAT_SCHEDULE`` setting are merged in through
    ``update_from_dict``.
    """
    current_schedule = self.schedule
    self.install_default_entries(current_schedule)

    # Read the setting only after the defaults have been installed, in
    # the same order as the two calls are made.
    configured_entries = self.app.conf.CELERYBEAT_SCHEDULE
    self.update_from_dict(configured_entries)
python类schedule()的实例源码
def sync(self):
    """Save every entry whose name is currently in ``self._dirty``.

    Names are popped from the dirty set one at a time and the matching
    schedule entry is saved.  A ``KeyError`` raised while looking up or
    saving an entry is ignored and the name is dropped.

    Raises:
        Exception: Any other error raised while saving is propagated.
            The name being processed is put back into ``self._dirty``
            first, so the pending write is not silently forgotten.
    """
    info('Writing entries...')
    while self._dirty:
        name = self._dirty.pop()
        try:
            self.schedule[name].save()
        except KeyError:
            # Best effort: nothing to write for this name.
            pass
        except Exception:
            # Keep the entry marked dirty so a later sync can retry it.
            self._dirty.add(name)
            raise
def update_from_dict(self, dict_):
    """Build entries from a mapping and merge them into the schedule.

    Args:
        dict_: Mapping of entry name to a dict of keyword arguments,
            which is expanded into ``self.Entry.from_entry`` together
            with the name and ``self.session``.

    An entry that fails to build is reported through ``error`` and left
    out; all successfully built entries are added to ``self.schedule``
    with a single ``update`` call.
    """
    built = {}
    for entry_name, entry_fields in dict_.items():
        try:
            new_entry = self.Entry.from_entry(
                entry_name, self.session, **entry_fields
            )
        except Exception as exc:
            # Deliberately broad: one bad entry must not block the rest.
            error(ADD_ENTRY_ERROR, entry_name, exc, entry_fields)
        else:
            built[entry_name] = new_entry
    self.schedule.update(built)
def install_default_entries(self, data):
    """Install the built-in entries through ``update_from_dict``.

    When the app's ``CELERY_TASK_RESULT_EXPIRES`` setting is truthy, a
    ``celery.backend_cleanup`` entry is installed with a
    ``crontab('*/5', '*', '*')`` schedule and an ``expires`` option of
    ``12 * 3600``.  Otherwise ``update_from_dict`` is still called, with
    an empty mapping.

    Args:
        data: Not used by this method.
    """
    defaults = {}
    if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
        defaults['celery.backend_cleanup'] = {
            'task': 'celery.backend_cleanup',
            'schedule': schedules.crontab('*/5', '*', '*'),
            'options': {'expires': 12 * 3600},
        }
    self.update_from_dict(defaults)
def schedule(self):
    """Return the current schedule, reloading it when it is stale.

    A reload happens on the first call (``self._initial_read`` is
    false) and whenever ``self.schedule_changed()`` reports a change.
    Reloading first flushes pending writes with ``self.sync()`` and then
    replaces ``self._schedule`` with ``self.all_as_schedule()``.

    Returns:
        The cached ``self._schedule`` object.
    """
    update = False
    if not self._initial_read:
        debug('DatabaseScheduler: intial read')
        update = True
        self._initial_read = True
    elif self.schedule_changed():
        info('DatabaseScheduler: Schedule changed.')
        update = True

    if update:
        self.sync()
        self._schedule = self.all_as_schedule()
        # ``values()`` instead of ``itervalues()``: the latter exists
        # only on Python 2 dicts and raises AttributeError on Python 3.
        debug(
            'Current schedule:\n%s',
            '\n'.join(repr(entry) for entry in self._schedule.values()),
        )
    return self._schedule
def create_model_interval(self, schedule, **kwargs):
    """Create a model backed by a saved ``IntervalSchedule``.

    Args:
        schedule: Object converted with ``IntervalSchedule.from_schedule``.
        **kwargs: Extra keyword arguments forwarded to ``create_model``.

    Returns:
        Whatever ``self.create_model`` returns.
    """
    interval_row = IntervalSchedule.from_schedule(schedule)
    interval_row.save()
    return self.create_model(interval=interval_row, **kwargs)
def create_model_crontab(self, schedule, **kwargs):
    """Create a model backed by a saved ``CrontabSchedule``.

    Args:
        schedule: Object converted with ``CrontabSchedule.from_schedule``.
        **kwargs: Extra keyword arguments forwarded to ``create_model``.

    Returns:
        Whatever ``self.create_model`` returns.
    """
    crontab_row = CrontabSchedule.from_schedule(schedule)
    crontab_row.save()
    return self.create_model(crontab=crontab_row, **kwargs)
def create_model_solar(self, schedule, **kwargs):
    """Create a model backed by a saved ``SolarSchedule``.

    Args:
        schedule: Object converted with ``SolarSchedule.from_schedule``.
        **kwargs: Extra keyword arguments forwarded to ``create_model``.

    Returns:
        Whatever ``self.create_model`` returns.
    """
    solar_row = SolarSchedule.from_schedule(schedule)
    solar_row.save()
    return self.create_model(solar=solar_row, **kwargs)
def create_conf_entry(self):
    """Return a ``(name, fields)`` pair describing a new config entry.

    Two values are drawn from the ``_ids`` iterator: the first goes into
    the entry name, the second into the task name.

    Returns:
        A tuple of the entry name and a dict with the keys ``task``,
        ``schedule``, ``args``, ``relative``, ``kwargs`` and ``options``.
    """
    # Order matters: the name consumes an id before the task does.
    entry_name = 'thefoo{0}'.format(next(_ids))
    task_name = 'djcelery.unittest.add{0}'.format(next(_ids))

    entry_fields = {
        'task': task_name,
        'schedule': timedelta(0, 600),
        'args': (),
        'relative': False,
        'kwargs': {},
        'options': {'queue': 'extra_queue'},
    }
    return entry_name, entry_fields
def test_entry(self):
    """Check the fields of an ``Entry`` built from an interval model.

    Also checks that ``last_run_at`` passed at model creation is carried
    over to the entry, and that ``next()`` advances ``last_run_at`` and
    the run count.
    """
    model = self.create_model_interval(schedule(timedelta(seconds=10)))
    entry = self.Entry(model, app=self.app)

    assert entry.args == [2, 2]
    assert entry.kwargs == {'callback': 'foo'}
    assert entry.schedule
    assert entry.total_run_count == 0
    assert isinstance(entry.last_run_at, datetime)

    assert entry.options['queue'] == 'xaz'
    assert entry.options['exchange'] == 'foo'
    assert entry.options['routing_key'] == 'cpu'

    # A model created with an explicit last_run_at keeps that object.
    right_now = self.app.now()
    timed_model = self.create_model_interval(
        schedule(timedelta(seconds=10)),
        last_run_at=right_now,
    )
    assert timed_model.last_run_at

    timed_entry = self.Entry(timed_model, app=self.app)
    assert timed_entry.last_run_at is right_now

    following = timed_entry.next()
    assert following.last_run_at > timed_entry.last_run_at
    assert following.total_run_count == 1
def test_periodic_task_model_disabled_schedule(self):
    """A disabled model must not appear in a new scheduler's schedule.

    After disabling ``self.m1`` the schedule is expected to hold only
    the ``celery.backend_cleanup`` entry.
    """
    self.m1.enabled = False
    self.m1.save()

    scheduler = self.Scheduler(app=self.app)
    current = scheduler.schedule

    assert current
    assert len(current) == 1
    assert 'celery.backend_cleanup' in current
    assert self.entry_name not in current
def setup_scheduler(self, app):
    """Create the models and the scheduler used by the tests.

    Stores ``app`` on the instance, empties its ``beat_schedule``,
    creates two interval models, one crontab model and one solar model
    (``self.m1`` to ``self.m4``), adds a fifth, disabled interval model,
    and finally builds ``self.s`` from ``self.Scheduler``.

    Args:
        app: Application object stored as ``self.app``.
    """
    def persisted(model):
        # Save the model, then reload its fields from the database.
        model.save()
        model.refresh_from_db()
        return model

    self.app = app
    self.app.conf.beat_schedule = {}

    self.m1 = persisted(
        self.create_model_interval(schedule(timedelta(seconds=10))))
    self.m2 = persisted(
        self.create_model_interval(schedule(timedelta(minutes=20))))
    self.m3 = persisted(
        self.create_model_crontab(crontab(minute='2,4,5')))
    self.m4 = persisted(
        self.create_model_solar(solar('solar_noon', 48.06, 12.86)))

    # Disabled, so it should not show up in the schedule.
    disabled_model = self.create_model_interval(
        schedule(timedelta(seconds=1)))
    disabled_model.enabled = False
    disabled_model.save()

    self.s = self.Scheduler(app=self.app)
def test_all_as_schedule(self):
    """The schedule holds five entries, all instances of ``Entry``."""
    current = self.s.schedule

    assert current
    assert len(current) == 5
    assert 'celery.backend_cleanup' in current

    for _name, scheduled in current.items():
        assert isinstance(scheduled, self.s.Entry)
def test_reserve(self):
    """Reserving entries marks them dirty; ``flushed`` stays at 1."""
    first = self.s.schedule[self.m1.name]
    reserved_first = self.s.reserve(first)
    self.s.schedule[self.m1.name] = reserved_first
    assert self.s.flushed == 1

    second = self.s.schedule[self.m2.name]
    reserved_second = self.s.reserve(second)
    self.s.schedule[self.m2.name] = reserved_second
    assert self.s.flushed == 1

    assert self.m2.name in self.s._dirty
def test_periodic_task_disabled_and_enabled(self):
    """Toggling ``enabled`` on a task removes and re-adds its entry.

    Each toggle is expected to raise the scheduler's ``flushed`` counter
    by one when the schedule is read again.
    """
    # Look up the entry that belongs to m2.
    entry = self.s.schedule[self.m2.name]

    # Reserve the entry, resetting the last-sync time first so that
    # this does not trigger a sync.
    self.s._last_sync = monotonic()
    reserved = self.s.reserve(entry)
    self.s.schedule[entry.name] = reserved
    assert self.s.flushed == 1

    # Load the task straight from the database, disable it and save.
    task = PeriodicTask.objects.get(pk=self.m2.pk)
    task.enabled = False
    task.save()

    # Reading the schedule should now notice the change and drop the
    # entry for m2.
    assert self.m2.name not in self.s.schedule
    assert self.s.flushed == 2

    task.enabled = True
    task.save()

    # Reading the schedule should notice the change again and bring the
    # entry for m2 back.
    assert self.m2.name in self.s.schedule
    assert self.s.flushed == 3
def test_sync_rollback_on_save_error(self):
    """``sync`` raises ``RuntimeError`` for an ``EntrySaveRaises`` entry."""
    failing_entry = EntrySaveRaises(self.m1, app=self.app)
    self.s.schedule[self.m1.name] = failing_entry
    self.s._dirty.add(self.m1.name)

    with pytest.raises(RuntimeError):
        self.s.sync()
def test_PeriodicTask_unicode_interval(self):
    """The text form of an interval task names its 10-second interval."""
    task = self.create_model_interval(schedule(timedelta(seconds=10)))
    rendered = text_t(task)
    assert rendered == '{0}: every 10.0 seconds'.format(task.name)
def test_PeriodicTask_unicode_no_schedule(self):
    """The text form of a task without a schedule says ``{no schedule}``."""
    task = self.create_model()
    rendered = text_t(task)
    assert rendered == '{0}: {{no schedule}}'.format(task.name)
def test_CrontabSchedule_schedule(self):
    """Each crontab field string is expected to expand to a set of ints."""
    model = CrontabSchedule(
        minute='3, 7',
        hour='3, 4',
        day_of_week='*',
        day_of_month='1, 16',
        month_of_year='1, 7',
    )

    # Checked in the same order as the fields are declared above.
    expected_by_field = (
        ('minute', {3, 7}),
        ('hour', {3, 4}),
        ('day_of_week', {0, 1, 2, 3, 4, 5, 6}),
        ('day_of_month', {1, 16}),
        ('month_of_year', {1, 7}),
    )
    for field, expected in expected_by_field:
        assert getattr(model.schedule, field) == expected
def test_track_changes(self):
    """``PeriodicTasks.last_change()`` advances on every model save.

    It starts out as ``None``, gets a value after the first save, and a
    strictly greater value after the model is changed and saved again.
    """
    assert PeriodicTasks.last_change() is None

    task = self.create_model_interval(schedule(timedelta(seconds=10)))
    task.save()
    first_change = PeriodicTasks.last_change()
    assert first_change

    task.args = '(23, 24)'
    task.save()
    second_change = PeriodicTasks.last_change()
    assert second_change
    assert second_change > first_change