def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
python类ISOLATION_LEVEL_READ_COMMITTED的实例源码
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
def _isolation_lookup(self):
extensions = self._psycopg2_extensions()
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def autocommit(self, on_off=1):
"""autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
if on_off > 0:
self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
else:
self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
def test_set_isolation_level(self):
conn = self.connect()
curs = conn.cursor()
levels = [
('read uncommitted',
ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
]
for name, level in levels:
conn.set_isolation_level(level)
# the only values available on prehistoric PG versions
if conn.server_version < 80000:
if level in (
ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level)
curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0]
self.assertEqual(name, got_name)
conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level_abort(self):
conn = self.connect()
cur = conn.cursor()
self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("insert into isolevel values (10);")
self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(1, cur.fetchone()[0])
self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
def test_set_isolation_level(self):
cur = self.conn.cursor()
self.conn.set_session(
ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def autocommit(self, on_off=1):
"""autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
if on_off > 0:
self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
else:
self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
def test_set_isolation_level(self):
conn = self.connect()
curs = conn.cursor()
levels = [
('read uncommitted',
ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
]
for name, level in levels:
conn.set_isolation_level(level)
# the only values available on prehistoric PG versions
if conn.server_version < 80000:
if level in (
ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level)
curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0]
self.assertEqual(name, got_name)
conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level(self):
cur = self.conn.cursor()
self.conn.set_session(
ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
def _isolation_lookup(self):
extensions = self._psycopg2_extensions()
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def _isolation_lookup(self):
from psycopg2 import extensions
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def _isolation_lookup(self):
from psycopg2 import extensions
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def _isolation_lookup(self):
extensions = self._psycopg2_extensions()
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def _isolation_lookup(self):
extensions = self._psycopg2_extensions()
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def autocommit(self, on_off=1):
"""autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
if on_off > 0:
self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
else:
self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
def test_set_isolation_level(self):
conn = self.connect()
curs = conn.cursor()
levels = [
('read uncommitted',
ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
]
for name, level in levels:
conn.set_isolation_level(level)
# the only values available on prehistoric PG versions
if conn.server_version < 80000:
if level in (
ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level)
curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0]
self.assertEqual(name, got_name)
conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level_abort(self):
conn = self.connect()
cur = conn.cursor()
self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("insert into isolevel values (10);")
self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(1, cur.fetchone()[0])
self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
def test_set_isolation_level(self):
cur = self.conn.cursor()
self.conn.set_session(
ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_session(
isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
def _isolation_lookup(self):
extensions = __import__('psycopg2.extensions').extensions
return {
'AUTOCOMMIT': extensions.ISOLATION_LEVEL_AUTOCOMMIT,
'READ COMMITTED': extensions.ISOLATION_LEVEL_READ_COMMITTED,
'READ UNCOMMITTED': extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
'REPEATABLE READ': extensions.ISOLATION_LEVEL_REPEATABLE_READ,
'SERIALIZABLE': extensions.ISOLATION_LEVEL_SERIALIZABLE
}
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def autocommit(self, on_off=1):
"""autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
if on_off > 0:
self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
else:
self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def autocommit(self, on_off=1):
"""autocommit(on_off=1) -> switch autocommit on (1) or off (0)"""
if on_off > 0:
self.set_isolation_level(_ext.ISOLATION_LEVEL_AUTOCOMMIT)
else:
self.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn
def connect(*args, **kwargs):
"""connect(dsn, ...) -> new psycopg 1.1.x compatible connection object"""
kwargs['connection_factory'] = connection
conn = _2connect(*args, **kwargs)
conn.set_isolation_level(_ext.ISOLATION_LEVEL_READ_COMMITTED)
return conn