def test_set_read_only(self):
    """Check that ``set_session(readonly=...)`` drives the read-only state.

    ``conn.readonly`` is expected to start as None, to follow the value
    passed to ``set_session()``, and to agree with what the server reports
    for ``transaction_read_only``, including after a rollback.
    """
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(self.conn.readonly is None)
    cur = self.conn.cursor()

    self.conn.set_session(readonly=True)
    self.assertTrue(self.conn.readonly is True)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    # The setting must still be reported after the rollback.
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    self.conn.set_session(readonly=False)
    self.assertTrue(self.conn.readonly is False)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'off')
    self.conn.rollback()
# Example source code using Python's cursor()
def test_set_deferrable(self):
    """Check that ``set_session(deferrable=...)`` drives the deferrable state.

    ``conn.deferrable`` is expected to start as None and to follow the value
    passed to ``set_session()``. The server must report a matching
    ``transaction_deferrable``, including after a rollback, while
    ``transaction_read_only`` stays 'on' when only ``deferrable`` is changed.
    """
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(self.conn.deferrable is None)
    cur = self.conn.cursor()

    self.conn.set_session(readonly=True, deferrable=True)
    self.assertTrue(self.conn.deferrable is True)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    # The setting must still be reported after the rollback.
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'on')
    self.conn.rollback()

    # Only deferrable is changed here; read-only is expected to stay 'on'.
    self.conn.set_session(deferrable=False)
    self.assertTrue(self.conn.deferrable is False)
    cur.execute("SHOW transaction_read_only;")
    self.assertEqual(cur.fetchone()[0], 'on')
    cur.execute("SHOW transaction_deferrable;")
    self.assertEqual(cur.fetchone()[0], 'off')
    self.conn.rollback()
def test_default_no_autocommit(self):
    """Check the connection state transitions with autocommit off.

    The connection must report autocommit as false, be READY/IDLE before any
    query, BEGIN/INTRANS after executing one, and READY/IDLE again after a
    rollback.
    """
    # assertFalse replaces the deprecated assert_(not ...) form.
    self.assertFalse(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_IDLE)

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_INTRANS)

    self.conn.rollback()
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
    """Check the connection state when toggling ``autocommit``.

    With autocommit on, the connection must stay READY/IDLE even after a
    query. After switching it back off, the connection is READY/IDLE until a
    query is executed, and BEGIN/INTRANS afterwards.
    """
    self.conn.autocommit = True
    # assertTrue/assertFalse replace the deprecated assert_ alias.
    self.assertTrue(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_IDLE)

    cur = self.conn.cursor()
    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_IDLE)

    self.conn.autocommit = False
    self.assertFalse(self.conn.autocommit)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_IDLE)

    cur.execute('select 1;')
    self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
    self.assertEqual(self.conn.get_transaction_status(),
                     ext.TRANSACTION_STATUS_INTRANS)
def test_subclass_commit(self):
    """Check that leaving a ``with`` block calls an overridden ``commit()``.

    A connection subclass records every ``commit()`` call. After the ``with``
    block exits normally, the connection must be READY, at least one commit
    must have been recorded, and the inserted row must be readable from
    ``self.conn``.
    """
    commits = []

    class MyConn(ext.connection):
        def commit(self):
            # Record the call, then defer to the real implementation.
            commits.append(None)
            super(MyConn, self).commit()

    with self.connect(connection_factory=MyConn) as conn:
        curs = conn.cursor()
        curs.execute("insert into test_with values (10)")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(commits)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [(10,)])
def test_subclass_rollback(self):
    """Check that an error in a ``with`` block calls an overridden ``rollback()``.

    A connection subclass records every ``rollback()`` call. A
    ZeroDivisionError is raised inside the ``with`` block; afterwards the
    connection must be READY, at least one rollback must have been recorded,
    and the inserted row must not be present.
    """
    rollbacks = []

    class MyConn(ext.connection):
        def rollback(self):
            # Record the call, then defer to the real implementation.
            rollbacks.append(None)
            super(MyConn, self).rollback()

    try:
        with self.connect(connection_factory=MyConn) as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values (11)")
            1 / 0
    except ZeroDivisionError:
        pass
    else:
        # Asserting on a non-empty string can never fail; fail() makes the
        # "exception was swallowed" case an actual test failure.
        self.fail("exception not raised")

    self.assertEqual(conn.status, ext.STATUS_READY)
    # assertTrue replaces the deprecated assert_ alias (removed in 3.12).
    self.assertTrue(rollbacks)

    curs = conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_with_error(self):
    """Check the state left behind by an error inside nested ``with`` blocks.

    A ZeroDivisionError is raised inside a cursor ``with`` block nested in a
    connection ``with`` block. Afterwards the connection must be READY and
    still open, the cursor must be closed, and the inserted row must not be
    present.
    """
    try:
        with self.conn as conn:
            with conn.cursor() as curs:
                curs.execute("insert into test_with values (5)")
                1 / 0
    except ZeroDivisionError:
        pass

    self.assertEqual(self.conn.status, ext.STATUS_READY)
    # assertFalse/assertTrue replace the deprecated assert_ alias.
    self.assertFalse(self.conn.closed)
    self.assertTrue(curs.closed)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_notices_consistent_order(self):
    """Check that ``conn.notices`` keeps notices in statement order.

    Four temp tables with serial columns are created over two
    multi-statement executes; ``conn.notices`` must then hold exactly four
    entries mentioning table1 to table4, in that order.
    """
    connection = self.conn
    cursor = connection.cursor()
    if self.conn.server_version >= 90300:
        cursor.execute("set client_min_messages=debug1")

    # The SQL text is kept flush-left so the statements sent are unchanged.
    first_batch = """
create temp table table1 (id serial);
create temp table table2 (id serial);
"""
    second_batch = """
create temp table table3 (id serial);
create temp table table4 (id serial);
"""
    for batch in (first_batch, second_batch):
        cursor.execute(batch)

    self.assertEqual(4, len(connection.notices))
    expected_order = ('table1', 'table2', 'table3', 'table4')
    for position, table_name in enumerate(expected_order):
        self.assertTrue(table_name in connection.notices[position])
def test_concurrent_execution(self):
    """Check that two connections can run queries at the same time.

    Two threads each open their own connection and run ``pg_sleep(4)``; the
    total wall-clock time for both must stay below 7 seconds.
    """
    def sleep_on_own_connection():
        connection = self.connect()
        cursor = connection.cursor()
        cursor.execute("select pg_sleep(4)")
        cursor.close()
        connection.close()

    workers = [threading.Thread(target=sleep_on_own_connection)
               for _ in range(2)]

    started_at = time.time()
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    elapsed = time.time() - started_at

    self.assertTrue(elapsed < 7,
                    "something broken in concurrency")
def test_set_isolation_level_default(self):
    """Check that ISOLATION_LEVEL_DEFAULT falls back to the session default.

    The session default is set to 'read committed'. After selecting
    SERIALIZABLE the server must report 'serializable'; after a rollback and
    selecting ISOLATION_LEVEL_DEFAULT it must report 'read committed'.
    """
    connection = self.connect()
    cursor = connection.cursor()

    def shown_isolation():
        # Ask the server which isolation level is currently in effect.
        cursor.execute("show transaction_isolation")
        return cursor.fetchone()[0]

    # The session default is changed while autocommit is enabled.
    connection.autocommit = True
    cursor.execute("set default_transaction_isolation to 'read committed'")
    connection.autocommit = False

    connection.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
    self.assertEqual(connection.isolation_level,
                     ext.ISOLATION_LEVEL_SERIALIZABLE)
    self.assertEqual(shown_isolation(), "serializable")
    connection.rollback()

    connection.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
    self.assertEqual(shown_isolation(), "read committed")
def test_isolation_level_read_committed(self):
    """Check row visibility between two connections under READ COMMITTED.

    Each connection inserts one row into ``isolevel``. A connection must
    count only its own row until the other one commits, and both rows
    afterwards.
    """
    first_conn = self.connect()
    second_conn = self.connect()
    second_conn.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)

    def row_count(cursor):
        # Number of rows in isolevel as seen through the given cursor.
        cursor.execute("select count(*) from isolevel;")
        return cursor.fetchone()[0]

    first_cur = first_conn.cursor()
    self.assertEqual(0, row_count(first_cur))
    first_conn.commit()

    second_cur = second_conn.cursor()
    second_cur.execute("insert into isolevel values (10);")
    first_cur.execute("insert into isolevel values (20);")

    # Before the first connection commits, the second counts only its own row.
    self.assertEqual(1, row_count(second_cur))
    first_conn.commit()

    # After that commit the second connection counts both rows, while the
    # first still counts one until the second commits too.
    self.assertEqual(2, row_count(second_cur))
    self.assertEqual(1, row_count(first_cur))
    second_conn.commit()

    self.assertEqual(2, row_count(first_cur))
def test_tpc_commit(self):
    """Walk a two-phase commit through begin, prepare and commit.

    The connection status must go READY -> BEGIN -> PREPARED -> READY.
    ``count_xacts()`` must be 1 only between prepare and commit, and
    ``count_test_records()`` must become 1 only after the commit.
    """
    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")

    def expect_counts(xacts, records):
        # Compare both helper counters against the expected values.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    self.assertEqual(connection.status, ext.STATUS_READY)

    connection.tpc_begin(xid)
    self.assertEqual(connection.status, ext.STATUS_BEGIN)

    cursor = connection.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit');")
    expect_counts(0, 0)

    connection.tpc_prepare()
    self.assertEqual(connection.status, ext.STATUS_PREPARED)
    expect_counts(1, 0)

    connection.tpc_commit()
    self.assertEqual(connection.status, ext.STATUS_READY)
    expect_counts(0, 1)
def test_tpc_commit_one_phase(self):
    """Commit a TPC transaction without calling ``tpc_prepare()``.

    The connection status must go READY -> BEGIN -> READY.
    ``count_xacts()`` must stay 0 throughout, and ``count_test_records()``
    must become 1 only after the commit.
    """
    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")

    def expect_counts(xacts, records):
        # Compare both helper counters against the expected values.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    self.assertEqual(connection.status, ext.STATUS_READY)

    connection.tpc_begin(xid)
    self.assertEqual(connection.status, ext.STATUS_BEGIN)

    cursor = connection.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_1p');")
    expect_counts(0, 0)

    connection.tpc_commit()
    self.assertEqual(connection.status, ext.STATUS_READY)
    expect_counts(0, 1)
def test_tpc_rollback(self):
    """Walk a two-phase transaction through begin, prepare and rollback.

    The connection status must go READY -> BEGIN -> PREPARED -> READY.
    ``count_xacts()`` must be 1 only between prepare and rollback, and
    ``count_test_records()`` must stay 0 throughout.
    """
    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")

    def expect_counts(xacts, records):
        # Compare both helper counters against the expected values.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    self.assertEqual(connection.status, ext.STATUS_READY)

    connection.tpc_begin(xid)
    self.assertEqual(connection.status, ext.STATUS_BEGIN)

    cursor = connection.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback');")
    expect_counts(0, 0)

    connection.tpc_prepare()
    self.assertEqual(connection.status, ext.STATUS_PREPARED)
    expect_counts(1, 0)

    connection.tpc_rollback()
    self.assertEqual(connection.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_tpc_rollback_one_phase(self):
    """Roll back a TPC transaction without calling ``tpc_prepare()``.

    The connection status must go READY -> BEGIN -> READY, with both
    ``count_xacts()`` and ``count_test_records()`` staying 0 throughout.
    """
    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")

    def expect_counts(xacts, records):
        # Compare both helper counters against the expected values.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    self.assertEqual(connection.status, ext.STATUS_READY)

    connection.tpc_begin(xid)
    self.assertEqual(connection.status, ext.STATUS_BEGIN)

    cursor = connection.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
    expect_counts(0, 0)

    connection.tpc_rollback()
    self.assertEqual(connection.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_tpc_rollback_recovered(self):
    """Roll back a prepared transaction from a different connection.

    The transaction is prepared and its connection closed. A new connection
    rebuilds the same xid and passes it to ``tpc_rollback()``.
    ``count_xacts()`` must be 1 after the close and 0 after the rollback;
    ``count_test_records()`` must stay 0 throughout.
    """
    def expect_counts(xacts, records):
        # Compare both helper counters against the expected values.
        self.assertEqual(xacts, self.count_xacts())
        self.assertEqual(records, self.count_test_records())

    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")
    self.assertEqual(connection.status, ext.STATUS_READY)

    connection.tpc_begin(xid)
    self.assertEqual(connection.status, ext.STATUS_BEGIN)

    cursor = connection.cursor()
    cursor.execute("insert into test_tpc values ('test_tpc_commit_rec');")
    expect_counts(0, 0)

    connection.tpc_prepare()
    connection.close()
    expect_counts(1, 0)

    # Start over on a fresh connection, rebuilding the xid from its parts.
    connection = self.connect()
    xid = connection.xid(1, "gtrid", "bqual")
    connection.tpc_rollback(xid)

    self.assertEqual(connection.status, ext.STATUS_READY)
    expect_counts(0, 0)
def test_set_read_only(self):
    """Check that ``set_session(readonly=...)`` drives the read-only state.

    ``conn.readonly`` must start as None and follow the value passed to
    ``set_session()``; the server's ``transaction_read_only`` must match,
    including after a rollback.
    """
    connection = self.conn
    self.assertTrue(connection.readonly is None)
    cursor = connection.cursor()

    def server_read_only():
        # Value of transaction_read_only as reported by the server.
        cursor.execute("SHOW transaction_read_only;")
        return cursor.fetchone()[0]

    connection.set_session(readonly=True)
    self.assertTrue(connection.readonly is True)
    # Checked twice, with a rollback after each check.
    for _ in range(2):
        self.assertEqual(server_read_only(), 'on')
        connection.rollback()

    connection.set_session(readonly=False)
    self.assertTrue(connection.readonly is False)
    self.assertEqual(server_read_only(), 'off')
    connection.rollback()
def test_set_deferrable(self):
    """Check that ``set_session(deferrable=...)`` drives the deferrable state.

    ``conn.deferrable`` must start as None and follow the value passed to
    ``set_session()``. The server's ``transaction_deferrable`` must match,
    including after a rollback, while ``transaction_read_only`` stays 'on'
    when only ``deferrable`` is changed.
    """
    connection = self.conn
    self.assertTrue(connection.deferrable is None)
    cursor = connection.cursor()

    def shown(query):
        # Run a SHOW statement and return the single value it reports.
        cursor.execute(query)
        return cursor.fetchone()[0]

    connection.set_session(readonly=True, deferrable=True)
    self.assertTrue(connection.deferrable is True)
    self.assertEqual(shown("SHOW transaction_read_only;"), 'on')
    self.assertEqual(shown("SHOW transaction_deferrable;"), 'on')
    connection.rollback()

    # The setting must still be reported after the rollback.
    self.assertEqual(shown("SHOW transaction_deferrable;"), 'on')
    connection.rollback()

    connection.set_session(deferrable=False)
    self.assertTrue(connection.deferrable is False)
    self.assertEqual(shown("SHOW transaction_read_only;"), 'on')
    self.assertEqual(shown("SHOW transaction_deferrable;"), 'off')
    connection.rollback()
def test_mixing_session_attribs(self):
    """Check ``readonly`` reporting when ``autocommit`` is toggled.

    With autocommit on and ``readonly`` set, both ``transaction_read_only``
    and ``default_transaction_read_only`` must be 'on'. After switching
    autocommit off, the former must stay 'on' and the latter must be 'off'.
    """
    connection = self.conn
    cursor = connection.cursor()

    def shown(query):
        # Run a SHOW statement and return the single value it reports.
        cursor.execute(query)
        return cursor.fetchone()[0]

    connection.autocommit = True
    connection.readonly = True
    self.assertEqual(shown("SHOW transaction_read_only;"), 'on')
    self.assertEqual(shown("SHOW default_transaction_read_only;"), 'on')

    connection.autocommit = False
    self.assertEqual(shown("SHOW transaction_read_only;"), 'on')
    self.assertEqual(shown("SHOW default_transaction_read_only;"), 'off')
def test_default_no_autocommit(self):
    """Check the connection state transitions with autocommit off.

    The connection must report autocommit as false, be READY/IDLE before any
    query, BEGIN/INTRANS after executing one, and READY/IDLE again after a
    rollback.
    """
    connection = self.conn

    def expect_state(status, transaction_status):
        # Compare the connection status and transaction status together.
        self.assertEqual(connection.status, status)
        self.assertEqual(connection.get_transaction_status(),
                         transaction_status)

    self.assertFalse(connection.autocommit)
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)

    cursor = connection.cursor()
    cursor.execute('select 1;')
    expect_state(ext.STATUS_BEGIN, ext.TRANSACTION_STATUS_INTRANS)

    connection.rollback()
    expect_state(ext.STATUS_READY, ext.TRANSACTION_STATUS_IDLE)