test_celery.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:pylogctx 作者: peopledoc 项目源码 文件源码
def test_adapter():
    from pylogctx import context, log_adapter

    app = Celery(task_cls='pylogctx.celery.LoggingTask')

    @log_adapter(app.Task)
    def adapter(task):
        return {
            'celeryTaskId': task.request.id,
            'celeryTask': task.name
        }

    @app.task
    def my_task():
        return context.as_dict()

    result = my_task.apply()
    if VERSION.major < 4:
        result.maybe_reraise()
    else:
        result.maybe_throw()

    fields = result.result
    assert 'celeryTask' in fields
    assert 'celeryTaskId' in fields
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号