def __init__(self, model, app=None):
    """
    Build a scheduler entry from a stored periodic-task model.

    :param model: Model object supplying the entry's name, task, schedule,
        arguments, routing options and run metadata (read from
        ``model.meta``).
    :param app: Application to bind the entry to. When falsy, the object
        returned by ``celery_app._get_current_object()`` is used.
    """
    entry_fields = {
        'name': model.name,
        'task': model.task,
        'last_run_at': model.meta.last_run_at,
        'total_run_count': model.meta.total_run_count,
        'schedule': model.schedule,
        'args': model.args,
        'kwargs': model.kwargs,
        # Routing and expiry settings travel together as the entry options.
        'options': {
            'queue': model.queue,
            'exchange': model.exchange,
            'routing_key': model.routing_key,
            'expires': model.expires_at
        },
        'app': app or celery_app._get_current_object(),
    }
    super().__init__(**entry_fields)
    # Keep the backing model reachable from the entry.
    self.model = model
python类schedule()的实例源码
def _unpack_entry_fields(cls, schedule, args=None, kwargs=None,
                         relative=None, options=None, **entry):
    """
    Turn schedule-entry keyword arguments into model field values.

    :param schedule: Schedule to convert through ``cls.to_model_schedule``.
    :param args: Positional task arguments; a falsy value becomes ``[]``.
    :param kwargs: Keyword task arguments; a falsy value becomes ``{}``.
    :param relative: Accepted but not used here.
    :param options: Optional mapping; only ``queue``, ``exchange`` and
        ``routing_key`` are kept from it.
    :param entry: Remaining fields, returned with the values above merged in.
    :return: The ``entry`` dict, updated.
    """
    def _routing_options(queue=None, exchange=None, routing_key=None,
                         **_ignored):
        # Any option other than the three routing keys is dropped.
        return {
            'queue': queue,
            'exchange': exchange,
            'routing_key': routing_key
        }

    model_schedule, model_field = cls.to_model_schedule(schedule)
    routing = _routing_options(**(options or {}))

    entry[model_field] = model_schedule
    entry['args'] = args or []
    entry['kwargs'] = kwargs or {}
    entry.update(routing)
    return entry
def delete(self):
    """
    Remove this job's entry from the scheduler when one exists.

    :return: ``False`` when ``self.get()`` yields ``None``; otherwise the
        entry is deleted and ``True`` is returned.
    """
    entry = self.get()
    if entry is None:
        return False

    message = (
        'Deleting running %s from schedule. '
        'Interval: %s. '
        'Starts at: %s.'
    ) % (self.name, self.run_every, self.run_at)
    logger.info(message)
    entry.delete()
    return True
def decode_schedule(obj):
    """
    Rebuild a schedule object from its serialized mapping.

    :param obj: ``None``, or a mapping with ``'__type__'`` and
        ``'__value__'`` keys.
    :return: ``None`` for ``None`` input, otherwise the object built for
        the ``datetime``, ``crontab``, ``solar`` or ``schedule`` type tag.
    :raises NotImplementedError: If the type tag is not one of those four.
    """
    if obj is None:
        return None

    kind, payload = obj['__type__'], obj['__value__']

    if kind == 'datetime':
        return decode_datetime(payload)
    if kind == 'crontab':
        # The crontab fields arrive as a single tab-separated string.
        return crontab(*payload.split('\t'))
    if kind == 'solar':
        return solar(**payload)
    if kind == 'schedule':
        return schedule(**payload)

    raise NotImplementedError(
        'Cannot deserialize schedule %(type)s type' % {'type': kind}
    )
def from_entry(cls, name, session, skip_fields=('relative', 'options'), **entry):
    """
    Create or update the ``PeriodicTask`` named *name* and wrap it in ``cls``.

    :param name: Task name, used as the lookup for
        ``PeriodicTask.update_or_create``.
    :param session: Session object handed to every persistence call.
    :param skip_fields: Entry keys dropped before the model is written.
    :param entry: Entry fields; must contain ``schedule``.
    :return: ``cls`` instantiated with the saved model.
    """
    fields = dict(entry)
    for unwanted in skip_fields:
        fields.pop(unwanted, None)

    # Swap the schedule object for the matching model row and field name.
    model_schedule, model_field = cls.to_model_schedule(
        fields.pop('schedule'), session
    )
    fields[model_field] = model_schedule

    # Task arguments are stored as JSON text.
    fields['args'] = json.dumps(fields.get('args') or [])
    fields['kwargs'] = json.dumps(fields.get('kwargs') or {})

    model, _created = PeriodicTask.update_or_create(
        session, name=name, defaults=fields
    )
    cls.save_model(session, model)
    return cls(model)
def test_schedule_changed(self):
    """Saved edits and deletions of task models are visible in the schedule."""
    self.m2.args = '[16, 16]'
    self.m2.save()
    second = self.s.schedule[self.m2.name]
    assert second.args == [16, 16]

    self.m1.args = '[32, 32]'
    self.m1.save()
    first = self.s.schedule[self.m1.name]
    assert first.args == [32, 32]
    # Read again with no further change: the entry must still be current.
    first_again = self.s.schedule[self.m1.name]
    assert first_again.args == [32, 32]

    # A deleted model must disappear from the schedule.
    self.m3.delete()
    with pytest.raises(KeyError):
        self.s.schedule.__getitem__(self.m3.name)
def test_sync_syncs_before_save(self):
    """A reserved entry is flushed when a database change forces a reload."""
    # Get the entry for m2
    original = self.s.schedule[self.m2.name]

    # Increment the entry (but make sure it doesn't sync)
    self.s._last_sync = monotonic()
    reserved = self.s.reserve(original)
    self.s.schedule[original.name] = reserved
    assert self.s.flushed == 1

    # Fetch the raw object from db, change the args
    # and save the changes.
    row = PeriodicTask.objects.get(pk=self.m2.pk)
    row.args = '[16, 16]'
    row.save()

    # The schedule should now see the change and also sync the dirty
    # objects, keeping the run time recorded by the reserved entry.
    refreshed = self.s.schedule[self.m2.name]
    assert self.s.flushed == 2
    assert refreshed.last_run_at == reserved.last_run_at
    assert refreshed.args == [16, 16]
def test_periodic_task_disabled_while_reserved(self):
    """Disabling a task in the database removes its reserved entry."""
    # Get the entry for m2
    original = self.s.schedule[self.m2.name]

    # Increment the entry (but make sure it doesn't sync)
    self.s._last_sync = monotonic()
    reserved = self.s.reserve(original)
    self.s.schedule[original.name] = reserved
    assert self.s.flushed == 1

    # Fetch the raw object from db, disable it and save the change.
    row = PeriodicTask.objects.get(pk=self.m2.pk)
    row.enabled = False
    row.save()

    # reserve is called because the task gets called from
    # tick after the database change is made
    self.s.reserve(reserved)

    # The schedule should now see the change and drop the entry for m2.
    assert self.m2.name not in self.s.schedule
    assert self.s.flushed == 2
def test_SolarSchedule_schedule(self):
    """A solar schedule is not due after a 2050 last run but is due after a 2000 one."""
    later = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86)
    later_last_run = make_aware(
        datetime(day=26, month=7, year=2050, hour=1, minute=0)
    )
    assert later.schedule is not None
    due, wait = later.schedule.is_due(later_last_run)
    # False means task isn't due, but keep checking.
    assert due is False
    assert (
        (wait > 0 and due is False)
        or (wait == later.max_interval and due is True)
    )

    earlier = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86)
    earlier_last_run = make_aware(
        datetime(day=26, month=7, year=2000, hour=1, minute=0)
    )
    assert earlier.schedule is not None
    due_now, wait_now = earlier.schedule.is_due(earlier_last_run)
    # True means task is due and should run.
    assert due_now is True
    assert (
        (wait_now > 0 and due_now is True)
        or (wait_now == earlier.max_interval and due_now is False)
    )
def schedule(self):
    """
    Return the schedule, rebuilding it on first read or after a change.

    On a rebuild the scheduler is synced, the schedule is reloaded through
    ``all_as_schedule()``, ``_heap`` is reset to ``None`` and, when debug
    logging is enabled, every entry is logged.
    """
    if not self._initial_read:
        debug('DatabaseScheduler: initial read')
        self._initial_read = True
    elif self.schedule_changed():
        info('DatabaseScheduler: Schedule changed.')
    else:
        # Nothing to reload: serve the schedule already held.
        return self._schedule

    self.sync()
    self._schedule = self.all_as_schedule()
    # the schedule changed, invalidate the heap in Scheduler.tick
    self._heap = None
    if logger.isEnabledFor(logging.DEBUG):
        listing = '\n'.join(
            repr(entry) for entry in values(self._schedule)
        )
        debug('Current schedule:\n%s', listing)
    return self._schedule
def to_model_schedule(cls, schedule):
    """
    Convert *schedule* into its model object and the model field holding it.

    Walks ``cls.model_schedules`` and uses the first row whose schedule
    type matches.

    :return: ``(model_schedule, model_field)``.
    :raises ValueError: If no row in ``cls.model_schedules`` matches.
    """
    for schedule_type, model_type, model_field in cls.model_schedules:
        schedule = schedules.maybe_schedule(schedule)
        if not isinstance(schedule, schedule_type):
            continue
        return model_type.from_schedule(schedule), model_field

    raise ValueError(
        'Cannot convert schedule type {0!r} to model'.format(schedule)
    )
def setup_schedule(self):
    """Install the default entries, then merge in ``CELERYBEAT_SCHEDULE``."""
    current = self.schedule
    self.install_default_entries(current)

    # Configured entries are applied after the defaults.
    configured = self.app.conf.CELERYBEAT_SCHEDULE
    self.update_from_dict(configured)
def install_default_entries(self, data):
    """
    Register the built-in entries through ``update_from_dict``.

    The ``celery.backend_cleanup`` entry is included only when
    ``CELERY_TASK_RESULT_EXPIRES`` is truthy.

    :param data: Accepted but not used here.
    """
    defaults = {}
    if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
        # Add backend clean up
        defaults['celery.backend_cleanup'] = {
            'task': 'celery.backend_cleanup',
            'schedule': schedules.crontab('0', '4', '*')
        }
    self.update_from_dict(defaults)
def schedule(self):
    """
    Return the schedule, reloading it on first read or after a change.

    A reload replaces ``_schedule`` with the result of
    ``all_as_schedule()``.
    """
    if not self._initial_read:
        logger.info('DatabaseScheduler: initial read')
        self._initial_read = True
    elif self.is_schedule_changed:
        logger.info('DatabaseScheduler: schedule changed')
    else:
        # Nothing to reload: serve the schedule already held.
        return self._schedule

    self._schedule = self.all_as_schedule()
    return self._schedule
def schedule(self):
    """Build a ``schedules.crontab`` from this object's five crontab fields."""
    field_names = (
        'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
    )
    spec = {field: getattr(self, field) for field in field_names}
    return schedules.crontab(**spec)
def from_schedule(cls, schedule):
    """
    Return the stored row matching a crontab schedule, creating it if absent.

    The lookup compares each crontab field with the schedule's
    ``_orig_<field>`` attribute. A newly created row is added to
    ``db.session`` and committed.
    """
    field_names = (
        'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
    )
    data = {
        field: getattr(schedule, '_orig_{0}'.format(field))
        for field in field_names
    }

    instance = cls.query.filter(
        *(getattr(cls, column) == wanted for column, wanted in data.items())
    ).first()
    if instance:
        return instance

    instance = cls(**data)
    db.session.add(instance)
    db.session.commit()
    return instance
def schedule(self):
    """Build a ``schedules.schedule`` that runs every ``every`` units of ``period``."""
    # ``period`` is used as the timedelta keyword name.
    interval = timedelta(**{self.period: self.every})
    return schedules.schedule(interval)
def from_schedule(cls, schedule):
    """
    Return the stored interval row matching *schedule*, creating it if absent.

    :param schedule: Object whose ``run_every`` is a ``timedelta``.
    :return: The existing row with the same number of seconds, or a new
        row that has been added to ``db.session`` and committed.
    """
    # Intervals are stored in seconds; a negative duration is clamped to 0.
    every = max(schedule.run_every.total_seconds(), 0)

    # ``.first()`` runs the query. Without it the un-executed query object
    # was returned in place of a model row and no row was ever created.
    instance = cls.query.filter_by(every=every, period='seconds').first()
    if instance is None:
        instance = cls(every=every, period='seconds')
        db.session.add(instance)
        db.session.commit()
    return instance
def create(self):
    """
    Build, without saving, the ``RedBeatSchedulerEntry`` for this job.

    :return: The new entry; its task arguments are the primary key of
        ``self.spark_job``.
    """
    job_schedule = schedule(
        run_every=self.run_every,
        # setting "now" to the job start datetime
        nowfun=lambda: self.run_at,
        app=celery,
    )
    return RedBeatSchedulerEntry(
        name=self.name,
        task=self.task,
        schedule=job_schedule,
        args=(self.spark_job.pk,),
        kwargs={},
        app=celery,
    )
def add(self):
    """
    Create this job's scheduler entry and save it.

    :return: The saved entry.
    """
    message = (
        'Adding running %s to schedule. '
        'Interval: %s. '
        'Starts at: %s.'
    ) % (self.name, self.run_every, self.run_at)
    logger.info(message)

    entry = self.create()
    entry.save()
    return entry
def serialize_entry(entry, schedule_encoder=encode_schedule):
    """
    Convert a schedule entry into a plain dict.

    :param entry: Entry whose attributes are copied.
    :param schedule_encoder: Callable applied to ``entry.schedule``.
    :return: Dict with the entry's name, task, encoded schedule, arguments,
        encoded last run time, run count and options.
    """
    encoded_schedule = schedule_encoder(entry.schedule)
    # The last run time may be unset, so a null value is allowed.
    encoded_last_run = encode_datetime(entry.last_run_at, allow_null=True)
    return dict(
        name=entry.name,
        task=entry.task,
        schedule=encoded_schedule,
        args=entry.args,
        kwargs=entry.kwargs,
        last_run_at=encoded_last_run,
        total_run_count=entry.total_run_count,
        options=entry.options,
    )
def deserialize_entry(entry, schedule_decoder=decode_schedule):
    """
    Rebuild a ``ScheduleEntry`` from its dict form.

    :param entry: Mapping with ``name``, ``task``, ``schedule``, ``args``,
        ``kwargs``, ``last_run_at``, ``total_run_count`` and ``options``.
    :param schedule_decoder: Callable applied to ``entry['schedule']``.
    :return: The reconstructed ``ScheduleEntry``.
    """
    decoded_schedule = schedule_decoder(entry['schedule'])
    # The last run time may be unset, so a null value is allowed.
    decoded_last_run = decode_datetime(entry['last_run_at'], allow_null=True)
    return ScheduleEntry(
        name=entry['name'],
        task=entry['task'],
        schedule=decoded_schedule,
        args=entry['args'],
        kwargs=entry['kwargs'],
        last_run_at=decoded_last_run,
        total_run_count=entry['total_run_count'],
        options=entry['options'],
    )
def schedule(self):
    """Build a ``schedules.schedule`` that runs every ``every`` units of ``period.code``."""
    # ``period.code`` is used as the timedelta keyword name.
    interval = datetime.timedelta(**{self.period.code: self.every})
    return schedules.schedule(interval)
def from_schedule(cls, session, schedule, period='seconds'):
    """
    Return the stored interval row matching *schedule*, or a new unsaved one.

    :param session: Session object handed to ``cls.filter_by``.
    :param schedule: Object whose ``run_every`` is a ``timedelta``.
    :param period: Period value used for both the lookup and a new row.
    :return: The first matching row, or ``cls(every=..., period=...)``.
    """
    # A negative duration is clamped to 0.
    every = max(schedule.run_every.total_seconds(), 0)
    existing = cls.filter_by(session, every=every, period=period).first()
    if existing is not None:
        return existing
    return cls(every=every, period=period)
def schedule(self):
    """Build a ``schedules.crontab`` from this object's five crontab fields."""
    field_names = (
        'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
    )
    spec = {field: getattr(self, field) for field in field_names}
    return schedules.crontab(**spec)
def from_schedule(cls, session, schedule):
    """
    Return the stored crontab row matching *schedule*, or a new unsaved one.

    :param session: Session object handed to ``cls.filter_by``.
    :param schedule: Crontab schedule; its ``_orig_<field>`` attributes
        supply the values to match.
    :return: The first matching row, or ``cls(**spec)``.
    """
    field_names = (
        'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year',
    )
    spec = {
        field: getattr(schedule, '_orig_' + field) for field in field_names
    }
    existing = cls.filter_by(session, **spec).first()
    if existing is not None:
        return existing
    return cls(**spec)
def schedule(self):
    """
    Return the schedule of whichever schedule model is set.

    ``crontab`` takes precedence over ``interval``; ``None`` is returned
    when neither is set.
    """
    if self.crontab:
        return self.crontab.schedule
    return self.interval.schedule if self.interval else None
def is_due(self):
    """
    Report whether the entry should run now.

    :return: The result of ``self.schedule.is_due(self.last_run_at)`` for
        an enabled model; ``(False, 5.0)`` for a disabled one.
    """
    if self.model.enabled:
        return self.schedule.is_due(self.last_run_at)
    # 5 second delay for re-enable.
    return False, 5.0
def to_model_schedule(cls, schedule, session):
    """
    Convert *schedule* into a saved model object and its model field name.

    Walks ``cls.model_schedules`` and uses the first row whose schedule
    type matches; the resulting model object is saved through
    ``cls.save_model``.

    :param schedule: Schedule to convert.
    :param session: Session object handed to the persistence calls.
    :return: ``(model_schedule, model_field)``.
    :raises ValueError: If no row in ``cls.model_schedules`` matches.
    """
    for schedule_type, model_type, model_field in cls.model_schedules:
        debug(cls.model_schedules)
        schedule = schedules.maybe_schedule(schedule)
        if not isinstance(schedule, schedule_type):
            continue
        model_schedule = model_type.from_schedule(session, schedule)
        cls.save_model(session, model_schedule)
        return model_schedule, model_field

    raise ValueError('Cannot convert schedule type {0!r} to model'.format(schedule))
def __repr__(self):
    """
    Return a debug string showing the entry's name, task call and schedule.

    Format: ``<ModelEntry: NAME TASK(*ARGS, **KWARGS) SCHEDULE>``.
    """
    # The schedule placeholder must be ``{4}``. Doubled braces are the
    # escape for a literal brace, so ``{{4}}`` printed the text "{4}" and
    # the schedule never appeared.
    return '<ModelEntry: {0} {1}(*{2}, **{3}) {4}>'.format(
        safe_str(self.name), self.task, self.args, self.kwargs, self.schedule,
    )