def make_celery(app):
kombu.serialization.register(
'drift_celery_json',
drift_celery_dumps, drift_celery_loads,
content_type='application/x-myjson', content_encoding='utf-8'
)
celery = Celery(app.import_name)
# if BROKER_URL is not set use the redis server
BROKER_URL = app.config.get("BROKER_URL", None)
if not BROKER_URL:
BROKER_URL = "redis://{}:6379/{}".format(app.config.get("redis_server"), CELERY_DB_NUMBER)
log.info("Using redis for celery broker: %s", BROKER_URL)
else:
log.info("celery broker set in config: %s", BROKER_URL)
celery.conf.update(app.config)
celery.conf["BROKER_URL"] = BROKER_URL
celery.conf["CELERY_RESULT_BACKEND"] = BROKER_URL
celery.conf["CELERY_TASK_SERIALIZER"] = "drift_celery_json"
celery.conf["CELERY_RESULT_SERIALIZER"] = "drift_celery_json"
celery.conf["CELERY_ACCEPT_CONTENT"] = ["drift_celery_json"]
celery.conf["CELERY_ENABLE_UTC"] = True
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
# custom json encoder for datetime object serialization
# from http://stackoverflow.com/questions/21631878/celery-is-there-a-way-to-write-custom-json-encoder-decoder
评论列表
文章目录