def enqueue_mwoffliner():
    """Enqueue one ``mwoffliner`` task per configuration in the request body.

    The caller's token is read from the request header and must belong to
    an admin. The JSON body must be a list of dicts, each providing a
    non-null ``mwUrl`` and ``adminEmail``. Every configuration is validated
    before any task is sent, so a single invalid entry rejects the whole
    request without enqueuing anything.

    Returns:
        A ``Response`` with status 202.

    Raises:
        exception.NotEnoughPrivilege: the token is not an admin token.
        exception.InvalidRequest: the body is not a list, or one of its
            entries is not a dict or lacks a mandatory value.
    """
    # Only admins may enqueue tasks.
    if not UserJWT.from_request_header(request).is_admin:
        raise exception.NotEnoughPrivilege()

    def validate(options: dict):
        """Reject anything that is not a dict carrying the mandatory values."""
        if not isinstance(options, dict):
            raise exception.InvalidRequest()
        # Short-circuits in the same order as an explicit `or` chain.
        if any(options.get(key) is None for key in ('mwUrl', 'adminEmail')):
            raise exception.InvalidRequest()

    def submit(options: dict):
        """Send the task to celery, then record it in the tasks collection."""
        name = 'mwoffliner'
        task_kwargs = {
            'token': TaskJWT.new(name),
            'config': options,
        }
        sent = celery.send_task(name, kwargs=task_kwargs)

        tasks = TasksCollection()
        tasks.insert_one({
            # The stored document is keyed by the id of the celery task.
            '_id': sent.id,
            'celery_task_name': name,
            'status': 'PENDING',
            'time_stamp': {
                'created': datetime.utcnow(),
                'started': None,
                'ended': None,
            },
            'options': options,
            'steps': [],
        })

    payload = request.get_json()
    if not isinstance(payload, list):
        raise exception.InvalidRequest()

    # First pass: validate everything, so nothing is enqueued on bad input.
    for entry in payload:
        validate(entry)

    # Second pass: every entry is known to be valid, enqueue them in order.
    for entry in payload:
        submit(entry)

    return Response(status=202)
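# (web page residue, not part of the code) Comment list
# (web page residue, not part of the code) Table of contents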