def test_subclass_rollback(self):
    """An exception leaving the ``with`` block must call the subclass's ``rollback()``.

    A connection subclass records every ``rollback()`` call. The ``with``
    body inserts a row and then raises ``ZeroDivisionError``; afterwards the
    connection must be back in ``STATUS_READY``, at least one rollback must
    have been recorded, and the inserted row must not be visible.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # A non-empty string is always truthy, so asserting on the message
        # could never fail; fail() actually reports the missing exception.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    # The insert was rolled back, so nothing should be returned here
    # (assumes test_with starts out empty -- TODO confirm against the fixture).
    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
# Example source code using the Python class connection()
def test_subclass_commit(self):
    """Leaving the ``with`` block normally must call the subclass's ``commit()``.

    A connection subclass records every ``commit()`` call. After the block
    exits, the connection must be in ``STATUS_READY``, at least one commit
    must have been recorded, and the inserted row must be visible from the
    fixture connection ``self.conn``.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in Python 3.12).
    self.assertTrue(commits)

    # Read back through a different connection to prove the data was committed.
    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_commit(self):
    """A clean exit from ``with connection`` must go through an overridden ``commit()``.

    The subclass appends to a list on each ``commit()``; the test then checks
    the connection status, that the list is non-empty, and that the inserted
    row can be read from ``self.conn``.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # Use assertTrue: the assert_ alias is deprecated and gone in Python 3.12.
    self.assertTrue(commits)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """A failing ``with connection`` block must go through an overridden ``rollback()``.

    The subclass appends to a list on each ``rollback()``. The block inserts a
    row and raises ``ZeroDivisionError``; the test then checks the connection
    status, that a rollback was recorded, and that no row is returned.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # Asserting on a non-empty string always passes; fail() is what
        # actually flags that the expected exception did not propagate.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_subclass_commit(self):
    """Check that ``with connection`` calls ``commit()`` on a connection subclass.

    Each ``commit()`` on the subclass is recorded in a list. Once the block
    has exited without error, the status must be ``STATUS_READY``, the list
    must be non-empty, and ``self.conn`` must see the inserted row.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assert_ is a deprecated alias removed in Python 3.12; assertTrue is equivalent.
    self.assertTrue(commits)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_connection_setup(self):
    """Check the basic properties of the async and sync fixture connections.

    Uses the non-deprecated unittest assertion names (``assert_`` and
    ``assertEquals`` were removed in Python 3.12) and ``assertIn`` so that a
    failure reports the offending value.
    """
    # Creating a cursor on either connection must not raise.
    cur = self.conn.cursor()
    sync_cur = self.sync_conn.cursor()
    del cur, sync_cur

    self.assertTrue(self.conn.async_)
    self.assertFalse(self.sync_conn.async_)

    # the async connection should be autocommit
    self.assertTrue(self.conn.autocommit)
    self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)

    # check other properties to be found on the connection
    self.assertTrue(self.conn.server_version)
    self.assertIn(self.conn.protocol_version, (2, 3))
    self.assertIn(self.conn.encoding, ext.encodings)
def test_set_parameters_while_async(self):
    """Changing connection parameters while a query is executing must fail.

    After starting a query, the transaction status can still be read, but
    ``set_client_encoding`` and ``set_isolation_level`` must raise
    ``psycopg2.ProgrammingError``.
    """
    cur = self.conn.cursor()
    cur.execute("select 'c'")
    self.assertTrue(self.conn.isexecuting())

    # getting transaction status works
    # (assertEqual replaces the deprecated assertEquals alias, removed in 3.12)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_ACTIVE)
    self.assertTrue(self.conn.isexecuting())

    # setting connection encoding should fail
    self.assertRaises(psycopg2.ProgrammingError,
                      self.conn.set_client_encoding, "LATIN1")

    # same for transaction isolation
    self.assertRaises(psycopg2.ProgrammingError,
                      self.conn.set_isolation_level, 1)
def as_string(self, context):
    """Return the quoted form of ``self._wrapped`` for the given context.

    :param context: a connection or a cursor; a cursor contributes its
        ``connection`` attribute.
    :raises TypeError: if ``context`` is neither a connection nor a cursor.
    :return: the adapter's ``getquoted()`` result; on Python 3 a ``bytes``
        result is decoded using the connection's encoding.
    """
    # Resolve the connection that the adapter will be prepared against.
    if isinstance(context, ext.connection):
        connection = context
    elif isinstance(context, ext.cursor):
        connection = context.connection
    else:
        raise TypeError("context must be a connection or a cursor")

    adapter = ext.adapt(self._wrapped)
    # Not every adapter exposes prepare(); call it only when present.
    if hasattr(adapter, 'prepare'):
        adapter.prepare(connection)

    quoted = adapter.getquoted()
    needs_decoding = sys.version_info[0] >= 3 and isinstance(quoted, bytes)
    if needs_decoding:
        return quoted.decode(ext.encodings[connection.encoding])
    return quoted
def test_subclass_commit(self):
    """A successful ``with connection`` block must call the subclass's ``commit()``.

    ``MyConn.commit`` records each call before delegating to the base class.
    The test checks the connection status after the block, that a commit was
    recorded, and that the inserted row is readable via ``self.conn``.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # The assert_ alias was removed in Python 3.12; assertTrue is the supported name.
    self.assertTrue(commits)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """An error inside ``with connection`` must call the subclass's ``rollback()``.

    ``MyConn.rollback`` records each call before delegating to the base class.
    The block inserts a row, then raises ``ZeroDivisionError``. Afterwards the
    status must be ``STATUS_READY``, a rollback must have been recorded, and
    the select must return no rows.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # The previous truthiness assertion on a string literal always passed;
        # fail() makes a swallowed exception an actual test failure.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_subclass_commit(self):
    """Exiting ``with connection`` without error must call an overridden ``commit()``.

    The subclass logs each ``commit()`` call into a list. The test verifies
    the connection status, that the list is non-empty, and that the inserted
    row is visible through ``self.conn``.
    """
    commit_calls = []

    class MyConn(ext.connection):
        def commit(self):
            commit_calls.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as subclassed:
        writer = subclassed.cursor()
        writer.execute("insert into test_with values (10)")

    self.assertEqual(subclassed.status, ext.STATUS_READY)
    self.assertTrue(commit_calls)

    # Verify through the fixture connection that the row was persisted.
    reader = self.conn.cursor()
    reader.execute("select * from test_with")
    rows = reader.fetchall()
    self.assertEqual(rows, [(10,)])
def test_subclass_rollback(self):
    """An exception inside ``with connection`` must call an overridden ``rollback()``.

    The subclass logs each ``rollback()`` call into a list. The block inserts
    a row and then divides by zero; the test verifies the connection status,
    that a rollback was logged, and that the select returns no rows.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # assertTrue("...") is vacuous because the string is always truthy;
        # fail() reports that the exception was unexpectedly suppressed.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_subclass_rollback(self):
    """Check that ``with connection`` calls ``rollback()`` on a subclass when the body raises.

    Every ``rollback()`` on the subclass is recorded. After the
    ``ZeroDivisionError`` propagates out of the block, the connection must be
    in ``STATUS_READY``, a rollback must have been recorded, and the select
    on ``test_with`` must return an empty list.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # Must be fail(): asserting on a string literal is always true and
        # would let a swallowed exception go unnoticed.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_connection_setup(self):
    """Verify the expected attributes of the async and sync fixture connections.

    The deprecated ``assert_`` / ``assertEquals`` aliases (removed in Python
    3.12) are replaced by their supported equivalents, and the membership
    checks use ``assertIn`` so that failures show the unexpected value.
    """
    # Both connections must be able to hand out a cursor.
    cur = self.conn.cursor()
    sync_cur = self.sync_conn.cursor()
    del cur, sync_cur

    self.assertTrue(self.conn.async_)
    self.assertFalse(self.sync_conn.async_)

    # the async connection should be autocommit
    self.assertTrue(self.conn.autocommit)
    self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)

    # check other properties to be found on the connection
    self.assertTrue(self.conn.server_version)
    self.assertIn(self.conn.protocol_version, (2, 3))
    self.assertIn(self.conn.encoding, ext.encodings)
def test_set_parameters_while_async(self):
    """Connection parameters cannot be changed while a query is in flight.

    With a query executing, ``get_transaction_status()`` must still work,
    while ``set_client_encoding`` and ``set_isolation_level`` must raise
    ``psycopg2.ProgrammingError``.
    """
    cur = self.conn.cursor()
    cur.execute("select 'c'")
    self.assertTrue(self.conn.isexecuting())

    # getting transaction status works
    # (assertEquals is a deprecated alias removed in Python 3.12)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_ACTIVE)
    self.assertTrue(self.conn.isexecuting())

    # setting connection encoding should fail
    self.assertRaises(psycopg2.ProgrammingError,
                      self.conn.set_client_encoding, "LATIN1")

    # same for transaction isolation
    self.assertRaises(psycopg2.ProgrammingError,
                      self.conn.set_isolation_level, 1)
def as_string(self, context):
    """Render ``self._wrapped`` as a quoted string using ``context``.

    ``context`` must be a connection, or a cursor whose ``connection`` is
    used; anything else raises ``TypeError``. The wrapped value is adapted,
    the adapter is prepared with the connection when it supports that, and
    its quoted output is returned. On Python 3, ``bytes`` output is decoded
    with the codec mapped from the connection's encoding.
    """
    # is it a connection or cursor?
    if isinstance(context, ext.connection):
        target_conn = context
    elif isinstance(context, ext.cursor):
        target_conn = context.connection
    else:
        raise TypeError("context must be a connection or a cursor")

    adapted = ext.adapt(self._wrapped)
    if hasattr(adapted, 'prepare'):
        adapted.prepare(target_conn)

    result = adapted.getquoted()
    if sys.version_info[0] >= 3 and isinstance(result, bytes):
        codec = ext.encodings[target_conn.encoding]
        result = result.decode(codec)

    return result
def _get_data(self, query, t):
    """Build the ``(query, duration, connection)`` tuple for a recorded statement.

    :param query: the query, or a callable returning it (called with no
        arguments).
    :param t: the duration; rounded to 3 decimal places and returned as a
        float.
    :return: the prettified query (or ``FILTERED`` when prettifying yields
        ``None``), the rounded duration, and the safe connection string for
        ``self``.
    """
    sql = query() if callable(query) else query
    sql = prettify_sql(sql)
    if sql is None:
        sql = FILTERED
    # Tuple elements are evaluated left to right, so the rounding still
    # happens before the connection string is built.
    return sql, float(round(t, 3)), get_safe_connection_string(self)
def execute(self, query, vars=None):
    """Run ``query`` through the parent cursor and record how long it took.

    The elapsed time in milliseconds is always reported to
    ``self.connection._record``, whether the parent ``execute`` returns or
    raises. When ``vars`` is ``None`` the query text is replaced with
    ``None`` in the record (presumably because values may be embedded in the
    SQL itself -- confirm with the recorder's contract).
    """
    started = time.time()
    try:
        return super(TaliskerCursor, self).execute(query, vars)
    finally:
        elapsed_ms = (time.time() - started) * 1000
        recorded_query = None if vars is None else query
        self.connection._record('query', recorded_query, elapsed_ms)
def callproc(self, procname, vars=None):
    """Call a stored procedure through the parent cursor and record its duration.

    The elapsed time in milliseconds is always reported to
    ``self.connection._record`` under the label
    ``'stored proc: <procname>'``, whether the parent ``callproc`` returns
    or raises. No query text is recorded.
    """
    started = time.time()
    try:
        return super(TaliskerCursor, self).callproc(procname, vars)
    finally:
        elapsed_ms = (time.time() - started) * 1000
        label = 'stored proc: {}'.format(procname)
        # no query parameters, cannot safely record
        self.connection._record(label, None, elapsed_ms)
def test_cursor_closed_attribute(self):
    """``cursor.closed`` must reflect both explicit closes and connection closes."""
    connection = self.conn

    # A fresh cursor is open until close() is called on it.
    cursor = connection.cursor()
    self.assertEqual(cursor.closed, False)
    cursor.close()
    self.assertEqual(cursor.closed, True)

    # Closing the connection closes the cursor:
    cursor = connection.cursor()
    connection.close()
    self.assertEqual(cursor.closed, True)