test_celery.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:pylogctx 作者: peopledoc 项目源码 文件源码
def test_task():
    from pylogctx import context

    app = Celery(task_cls=LoggingTask)

    @app.task
    def my_task():
        context.update(taskField='RUNNED')
        logger = get_task_logger(current_task.name)
        logger.info("I log!")
        return context.as_dict()

    result = my_task.apply()
    if VERSION.major < 4:
        result.maybe_reraise()
    else:
        result.maybe_throw()
    fields = result.result
    assert 'taskField' in fields
    assert not context.as_dict()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号