zmirror.py 文件源码

python
阅读 35 收藏 0 点赞 0 评论 0

项目:zmirror 作者: aploium 项目源码 文件源码
def cron_task_container(task_dict, add_task_only=False):
    """
    Run one cron task, then register it with the scheduler for its next run.

    Any ``Exception`` raised while running the task is logged (via
    ``errprint`` plus a traceback) and does not prevent rescheduling.

    :param task_dict: description of the task, a dict of the form
      { "target": callable to invoke; required when the task is actually run,
        "interval": delay handed to the scheduler for the next run
                    (presumably seconds -- confirm against task_scheduler); default 300,
        "priority": priority handed to the scheduler; default 999,
        "name": label used in the log line; defaults to str(target),
        "args": positional arguments for target, e.g. (arg1, arg2); default (),
        "kwargs": keyword arguments for target, e.g. {key: value}; default {},
      }
    :param add_task_only: if true, do not run the task now, only schedule it
    """
    if not add_task_only:
        # Run the task right now.
        try:
            # Validate before logging, so a missing target is reported with
            # the explicit ValueError below rather than an opaque KeyError.
            target_func = task_dict.get('target')
            if target_func is None:
                raise ValueError("target is not given in " + str(task_dict))

            target_repr = str(target_func)
            infoprint('CronTask:', task_dict.get('name', target_repr), 'Target:', target_repr)

            target_func(
                *(task_dict.get('args', ())),
                **(task_dict.get('kwargs', {}))
            )
        except Exception:  # coverage: exclude
            # Only ordinary errors are absorbed; SystemExit and
            # KeyboardInterrupt are allowed to propagate.
            errprint('ErrorWhenProcessingCronTasks', task_dict)
            traceback.print_exc()

    # Cron tasks are switched off: do not reschedule.
    if not enable_cron_tasks:
        if threading.current_thread() is not threading.main_thread():
            # End the current (non-main) thread. Raising SystemExit directly
            # avoids relying on the site-provided ``exit`` helper.
            raise SystemExit
        return

    # Queue this same container to run the task again later.
    task_scheduler.enter(
        task_dict.get('interval', 300),
        task_dict.get('priority', 999),
        cron_task_container,
        (task_dict,)
    )
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号