def test_copy_to_segfault(self):
    """Regression test for issue #219.

    Runs a small script in a separate interpreter (so that a crash cannot
    take the test runner down with it) and checks that the child exits with
    status 0.  The script issues ``copy ... to stdout`` through a plain
    ``execute()`` and tolerates a ``ProgrammingError`` from it.
    """
    # issue #219
    # NOTE: the body of the try/except below must stay indented inside the
    # literal, otherwise the child dies with an IndentationError and the
    # return code check can never succeed.
    script = ("""\
import psycopg2
conn = psycopg2.connect(%(dsn)r)
curs = conn.cursor()
curs.execute("create table copy_segf (id int)")
try:
    curs.execute("copy copy_segf to stdout")
except psycopg2.ProgrammingError:
    pass
conn.close()
""" % {'dsn': dsn})

    # script_to_py3 is defined elsewhere; presumably it adapts the source
    # for Python 3 interpreters -- confirm against its definition.
    proc = Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
    proc.communicate()
    self.assertEqual(0, proc.returncode)
python类ProgrammingError()的实例源码
def _create_type(self, name, fields):
    """Drop and re-create a composite type, returning its oid.

    name: the type name, optionally schema-qualified as ``schema.type``.
    fields: sequence of 2-tuples, each rendered as ``"<first> <second>"``
        in the ``create type`` field list.

    The oid is read from ``pg_type`` and the transaction is committed
    before returning.
    """
    cursor = self.conn.cursor()
    try:
        cursor.execute("drop type %s cascade;" % name)
    except psycopg2.ProgrammingError:
        # The drop failed (presumably the type did not exist yet): roll
        # back before issuing further statements.
        self.conn.rollback()

    field_list = ", ".join("%s %s" % pair for pair in fields)
    cursor.execute("create type %s as (%s);" % (name, field_list))

    # An unqualified name is looked up in the 'public' schema.
    schema, name = name.split('.') if '.' in name else ('public', name)

    cursor.execute("""\
SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = %s;
""", (name, schema))
    type_oid = cursor.fetchone()[0]
    self.conn.commit()
    return type_oid
def test_withhold(self):
    """Check the ``withhold`` flag of named cursors.

    Requesting ``withhold=True`` without a cursor name must raise
    ProgrammingError.  With a named cursor the flag can be set either via
    the attribute or at creation time, and in both cases rows can still be
    fetched after the connection has been committed.
    """
    select_sql = "select data from withhold order by data"
    expected = [(10,), (20,), (30,)]

    # No cursor name given: withhold is rejected.
    self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
                      withhold=True)

    self._create_withhold_table()

    # Flag switched on through the attribute after creation.
    cur = self.conn.cursor("W")
    self.assertEqual(cur.withhold, False)
    cur.withhold = True
    self.assertEqual(cur.withhold, True)
    cur.execute(select_sql)
    self.conn.commit()
    self.assertEqual(cur.fetchall(), expected)
    cur.close()

    # Flag passed at creation time.
    cur = self.conn.cursor("W", withhold=True)
    self.assertEqual(cur.withhold, True)
    cur.execute(select_sql)
    self.conn.commit()
    self.assertEqual(cur.fetchall(), expected)

    # Remove the table used by the test.
    cur = self.conn.cursor()
    cur.execute("drop table withhold")
    self.conn.commit()
def test_not_scrollable(self):
    """Check that a named cursor with ``scrollable=False`` cannot go backwards.

    Passing ``scrollable=False`` without a cursor name must raise
    ProgrammingError.  If a cursor with default scrollability already
    refuses to scroll backwards, the test is skipped since the explicit
    case cannot be told apart.
    """
    select_sql = "select * from scrollable"

    # No cursor name given: the scrollable argument is rejected.
    self.assertRaises(psycopg2.ProgrammingError, self.conn.cursor,
                      scrollable=False)

    # Populate a 100-row table to scroll over.
    cur = self.conn.cursor()
    cur.execute("create table scrollable (data int)")
    rows = [(value,) for value in range(100)]
    cur.executemany("insert into scrollable values (%s)", rows)
    cur.close()

    # Default scrollability.
    cur = self.conn.cursor("S")
    cur.execute(select_sql)
    self.assertEqual(cur.scrollable, None)
    cur.scroll(2)
    try:
        cur.scroll(-1)
    except psycopg2.OperationalError:
        return self.skipTest("can't evaluate non-scrollable cursor")
    cur.close()

    # Explicitly non-scrollable: forward works, backward must fail.
    cur = self.conn.cursor("S", scrollable=False)
    self.assertEqual(cur.scrollable, False)
    cur.execute(select_sql)
    cur.scroll(2)
    self.assertRaises(psycopg2.OperationalError, cur.scroll, -1)
def test_async_iter(self):
    """Iterating a cursor must fail while a query is running and work afterwards.

    Inserts three rows, starts a select, and checks that ``list(cursor)``
    raises ProgrammingError before ``self.wait`` has been called for that
    query, then returns the rows once it has.
    """
    insert_sql = """
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
"""
    cursor = self.conn.cursor()

    cursor.execute("begin")
    self.wait(cursor)

    cursor.execute(insert_sql)
    self.wait(cursor)

    cursor.execute("select id from table1 order by id")

    # The select has not been waited for yet: iteration is refused.
    self.assertRaises(psycopg2.ProgrammingError, list, cursor)

    # Once the query has completed the rows can be consumed.
    self.wait(cursor)
    self.assertEqual(list(cursor), [(1, ), (2, ), (3, )])
    self.assertFalse(self.conn.isexecuting())
async def test_base_db_ext_valid_and_wrong_inserts(self):
    """A batch mixing a valid and a failing INSERT must leave no row behind.

    The first statement inserts into ``test_table(name)``; the second
    targets ``test_table(name1)`` and is expected to make the whole call
    raise ProgrammingError (presumably because that column does not exist).
    Afterwards the row from the first statement must not be visible.

    Declared ``async`` because the body awaits coroutines; a plain ``def``
    containing ``await`` does not compile.
    NOTE(review): this needs an asyncio-aware test runner -- confirm how
    the surrounding suite schedules coroutine tests.
    """
    await self.init_db()

    with pytest.raises(psycopg2.ProgrammingError):
        await self.app.db.execute(db_name='test_db', query="""
INSERT INTO test_table(name) VALUES ('djaio_boo');
INSERT INTO test_table(name1) VALUES ('djaio');
""", values=[], _type='select')

    # The valid INSERT from the failed batch must not have been persisted.
    bad_val = await self.app.db.execute(
        'test_db', "select * from test_table where name='djaio_boo';", [], 'select')
    assert bad_val == []
def get_or_create_genome(rec, cur):
    '''Return the genome_id for the record's tax id, inserting a row if needed.

    rec: the record passed on to get_taxid() and get_strain().
    cur: database cursor used for all queries.

    A ProgrammingError raised while resolving the tax id is re-raised after
    printing the offending record.
    '''
    try:
        taxid = get_or_create_tax_id(cur, get_taxid(rec), get_strain(rec))
    except psycopg2.ProgrammingError:
        # Show which record triggered the failure before propagating it.
        print(rec)
        raise

    params = (taxid,)
    cur.execute("SELECT genome_id FROM antismash.genomes WHERE tax_id = %s", params)
    row = cur.fetchone()
    if row is not None:
        return row[0]

    # No genome stored for this tax id yet: create one and return its id.
    cur.execute("INSERT INTO antismash.genomes (tax_id) VALUES (%s) RETURNING genome_id;", params)
    return cur.fetchone()[0]
def load(self, db):
    """Load the application from the specified database

    db: a ravel.db.RavelDb instance into which the application will be loaded

    If the application has no SQL file, only a debug message is logged.
    Otherwise the whole file is executed through ``db.cursor``; a
    ProgrammingError is reported on stdout and not propagated."""
    if self.sqlfile is None:
        logger.debug("loaded application %s with no SQL file", self.name)
        return

    with open(self.sqlfile) as f:
        try:
            db.cursor.execute(f.read())
        # "except X as e" and print(...) with a single argument behave the
        # same on Python 2.6+ and Python 3, unlike the Python-2-only
        # "except X, e" / print statement forms.
        except psycopg2.ProgrammingError as e:
            print("Error loading app {0}: {1}".format(self.name, e))

    # NOTE(review): this is logged even when the SQL failed above.
    logger.debug("loaded application %s", self.name)
def test_create_slot(slot):
    """Check how ``slot.create_slot()`` reacts to errors from the cursor.

    Three scenarios are exercised by replacing
    ``slot._repl_cursor.create_replication_slot`` with a raising Mock:

    * ProgrammingError with pgcode DUPLICATE_OBJECT: no exception escapes.
    * ProgrammingError with pgcode -1: the error propagates.
    * a generic Exception: the error propagates.

    In every scenario the replication slot creation must have been
    attempted with the expected arguments.
    """

    def fail_with(side_effect):
        # Make the next create_replication_slot() call raise side_effect.
        slot._repl_cursor.create_replication_slot = Mock(side_effect=side_effect)

    def assert_creation_attempted():
        slot._repl_cursor.create_replication_slot.assert_called_with(
            'pg2kinesis',
            slot_type=psycopg2.extras.REPLICATION_LOGICAL,
            output_plugin=u'test_decoding')

    # Scenario 1: the slot already exists.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.DUPLICATE_OBJECT):
        fail_with(psycopg2.ProgrammingError())
        slot.create_slot()
        assert_creation_attempted()

    # Scenario 2: any other ProgrammingError is re-raised.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        fail_with(psycopg2.ProgrammingError())
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.create_slot()
        assert_creation_attempted()
        # Checked while pgcode is still patched.
        assert e_info.value.pgcode == -1

    # Scenario 3: unrelated exceptions are not swallowed.
    fail_with(Exception)
    with pytest.raises(Exception):
        slot.create_slot()
    assert_creation_attempted()
def create_slot(self):
    """Create the logical replication slot named ``self.slot_name``.

    The slot is created with the 'test_decoding' output plugin.  A
    ProgrammingError whose pgcode is DUPLICATE_OBJECT is only logged at
    info level; any other ProgrammingError is logged as an error and
    re-raised.
    """
    logger.info('Creating slot %s' % self.slot_name)
    try:
        self._repl_cursor.create_replication_slot(
            self.slot_name,
            slot_type=psycopg2.extras.REPLICATION_LOGICAL,
            output_plugin='test_decoding')
    except psycopg2.ProgrammingError as err:
        if err.pgcode == psycopg2.errorcodes.DUPLICATE_OBJECT:
            # The slot exists already: nothing to do.
            logger.info('Slot %s is already present.' % self.slot_name)
            return
        logger.error(err)
        raise
def delete_slot(self):
    """Drop the replication slot named ``self.slot_name``.

    A ProgrammingError whose pgcode is UNDEFINED_OBJECT is only logged at
    info level; any other ProgrammingError is logged as an error and
    re-raised.
    """
    logger.info('Deleting slot %s' % self.slot_name)
    try:
        self._repl_cursor.drop_replication_slot(self.slot_name)
    except psycopg2.ProgrammingError as err:
        if err.pgcode == psycopg2.errorcodes.UNDEFINED_OBJECT:
            # The slot does not exist: nothing to drop.
            logger.info('Slot %s was not found.' % self.slot_name)
            return
        logger.error(err)
        raise
def skip_if_no_superuser(f):
    """Skip a test if the database user running the test is not a superuser

    Decorator for test methods: when the wrapped test raises a
    ProgrammingError whose pgcode is INSUFFICIENT_PRIVILEGE the test is
    skipped via ``self.skipTest``; every other error propagates unchanged.
    """
    @wraps(f)
    def wrapper(self):
        # psycopg2 is imported lazily, at call time.
        from psycopg2 import ProgrammingError
        try:
            return f(self)
        except ProgrammingError as exc:
            import psycopg2.errorcodes
            if exc.pgcode != psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
                raise
            self.skipTest("skipped because not superuser")

    return wrapper
def test_no_mro_no_joy(self):
    """An adapter registered for a base class must not apply to a subclass here.

    Registers an adapter for ``Parent`` and checks that adapting an
    instance of ``Child`` raises ProgrammingError.  The classes are
    deliberately declared without an explicit base.  The adapter is
    removed again whatever the outcome.
    """
    from psycopg2.extensions import adapt, register_adapter, AsIs

    class Parent:
        pass

    class Child(Parent):
        pass

    register_adapter(Parent, lambda a: AsIs("a"))
    try:
        self.assertRaises(psycopg2.ProgrammingError, adapt, Child())
    finally:
        # Undo the registration so other tests are not affected.
        adapter_key = (Parent, psycopg2.extensions.ISQLQuote)
        del psycopg2.extensions.adapters[adapter_key]
def make_replication_events(self):
    """Generate database activity by (re)creating the table ``dummy1``.

    Does nothing when ``self.connect()`` returns None.  Otherwise the
    table is dropped if possible, re-created with five rows from
    ``generate_series`` and the transaction is committed.
    """
    connection = self.connect()
    if connection is None:
        return

    cursor = connection.cursor()
    try:
        cursor.execute("DROP TABLE dummy1")
    except psycopg2.ProgrammingError:
        # The drop failed (presumably the table did not exist): roll back
        # before issuing the next statement.
        connection.rollback()

    create_sql = "CREATE TABLE dummy1 AS SELECT * FROM generate_series(1, 5) AS id"
    cursor.execute(create_sql)
    connection.commit()
def test_create_replication_slot(self):
    """Repeating an identical slot creation must raise ProgrammingError.

    Returns without testing anything when ``self.repl_connect`` yields no
    connection.
    """
    connection = self.repl_connect(
        connection_factory=PhysicalReplicationConnection)
    if connection is None:
        return

    cursor = connection.cursor()

    # The first creation is expected to succeed ...
    self.create_replication_slot(cursor)
    # ... and an identical second call must be rejected.
    self.assertRaises(
        psycopg2.ProgrammingError, self.create_replication_slot, cursor)
def test_async_replication(self):
    """Consume a logical replication stream over an asynchronous connection.

    After creating a slot and starting replication, the test generates
    some events and verifies that:

    * ``consume_stream`` raises ProgrammingError on this connection;
    * polling with ``read_message`` delivers messages to ``consume`` until
      it raises StopReplication on the fourth message.

    Returns without testing anything when ``self.repl_connect`` yields no
    connection.
    """
    conn = self.repl_connect(
        connection_factory=LogicalReplicationConnection, async_=1)
    if conn is None:
        return

    repl_cur = conn.cursor()

    self.create_replication_slot(repl_cur, output_plugin='test_decoding')
    self.wait(repl_cur)

    repl_cur.start_replication(self.slot)
    self.wait(repl_cur)

    self.make_replication_events()

    self.msg_count = 0

    def consume(msg):
        # Result discarded on purpose: just check the methods.
        "%s: %s" % (repl_cur.io_timestamp, repr(msg))

        self.msg_count += 1
        if self.msg_count > 3:
            repl_cur.send_feedback(reply=True)
            raise StopReplication()

        repl_cur.send_feedback(flush_lsn=msg.data_start)

    # cannot be used in asynchronous mode
    self.assertRaises(
        psycopg2.ProgrammingError, repl_cur.consume_stream, consume)

    def process_stream():
        from select import select
        while True:
            message = repl_cur.read_message()
            if not message:
                # Nothing available yet: block until the cursor is readable.
                select([repl_cur], [], [])
                continue
            consume(message)

    self.assertRaises(StopReplication, process_stream)
def test_parse_dsn(self):
    """Check ``ext.parse_dsn`` on valid, quoted and malformed DSN strings.

    Valid strings must be parsed into a dict of parameters; malformed ones
    must raise ProgrammingError, and the error message must not contain
    the password from the DSN.
    """
    from psycopg2 import ProgrammingError

    self.assertEqual(
        ext.parse_dsn('dbname=test user=tester password=secret'),
        dict(user='tester', password='secret', dbname='test'),
        "simple DSN parsed")

    self.assertRaises(ProgrammingError, ext.parse_dsn,
                      "dbname=test 2 user=tester password=secret")

    self.assertEqual(
        ext.parse_dsn("dbname='test 2' user=tester password=secret"),
        dict(user='tester', password='secret', dbname='test 2'),
        "DSN with quoting parsed")

    # Can't really use assertRaisesRegexp() here since we need to
    # make sure that secret is *not* exposed in the error message
    # (and it also requires python >= 2.7).
    raised = False
    try:
        # unterminated quote after dbname:
        ext.parse_dsn("dbname='test 2 user=tester password=secret")
    except ProgrammingError as e:
        raised = True
        self.assertTrue(str(e).find('secret') < 0,
                        "DSN was not exposed in error message")
    # Catch every other exception type explicitly.  A bare "except e:"
    # would evaluate the unbound name "e" and fail with a NameError instead
    # of reporting the unexpected error.
    except Exception as e:
        self.fail("unexpected error condition: " + repr(e))
    self.assertTrue(raised, "ProgrammingError raised due to invalid DSN")