def get_celery(registry: Registry):
    """Return the Celery app associated with ``registry``, creating it on first use.

    The app is stored on the registry as the ``celery`` attribute, so later
    calls with the same registry hand back the already-built object instead
    of constructing a new one.

    :param registry: Registry that holds the cached app and is passed to
        ``get_celery_config`` to obtain the Celery configuration
    :return: The cached or newly created Celery app
    """
    app = getattr(registry, "celery", None)
    if app:
        # Already built and cached by an earlier call.
        return app

    app = Celery()
    # Cache before configuring, so the attribute is present on the registry
    # even while the configuration is being loaded.
    registry.celery = app

    settings = get_celery_config(registry)
    app.conf.update(settings)

    # Back-reference so the app (and code holding it) can reach the registry.
    app.registry = registry
    return app
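# Comment list            (web page navigation residue, not part of the code)
# Article table of contents  (web page navigation residue, not part of the code)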