schedule.py source file

python

Project: tomodachi  Author: kalaspuff
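import inspect
from typing import Any, Awaitable, Dict, Optional, Union


# Excerpt: `cls.start_scheduler` is defined elsewhere in the project.
# Declared `async` because the last line awaits `start_func`.
async def schedule_handler(cls: Any, obj: Any, context: Dict, func: Any, interval: Optional[Union[str, int]] = None, timestamp: Optional[str] = None, timezone: Optional[str] = None) -> Any:
    async def handler() -> None:
        # Collect the defaults of func's trailing positional parameters as keyword arguments.
        values = inspect.getfullargspec(func)
        kwargs = {k: values.defaults[i] for i, k in enumerate(values.args[len(values.args) - len(values.defaults):])} if values.defaults else {}
        routine = func(*(obj,), **kwargs)
        try:
            # Await the result only when func returned an awaitable.
            if isinstance(routine, Awaitable):
                await routine
        except Exception:
            # Errors raised while awaiting are silently ignored.
            pass

    # Register the job on the shared context so the scheduler can find it.
    context['_schedule_scheduled_functions'] = context.get('_schedule_scheduled_functions', [])
    context['_schedule_scheduled_functions'].append((interval, timestamp, timezone, func, handler))

    start_func = cls.start_scheduler(cls, obj, context)
    return (await start_func) if start_func else None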
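
The inner handler relies on two small techniques: building keyword arguments from a function's declared defaults, and awaiting the call result only when it is awaitable. The following is a minimal standalone sketch of both, not code from the project. `default_kwargs`, `ExampleService`, and `run_once` are hypothetical names introduced here for illustration.

```python
import asyncio
import inspect
from collections.abc import Awaitable
from typing import Any, Dict, List, Tuple


def default_kwargs(func: Any) -> Dict[str, Any]:
    """Map each positional parameter that has a default to that default."""
    spec = inspect.getfullargspec(func)
    if not spec.defaults:
        return {}
    # Defaults always belong to the trailing positional parameters.
    names = spec.args[len(spec.args) - len(spec.defaults):]
    return dict(zip(names, spec.defaults))


class ExampleService:  # hypothetical stand-in for a service class
    def __init__(self) -> None:
        self.calls: List[Tuple[str, int]] = []

    async def tick(self, label: str = "tick", count: int = 1) -> None:
        self.calls.append((label, count))


async def run_once(obj: Any, func: Any) -> None:
    # Call the unbound function with the instance as first argument,
    # then await the result only if it is awaitable.
    routine = func(obj, **default_kwargs(func))
    if isinstance(routine, Awaitable):
        await routine


service = ExampleService()
asyncio.run(run_once(service, ExampleService.tick))
print(service.calls)  # [('tick', 1)]
```

Note that `inspect.getfullargspec` reports keyword-only defaults separately in `kwonlydefaults`, so this approach, like the handler above, only picks up defaults of positional parameters.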