def from_entry(
    cls, name, session, skip_fields=('relative', 'options'), **entry
):
    """
    Build an instance of ``cls`` backed by a PeriodicTask made from *entry*.

    The entry fields (minus ``skip_fields``) become the ``defaults`` passed
    to ``PeriodicTask.update_or_create`` together with ``name``.

    :param name: value passed as ``name`` to ``PeriodicTask.update_or_create``.
    :param session: handed through to ``cls.to_model_schedule``,
        ``PeriodicTask.update_or_create`` and ``cls.save_model``
        (presumably a database session -- confirm against callers).
    :param skip_fields: entry keys that are dropped before the remaining
        fields are written to the model.
    :param entry: entry fields as keyword arguments; ``schedule`` is
        required (``KeyError`` if absent), ``args`` and ``kwargs`` are
        optional.
    :return: ``cls(model)`` for the model returned by ``update_or_create``.
    """
    excluded = set(skip_fields)
    defaults = {
        key: value for key, value in entry.items() if key not in excluded
    }

    # The raw schedule is replaced by its model counterpart, stored under
    # whichever field name ``to_model_schedule`` reports for it.
    raw_schedule = defaults.pop('schedule')
    model_schedule, model_field = cls.to_model_schedule(raw_schedule, session)
    defaults[model_field] = model_schedule

    # Positional/keyword task arguments are stored as JSON text; missing or
    # falsy values are written as an empty list / empty object.
    defaults.update(
        args=json.dumps(defaults.get('args') or []),
        kwargs=json.dumps(defaults.get('kwargs') or {}),
    )

    model, _created = PeriodicTask.update_or_create(
        session, name=name, defaults=defaults
    )
    cls.save_model(session, model)
    return cls(model)
评论列表
文章目录