def get_url(self):
    """Build the connection URL and extra keyword arguments for this database.

    The dialect is read from ``self.dbconf``. When ``dialect_map`` holds a
    truthy entry for that dialect, that callable is invoked with the
    configuration and ``self.database`` and must yield a ``(url, kwargs)``
    pair. Otherwise a generic ``URL`` is built from ``get_url_kwargs`` and
    the remaining settings are collected with ``get_rest_kwargs``.

    Returns:
        tuple: ``(url, kwargs)``.

    Raises:
        Exception: if fetching the ``'dialect'`` item fails, or if
            constructing the generic ``URL`` fails. The underlying error
            is chained as the cause in both cases.
    """
    conf = self.dbconf

    try:
        dialect = get_item(conf, 'dialect')
    except Exception as err:
        raise Exception("dialect key missing in configuration for: %s" % self.database) from err

    # A dialect without a specialised builder is not an error: it simply
    # falls through to the generic URL construction below.
    builder = dialect_map.get(dialect)
    if builder:
        logger.debug('specialize map: %s' % repr(builder))
        url, kwargs = builder(conf, self.database)
        return url, kwargs

    url_kwargs = get_url_kwargs(conf)
    try:
        url = URL(drivername=dialect, **url_kwargs)
    except Exception as err:
        failure = 'Failed to create URL for : %s:%s' % (dialect, url_kwargs.get('drivername'))
        raise Exception(failure) from err

    # Fetch the other connection args (presumably what
    # sqlalchemy.engine.url.URL calls "query" -- TODO confirm).
    kwargs = get_rest_kwargs(conf, ignore=CONNECT_ARGS)
    return url, kwargs
评论列表
文章目录