def repl_connect(self, **kwargs):
"""Return a connection set up for replication
The connection is on "PSYCOPG2_TEST_REPL_DSN" unless overridden by
a *dsn* kwarg.
Should raise a skip test if not available, but guard for None on
old Python versions.
"""
if 'dsn' not in kwargs:
kwargs['dsn'] = repl_dsn
import psycopg2
try:
conn = self.connect(**kwargs)
except psycopg2.OperationalError, e:
return self.skipTest("replication db not configured: %s" % e)
conn.autocommit = True
return conn
评论列表
文章目录