def init_databases(self):
    """Set up one connection pool per entry in ``settings.DATABASES``.

    For every configured database a dedicated Sanic blueprint is created
    and registered on ``self.server`` with two listeners:

    * ``before_server_start`` awaits ``create_pool`` with the database's
      connection settings and stores the result in ``self._db_pools``
      under the database's settings key.
    * ``after_server_stop`` closes that pool if one was stored.

    When the configured ``user`` is missing or empty, both ``user`` and
    ``password`` are left out of the keyword arguments passed to
    ``create_pool``.
    """

    def register_pool_blueprint(database, connection_settings):
        # Each call creates a fresh scope, so the listeners below stay bound
        # to *this* database. Defining them directly in the loop body would
        # make every listener read the loop's final `database` and
        # `connection_settings`, because the listeners only run after the
        # loop has finished (late-binding closures).
        db_blueprint = Blueprint(f'{self.name}_db_blueprint_{database}')

        @db_blueprint.listener('before_server_start')
        async def setup_connection_pool(app, loop):
            self._db_pools[database] = await create_pool(**connection_settings)

        @db_blueprint.listener('after_server_stop')
        async def close_connection_pool(app, loop):
            # The pool may never have been created (e.g. startup failed
            # before this database's listener ran), so check before closing.
            if database in self._db_pools and self._db_pools[database]:
                await self._db_pools[database].close()

        self.server.blueprint(db_blueprint)

    for database in settings.DATABASES:
        # Copy so the shared settings object is never mutated.
        connection_settings = settings.DATABASES[database].copy()
        if not connection_settings.get('user'):
            # No user configured: drop both credential keys. Either key may
            # be absent, so pop with a default instead of raising KeyError.
            connection_settings.pop('user', None)
            connection_settings.pop('password', None)
        register_pool_blueprint(database, connection_settings)
评论列表
文章目录