celery.py 文件源码

python
阅读 26 收藏 0 点赞 0 评论 0

项目:websauna 作者: websauna 项目源码 文件源码
def parse_celery_config(celery_config_python: str) -> dict:
    # Expose timedelta object for config to be used in beat schedule
    # http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html#beat-entries
    from datetime import timedelta  # noqa
    from celery.schedules import crontab  # noqa

    _globals = globals().copy()
    _locals = locals().copy()
    code = textwrap.dedent(celery_config_python)

    try:
        config_dict = eval(code, _globals, _locals)
    except Exception as e:
        raise RuntimeError("Could not execute Python code to produce Celery configuration object: {}".format(code)) from e

    if "broker_url" not in config_dict:
        raise RuntimeError("Mandatory broker_url Celery setting missing. Did we fail to parse config? {}".format(config_dict))

    return config_dict
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号