task.py source code

python

Project: codex-backend  Author: codexgigassys
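import datetime

# Assumed imports: the snippet uses Queue and Redis in the way the rq and
# redis packages expose them.
from redis import Redis
from rq import Queue

# id_generator, save and envget are not shown in this snippet; they are
# presumably defined or imported elsewhere in the project.


def add_task(requested):
    """Save a new task record and enqueue it on the matching queue."""
    task_id = id_generator(40)
    if requested.get('document_name') is None:
        requested["document_name"] = ""

    response = {"requested": requested,
                "date_enqueued": datetime.datetime.now(),
                "task_id": task_id}
    save(response)
    if requested.get('vt_samples'):
        queue_name = "task_private_vt"  # task needs a private VT api
    elif requested.get('vt_av'):  # vt_samples is already known to be falsy here
        queue_name = "task_public_vt"  # task needs a public VT api
    else:
        queue_name = "task_no_vt"  # task doesn't need VT
    q = Queue(queue_name, connection=Redis(host=envget('redis.host')))
    # 31536000 seconds is 365 days, so the job effectively never times out.
    q.enqueue('Api.task.generic_task', args=(task_id,), timeout=31536000)
    return task_id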
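
The queue-routing branch in add_task can be checked without Redis by pulling it into a pure function. The following is a minimal sketch, not part of codex-backend: the helper name select_queue_name is hypothetical, and it assumes the request is a plain dict with the optional keys vt_samples and vt_av, as in the function above.

```python
def select_queue_name(requested):
    """Return the queue name for a task request (hypothetical helper)."""
    if requested.get('vt_samples'):
        return "task_private_vt"  # task needs a private VT api
    if requested.get('vt_av'):
        return "task_public_vt"   # task needs a public VT api
    return "task_no_vt"           # task doesn't need VT


if __name__ == "__main__":
    # vt_samples takes priority even when vt_av is also set.
    print(select_queue_name({"vt_samples": True, "vt_av": True}))
    print(select_queue_name({"vt_av": True}))
    print(select_queue_name({}))
```

Keeping the routing separate from the enqueue call makes the priority order (private VT, then public VT, then no VT) easy to unit test.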