queue.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:makiki 作者: faith0811 项目源码 文件源码
def register_to_celery(celery_broker, celery_config, async_task, max_retries=12, DBSession=None):
    def send_after_commit_tasks(session):
        if not hasattr(async_ctx, 'reged_tasks'):
            return
        for task in async_ctx.reged_tasks:
            task.send(async_api)
        delattr(async_ctx, 'reged_tasks')

    broker = 'amqp://{user}:{password}@{host}:{port}/{vhost}'.\
        format(**celery_broker)

    app = Celery(broker=broker)
    app.conf.update(**celery_config)

    async_api = app.task(max_retries=max_retries, bind=True)(async_task)
    signals.setup_logging.connect(init_celery_log)
    if DBSession:
        if event:
            event.listens_for(DBSession, 'after_commit')(send_after_commit_tasks)
        else:
            raise ImportError('You must install sqlalchemy first.')

    return app, async_api
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号