def parse_celery_config(celery_config_python: str) -> dict:
    """Evaluate a Python expression string into a Celery configuration dict.

    The text is dedented and evaluated with ``eval()``. The evaluation
    namespace is a copy of this module's globals plus ``timedelta`` and
    ``crontab``, so that beat schedule entries can use them directly:
    http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html#beat-entries

    SECURITY: ``eval()`` executes arbitrary code. Only pass configuration
    text that comes from a trusted source (e.g. the application's own
    settings file), never user-supplied input.

    :param celery_config_python: Python source of a single expression that
        evaluates to a dict of Celery settings. Common leading indentation
        is removed before evaluation.
    :return: The evaluated settings dict; it always contains ``broker_url``.
    :raises RuntimeError: If the expression cannot be evaluated, does not
        evaluate to a dict, or lacks the ``broker_url`` key.
    """
    # Imported here (not at module level) only to expose them to the config.
    from datetime import timedelta
    from celery.schedules import crontab

    # Use ONE namespace for eval(). With separate globals/locals dicts, names
    # placed in the locals are invisible inside lambdas and other nested
    # scopes of the evaluated expression, because those resolve free names
    # through globals only. The helpers are added last so they take
    # precedence over same-named module globals, as locals did before.
    namespace = globals().copy()
    namespace.update(timedelta=timedelta, crontab=crontab)

    code = textwrap.dedent(celery_config_python)
    try:
        config_dict = eval(code, namespace)
    except Exception as e:
        raise RuntimeError("Could not execute Python code to produce Celery configuration object: {}".format(code)) from e

    # Reject non-dict results explicitly: otherwise the membership test below
    # raises a bare TypeError for None/int, or silently accepts a list or
    # string that happens to contain "broker_url".
    if not isinstance(config_dict, dict):
        raise RuntimeError(
            "Celery configuration must evaluate to a dict, got {}: {}".format(type(config_dict).__name__, code)
        )

    if "broker_url" not in config_dict:
        raise RuntimeError("Mandatory broker_url Celery setting missing. Did we fail to parse config? {}".format(config_dict))

    return config_dict
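# 评论列表 (comment list) -- web page residue, not part of the code
# 文章目录 (table of contents) -- web page residue, not part of the code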