async def schedule_handler(
    cls: Any,
    obj: Any,
    context: Dict,
    func: Any,
    interval: Optional[Union[str, int]] = None,
    timestamp: Optional[str] = None,
    timezone: Optional[str] = None,
) -> Any:
    """Register ``func`` as a scheduled function and start the scheduler.

    A tuple ``(interval, timestamp, timezone, func, handler)`` is appended to
    the list stored under ``context['_schedule_scheduled_functions']`` (the
    list is created on first use), then ``cls.start_scheduler(cls, obj,
    context)`` is called.

    This is a coroutine function: the body awaits the value returned by
    ``cls.start_scheduler``, which is only legal inside ``async def``.

    Args:
        cls: Object exposing ``start_scheduler(cls, obj, context)``.
        obj: Passed as the first positional argument to ``func`` every time
            the generated handler runs.
        context: Mutable mapping that collects the registrations.
        func: The callable to schedule. It may return an awaitable, in which
            case the handler awaits it.
        interval: Stored unchanged in the registration tuple; not interpreted
            here.
        timestamp: Stored unchanged in the registration tuple; not
            interpreted here.
        timezone: Stored unchanged in the registration tuple; not interpreted
            here.

    Returns:
        The result of awaiting whatever ``cls.start_scheduler`` returned when
        that value is truthy, otherwise ``None``.
    """
    # Function-scope import so this block does not rely on a module-level
    # ``import logging`` being present in the file.
    import logging

    async def handler() -> None:
        """Call ``func`` on ``obj``, filling in the defaults from its signature."""
        spec = inspect.getfullargspec(func)
        # Defaults line up with the *last* len(defaults) positional
        # parameters, so pair them up from the tail of ``spec.args``.
        kwargs = dict(zip(spec.args[-len(spec.defaults):], spec.defaults)) if spec.defaults else {}

        try:
            # The call itself is inside the ``try`` so that a plain
            # (non-async) function that raises is handled exactly like a
            # coroutine that raises while being awaited.
            routine = func(obj, **kwargs)
            if isinstance(routine, Awaitable):
                await routine
        except Exception:
            # Best effort: one failing run must not propagate out of the
            # handler, but it should leave a trace instead of vanishing.
            logging.getLogger(__name__).exception('Uncaught exception in scheduled function %r', func)

    context.setdefault('_schedule_scheduled_functions', []).append((interval, timestamp, timezone, func, handler))

    start_func = cls.start_scheduler(cls, obj, context)
    return (await start_func) if start_func else None
评论列表
文章目录