def get_engine(self):
with self._lock:
uri = self.get_uri()
echo = self._app.config['SQLALCHEMY_ECHO']
if (uri, echo) == self._connected_for:
return self._engine
info = make_url(uri)
options = {'convert_unicode': True}
self._sa.apply_pool_defaults(self._app, options)
self._sa.apply_driver_hacks(self._app, info, options)
if echo:
options['echo'] = True
self._engine = rv = sqlalchemy.create_engine(info, **options)
if _record_queries(self._app):
_EngineDebuggingSignalEvents(self._engine,
self._app.import_name).register()
self._connected_for = (uri, echo)
return rv
评论列表
文章目录