def register_to_celery(celery_broker, celery_config, async_task, max_retries=12, DBSession=None):
    """Build a Celery app, wrap ``async_task`` as a bound task, and return both.

    Args:
        celery_broker: Mapping unpacked into the AMQP URL template; it must
            supply the ``user``, ``password``, ``host``, ``port`` and
            ``vhost`` keys.
        celery_config: Mapping unpacked as keyword arguments into
            ``app.conf.update``.
        async_task: Callable registered through ``app.task(..., bind=True)``.
        max_retries: Forwarded to ``app.task`` as ``max_retries``.
        DBSession: Optional target for an ``'after_commit'`` listener. When
            truthy, the tasks queued on ``async_ctx.reged_tasks`` are sent
            each time the listener fires.

    Returns:
        A ``(app, async_api)`` tuple: the Celery application and the task
        object produced by wrapping ``async_task``.

    Raises:
        ImportError: If ``DBSession`` is truthy while the module-level
            ``event`` name is falsy (presumably because sqlalchemy could not
            be imported -- confirm against the file's import section).
    """

    def send_after_commit_tasks(session):
        # Nothing was queued on the context: leave it untouched.
        if hasattr(async_ctx, 'reged_tasks'):
            for pending in async_ctx.reged_tasks:
                pending.send(async_api)
            # Drop the queue so the same tasks are not sent again.
            delattr(async_ctx, 'reged_tasks')

    broker_url = 'amqp://{user}:{password}@{host}:{port}/{vhost}'.format(**celery_broker)
    app = Celery(broker=broker_url)
    app.conf.update(**celery_config)

    as_bound_task = app.task(max_retries=max_retries, bind=True)
    async_api = as_bound_task(async_task)

    signals.setup_logging.connect(init_celery_log)

    if DBSession:
        if not event:
            raise ImportError('You must install sqlalchemy first.')
        register_listener = event.listens_for(DBSession, 'after_commit')
        register_listener(send_after_commit_tasks)

    return app, async_api
评论列表
文章目录