def test_tpc_commit_recovered(self):
    """Commit a prepared two-phase transaction from a second connection.

    The transaction is prepared on one connection, which is then closed;
    a new connection rebuilds an xid from the same components and passes
    it to ``tpc_commit``.
    """
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    # Before tpc_prepare() both helper counts are expected to be zero.
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    conn.tpc_prepare()
    conn.close()
    # After prepare + close: one transaction counted, no record counted.
    self.assertEqual(1, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    # Recover: a fresh connection commits using an equal xid.
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    conn.tpc_commit(tpc_xid)

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(1, self.count_test_records())
# Python connect() example source code
def test_tpc_rollback(self):
    """Roll back a two-phase transaction after it has been prepared.

    Everything happens on a single connection: begin, insert, prepare,
    then ``tpc_rollback()`` with no argument.
    """
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback');")
    # Before tpc_prepare() both helper counts are expected to be zero.
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    conn.tpc_prepare()
    self.assertEqual(conn.status, ext.STATUS_PREPARED)
    self.assertEqual(1, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    conn.tpc_rollback()
    # After the rollback nothing is left pending and no record was stored.
    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_one_phase(self):
    """Roll back a two-phase transaction without preparing it first.

    ``tpc_rollback()`` is called straight after the insert, with no
    intervening ``tpc_prepare()``.
    """
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    conn.tpc_rollback()
    # Both helper counts are expected to stay at zero after the rollback.
    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_recovered(self):
    """Roll back a prepared two-phase transaction from a second connection.

    The transaction is prepared on one connection, which is then closed;
    a new connection rebuilds an xid from the same components and passes
    it to ``tpc_rollback``.
    """
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    self.assertEqual(conn.status, ext.STATUS_READY)

    conn.tpc_begin(tpc_xid)
    self.assertEqual(conn.status, ext.STATUS_BEGIN)

    cursor = conn.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    # Before tpc_prepare() both helper counts are expected to be zero.
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    conn.tpc_prepare()
    conn.close()
    # After prepare + close: one transaction counted, no record counted.
    self.assertEqual(1, self.count_xacts())
    self.assertEqual(0, self.count_test_records())

    # Recover: a fresh connection rolls back using an equal xid.
    conn = self.connect()
    tpc_xid = conn.xid(1, "gtrid", "bqual")
    conn.tpc_rollback(tpc_xid)

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertEqual(0, self.count_xacts())
    self.assertEqual(0, self.count_test_records())
def test_xid_roundtrip(self):
    """Check xid components survive a prepare / ``tpc_recover()`` round trip.

    For each (format_id, gtrid, bqual) triple a transaction is prepared on
    one connection and then looked up through ``tpc_recover()`` on another;
    the recovered xid must expose the same three values. The recovered
    transaction is rolled back at the end of each iteration.
    """
    cases = [
        (0, "", ""),
        (42, "gtrid", "bqual"),
        (0x7fffffff, "x" * 64, "y" * 64),
    ]
    for fid, gtrid, bqual in cases:
        conn = self.connect()
        conn.tpc_begin(conn.xid(fid, gtrid, bqual))
        conn.tpc_prepare()
        conn.close()

        conn = self.connect()
        # Only consider transactions belonging to the database under test.
        recovered = [
            candidate for candidate in conn.tpc_recover()
            if candidate.database == dbname
        ]
        self.assertEqual(1, len(recovered))

        xid = recovered[0]
        self.assertEqual(xid.format_id, fid)
        self.assertEqual(xid.gtrid, gtrid)
        self.assertEqual(xid.bqual, bqual)

        conn.tpc_rollback(xid)
def test_unparsed_roundtrip(self):
    """Check plain-string transaction ids survive a recover round trip.

    Each string is passed directly to ``tpc_begin``; after prepare, the xid
    found via ``tpc_recover()`` on a new connection must carry the string
    as ``gtrid`` and ``None`` for both ``format_id`` and ``bqual``.
    """
    transaction_ids = [
        '',
        'hello, world!',
        'x' * 199,   # PostgreSQL's limit in transaction id length
    ]
    for tid in transaction_ids:
        conn = self.connect()
        conn.tpc_begin(tid)
        conn.tpc_prepare()
        conn.close()

        conn = self.connect()
        # Only consider transactions belonging to the database under test.
        recovered = [
            candidate for candidate in conn.tpc_recover()
            if candidate.database == dbname
        ]
        self.assertEqual(1, len(recovered))

        xid = recovered[0]
        self.assertEqual(xid.format_id, None)
        self.assertEqual(xid.gtrid, tid)
        self.assertEqual(xid.bqual, None)

        conn.tpc_rollback(xid)
def test_copy_from_segfault(self):
    """Run ``copy ... from stdin`` via ``execute()`` in a child interpreter.

    The statement is executed in a separate Python process so that a crash
    of the interpreter shows up as a non-zero return code instead of taking
    the test runner down. The test passes when the child exits with 0.
    """
    # issue #219
    # The try/except bodies in the embedded script must be indented,
    # otherwise the child dies with an IndentationError before it ever
    # reaches the COPY statement.
    script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf from stdin")
except psycopg2.ProgrammingError:
    pass
conn.close()
""" % {'dsn': dsn})

    proc = Popen([sys.executable, '-c', script_to_py3(script)])
    proc.communicate()
    self.assertEqual(0, proc.returncode)
def test_copy_to_segfault(self):
    """Run ``copy ... to stdout`` via ``execute()`` in a child interpreter.

    The statement is executed in a separate Python process so that a crash
    of the interpreter shows up as a non-zero return code instead of taking
    the test runner down. The child's stdout is captured through a pipe.
    The test passes when the child exits with 0.
    """
    # issue #219
    # The try/except bodies in the embedded script must be indented,
    # otherwise the child dies with an IndentationError before it ever
    # reaches the COPY statement.
    script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
""" % {'dsn': dsn})

    proc = Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
    proc.communicate()
    self.assertEqual(0, proc.returncode)
def test_supported_keywords(self):
    """Check the connection string built from ``connect()`` keywords.

    Each keyword is first checked on its own against the exact expected
    string, then four keywords are combined and checked for membership
    and item count.

    NOTE(review): relies on the fixture storing the arguments of the
    underlying connection call in ``self.args`` -- confirm against setUp.
    """
    single_keyword_cases = [
        ({'database': 'foo'}, 'dbname=foo'),
        ({'user': 'postgres'}, 'user=postgres'),
        ({'password': 'secret'}, 'password=secret'),
        ({'port': 5432}, 'port=5432'),
        ({'sslmode': 'require'}, 'sslmode=require'),
    ]
    for kwargs, expected in single_keyword_cases:
        psycopg2.connect(**kwargs)
        self.assertEqual(self.args[0], expected)

    psycopg2.connect(database='foo',
                     user='postgres', password='secret', port=5432)
    conn_string = self.args[0]
    # assertIn replaces the deprecated assert_ alias (removed in 3.12) and
    # reports both operands on failure.
    self.assertIn('dbname=foo', conn_string)
    self.assertIn('user=postgres', conn_string)
    self.assertIn('password=secret', conn_string)
    self.assertIn('port=5432', conn_string)
    self.assertEqual(len(conn_string.split()), 4)
def get_connection_params(self):
    """Build the connection keyword arguments from ``self.settings_dict``.

    Returns a dict that starts with ``database`` (falling back to
    ``'postgres'`` when NAME is falsy but not the empty string), is then
    updated with every entry of OPTIONS except ``isolation_level``, and
    finally receives ``user``, ``password``, ``host`` and ``port`` for each
    of those settings that is truthy.

    Raises:
        ImproperlyConfigured: if NAME is the empty string.
    """
    settings = self.settings_dict
    db_name = settings['NAME']
    # None may be used to connect to the default 'postgres' db
    if db_name == '':
        raise ImproperlyConfigured(
            "settings.DATABASES is improperly configured. "
            "Please supply the NAME value.")

    params = dict(database=db_name or 'postgres')
    params.update(settings['OPTIONS'])
    # isolation_level is not forwarded as a connection parameter.
    params.pop('isolation_level', None)

    optional_settings = (
        ('user', 'USER'),
        ('password', 'PASSWORD'),
        ('host', 'HOST'),
        ('port', 'PORT'),
    )
    for param_name, setting_name in optional_settings:
        value = settings[setting_name]
        if not value:
            continue
        # Only the password is passed through force_str.
        params[param_name] = (
            force_str(value) if param_name == 'password' else value
        )
    return params
def get_new_connection(self, conn_params):
    """Open a connection with ``conn_params`` and settle its isolation level.

    ``self.isolation_level`` is taken from OPTIONS['isolation_level'] when
    that key is present, otherwise from the freshly opened connection. When
    the configured level differs from the connection's current one it is
    applied through ``set_session``.

    Returns:
        The connection object returned by ``Database.connect``.
    """
    connection = Database.connect(**conn_params)

    # self.isolation_level must be set:
    # - after connecting to the database in order to obtain the database's
    #   default when no value is explicitly specified in options.
    # - before calling _set_autocommit() because if autocommit is on, that
    #   will set connection.isolation_level to ISOLATION_LEVEL_AUTOCOMMIT.
    options = self.settings_dict['OPTIONS']
    if 'isolation_level' not in options:
        self.isolation_level = connection.isolation_level
        return connection

    self.isolation_level = options['isolation_level']
    # Set the isolation level to the value from OPTIONS.
    if self.isolation_level != connection.isolation_level:
        connection.set_session(isolation_level=self.isolation_level)
    return connection
def _connect(self) -> None:
    """Open the database connection and store it on ``self.conn``.

    Host, port and database name are logged first. The connection is made
    with a 5 second connect timeout and switched to autocommit. Any
    ``psycopg2.Error`` is logged with its traceback and re-raised.
    """
    logger.info("initializing database connection:")
    logger.info("host: %s", self.host)
    logger.info("port: %s", self.port)
    logger.info("dbname: %s", self.dbname)

    connect_kwargs = dict(
        host=self.host,
        port=self.port,
        user=self.user,
        password=self.password,
        dbname=self.dbname,
        connect_timeout=5,
    )
    try:
        self.conn = psycopg2.connect(**connect_kwargs)
        self.conn.set_session(autocommit=True)
        logger.info("successfully initialized database connection")
    except psycopg2.Error as error:
        logger.exception("unable to connect to database")
        raise error
def from_environment(cls) -> Optional['PostgresDemoDatabase']:
    """Build a ``PostgresDemoDatabase`` from ``DEMO_POSTGRES_*`` variables.

    Reads DEMO_POSTGRES_HOST, _PORT (defaulting to "5432"), _DBNAME, _USER
    and _PASSWORD from the environment.

    Returns:
        A ``PostgresDemoDatabase`` when every value is set and construction
        succeeds; ``None`` when a value is missing or when construction
        raises ``psycopg2.Error`` (which is logged).
    """
    env = os.environ
    host = env.get("DEMO_POSTGRES_HOST")
    port = env.get("DEMO_POSTGRES_PORT") or "5432"
    dbname = env.get("DEMO_POSTGRES_DBNAME")
    user = env.get("DEMO_POSTGRES_USER")
    password = env.get("DEMO_POSTGRES_PASSWORD")

    # Guard clause: without a complete configuration there is nothing to do.
    if not all([host, port, dbname, user, password]):
        logger.info("Relevant environment variables not found, so no demo database")
        return None

    try:
        logger.info("Initializing demo database connection using environment variables")
        return PostgresDemoDatabase(dbname=dbname, host=host, port=port, user=user, password=password)
    except psycopg2.Error:
        logger.exception("unable to connect to database, permalinks not enabled")
        return None
def uriDatabaseConnect(self, uri):
    """Create a connection from a uri and return a cursor of it.

    Connection attempts are repeated until one succeeds or the user cancels
    the credentials request. After each ``psycopg2.OperationalError`` the
    credentials obtained from ``QgsCredentials`` are written into ``uri``
    (so ``uri`` is modified in place) before the next attempt. When
    credentials were requested and accepted, they are stored back through
    ``QgsCredentials.put`` under the original connection info.

    Returns:
        A cursor on the new connection, or ``None`` when no connection
        could be made (a warning box is shown in that case).
    """
    # Connection info as first supplied; used as the credentials cache key.
    conninfo = uri.connectionInfo()
    conn = None
    cur = None
    ok = False
    while not conn:
        try:
            conn = psycopg2.connect(uri.connectionInfo())
            cur = conn.cursor()
        except psycopg2.OperationalError:
            (ok, user, passwd) = QgsCredentials.instance().get(conninfo, uri.username(), uri.password())
            if not ok:
                break
            # Apply the new credentials; without this every retry would
            # reuse the same failing connection info until the user cancels.
            uri.setUsername(user)
            uri.setPassword(passwd)
    if not conn:
        QMessageBox.warning(self.iface.mainWindow(), "Connection Error", "Could not connect to PostgreSQL database - check connection info")
    if ok:
        QgsCredentials.instance().put(conninfo, user, passwd)
    return cur
def __init__(self, config, db_config, web, plugin_path):
    """Set up the database handle, plugins, token, masters and web routes.

    Args:
        config: mapping read with ``.get`` for the token and master ids.
        db_config: mapping read with ``.get`` for the database host and user.
        web: object on which the 'bot' routes are registered.
        plugin_path: passed through to ``PluginCollection``.
    """
    super().__init__()

    db_host = db_config.get(c.DB_HOST_KEY)
    db_user = db_config.get(c.DB_USER_KEY)
    self.db = db_connect(host=db_host, user=db_user)

    self.config = config
    self.web = web

    self.plugin_collection = PluginCollection(self, plugin_path)

    self.token = self.config.get(c.GLOBIBOT_TOKEN_KEY)
    # Master ids are kept as strings.
    self.masters = list(map(str, self.config.get(c.MASTER_IDS_KEY, [])))

    # self.enabled_servers = [
    #     str(id) for id in
    #     self.config.get(c.ENABLED_SERVERS_KEY, [])
    # ]

    self.web.add_routes('bot', *api.routes(self))
def run_sql_multiprocessing(args):
    """Execute one SQL statement on its own autocommit connection.

    Args:
        args: indexable whose item 0 is the SQL text and item 1 is a
            settings mapping containing ``'pg_connect_string'``.

    Returns:
        ``"SUCCESS"`` when the statement ran, otherwise a
        ``"SQL FAILED! : ..."`` message containing the SQL and the error.
    """
    the_sql, settings = args[0], args[1]

    connection = psycopg2.connect(settings['pg_connect_string'])
    connection.autocommit = True
    cursor = connection.cursor()

    # # set raw gnaf database schema (it's needed for the primary and foreign key creation)
    # if settings['raw_gnaf_schema'] != "public":
    #     pg_cur.execute("SET search_path = {0}, public, pg_catalog".format(settings['raw_gnaf_schema'],))

    # Failures are reported through the return value rather than raised.
    try:
        cursor.execute(the_sql)
    except Exception as ex:
        outcome = "SQL FAILED! : {0} : {1}".format(the_sql, ex)
    else:
        outcome = "SUCCESS"

    cursor.close()
    connection.close()

    return outcome
def intermediate_shapefile_load_step(args):
    """Import one shapefile on its own autocommit connection.

    Args:
        args: indexable whose item 0 is a work dict with the keys
            ``file_path``, ``pg_table``, ``pg_schema``, ``delete_table`` and
            ``spatial``, and whose item 1 is a settings mapping containing
            ``'pg_connect_string'``.

    Returns:
        Whatever ``import_shapefile_to_postgres`` returns for this item.
    """
    work_dict = args[0]
    settings = args[1]
    # logger = args[2]

    file_path = work_dict['file_path']
    pg_table = work_dict['pg_table']
    pg_schema = work_dict['pg_schema']
    delete_table = work_dict['delete_table']
    spatial = work_dict['spatial']

    pg_conn = psycopg2.connect(settings['pg_connect_string'])
    try:
        pg_conn.autocommit = True
        pg_cur = pg_conn.cursor()
        try:
            result = import_shapefile_to_postgres(pg_cur, file_path, pg_table, pg_schema, delete_table, spatial)
        finally:
            pg_cur.close()
    finally:
        # Release the connection even when the import raises; previously
        # neither the cursor nor the connection was ever closed.
        pg_conn.close()

    return result
# imports a Shapefile into Postgres in 2 steps: SHP > SQL; SQL > Postgres
# overcomes issues trying to use psql with PGPASSWORD set at runtime
def clear_static_routes(host, port, user, passwd):
    """Delete every row of ``static_routes`` in the ``cloud_controller`` database.

    The delete is committed on success. On ``psycopg2.DatabaseError`` the
    transaction is rolled back (when a connection exists) and the error is
    printed instead of raised. The connection is always closed.
    """
    con = None
    try:
        con = psycopg2.connect(
            database='cloud_controller',
            user=user,
            password=passwd,
            host=host,
            port=port,
        )
        con.cursor().execute('delete from static_routes')
        con.commit()
    except psycopg2.DatabaseError as e:
        # con is still None when the connect call itself failed.
        if con is not None:
            con.rollback()
        print('Error is %s' % e)
    finally:
        if con is not None:
            con.close()
def update_redirect_url(host, port, user, passwd, domain_name):
    """Set the redirect URIs of the 'simple' and 'vmc' clients in ``uaa``.

    The new ``web_server_redirect_uri`` value is a comma-separated list of
    the two fixed ``uaa.cloudfoundry.com`` URIs plus the http and https
    URIs for ``uaa.<domain_name>``. The update is committed on success. On
    ``psycopg2.DatabaseError`` the transaction is rolled back (when a
    connection exists) and the error is printed instead of raised. The
    connection is always closed.
    """
    redirect_uris = (
        'http://uaa.cloudfoundry.com/redirect/vmc,'
        'https://uaa.cloudfoundry.com/redirect/vmc,'
        'http://uaa.%s/redirect/vmc,https://uaa.%s/redirect/vmc'
        % (domain_name, domain_name)
    )
    con = None
    try:
        con = psycopg2.connect(database='uaa', user=user,
                               password=passwd, host=host, port=port)
        cur = con.cursor()
        # Pass the value as a bound parameter so that a quote (or anything
        # else) in domain_name cannot alter the SQL statement.
        cur.execute("update oauth_client_details set"
                    " web_server_redirect_uri=%s"
                    " where client_id in ('simple', 'vmc')",
                    (redirect_uris,))
        con.commit()
    except psycopg2.DatabaseError as e:
        if con:
            con.rollback()
        print('Error is %s' % e)
    finally:
        if con:
            con.close()
def install_extensions(extensions, **connection_parameters):
    """Install Postgres extension if available.

    Opens one autocommit connection from ``connection_parameters``, calls
    ``install_extension`` for every entry of ``extensions`` and closes the
    connection afterwards, even if an installation raises.

    Notes
    -----
    - superuser is generally required for installing extensions.
    - Currently does not support specific schema.
    """
    from postpy.connections import connect

    conn = connect(**connection_parameters)
    try:
        conn.autocommit = True

        for extension in extensions:
            install_extension(conn, extension)
    finally:
        # The connection is local to this function; previously it was
        # never closed.
        conn.close()