def _connect(self):
if self.application_name is None:
st = inspect.stack()
self.application_name = os.path.basename(st[-1][1])
conn_args = dict(
host=self.url.hostname,
port=self.url.port,
database=self.url.database,
user=self.url.username,
password=self.url.password,
application_name=self.application_name + "/" + hgvs.__version__, )
if self.pooling:
_logger.info("Using UTA ThreadedConnectionPool")
self._pool = psycopg2.pool.ThreadedConnectionPool(hgvs.global_config.uta.pool_min,
hgvs.global_config.uta.pool_max, **conn_args)
else:
self._conn = psycopg2.connect(**conn_args)
self._conn.autocommit = True
self._ensure_schema_exists()
# remap sqlite's ? placeholders to psycopg2's %s
self._queries = {k: v.replace('?', '%s') for k, v in six.iteritems(self._queries)}
评论列表
文章目录