fixtures.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:apm-agent-python 作者: elastic 项目源码 文件源码
def flask_celery(flask_apm_client):
    from celery import Celery

    flask_app = flask_apm_client.app
    celery = Celery(flask_app.import_name, backend=None,
                    broker=None)
    celery.conf.update(CELERY_ALWAYS_EAGER=True)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with flask_app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    celery.flask_apm_client = flask_apm_client
    return celery
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号