task.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:zimfarm 作者: openzim 项目源码 文件源码
def enqueue_mwoffliner():
    """Validate and enqueue a batch of mwoffliner tasks from the request body.

    The JSON body of the current request must be a list of task configs.
    Each config must be a dict carrying non-None ``mwUrl`` and ``adminEmail``
    entries. Every config is validated before anything is enqueued, so one
    bad entry rejects the whole batch.

    Returns:
        A ``Response`` with status 202 once every config has been sent to
        celery and recorded in the tasks collection.

    Raises:
        exception.NotEnoughPrivilege: the request token is not an admin token.
        exception.InvalidRequest: the body is not a list, or an entry is not
            a dict or lacks a mandatory key.
    """
    user_token = UserJWT.from_request_header(request)

    # only admins can enqueue task
    if not user_token.is_admin:
        raise exception.NotEnoughPrivilege()

    payload = request.get_json()
    if not isinstance(payload, list):
        raise exception.InvalidRequest()

    # First pass: validate everything up front so nothing is enqueued
    # when any single entry is malformed.
    for entry in payload:
        well_formed = (
            isinstance(entry, dict)
            and entry.get('mwUrl') is not None
            and entry.get('adminEmail') is not None
        )
        if not well_formed:
            raise exception.InvalidRequest()

    # Second pass: dispatch each config to celery, then persist a record
    # keyed by the celery task id.
    task_name = 'mwoffliner'
    for entry in payload:
        dispatch_kwargs = {
            'token': TaskJWT.new(task_name),
            'config': entry
        }
        async_result = celery.send_task(task_name, kwargs=dispatch_kwargs)

        collection = TasksCollection()
        collection.insert_one({
            '_id': async_result.id,
            'celery_task_name': task_name,
            'status': 'PENDING',
            'time_stamp': {'created': datetime.utcnow(), 'started': None, 'ended': None},
            'options': entry,
            'steps': []
        })

    return Response(status=202)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号