def test_start_and_recover_from_error(self):
    """Replication can be started again after a start with bad options."""
    conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
    if conn is None:
        # repl_connect() gave us no connection: nothing to exercise.
        return

    def ignore_message(msg):
        """Consumer callback that discards every message it receives."""

    repl_cur = conn.cursor()
    self.create_replication_slot(repl_cur, output_plugin='test_decoding')

    # First attempt: start replication with an invalid option.
    bad_options = {'invalid_param': 'value'}
    repl_cur.start_replication(slot_name=self.slot, options=bad_options)

    # The server error is not visible until we try to read the data.
    with self.assertRaises(psycopg2.DataError):
        repl_cur.consume_stream(ignore_message)

    # Second attempt: the correct command on the same cursor.
    repl_cur.start_replication(slot_name=self.slot)
Python 类 DataError() 的实例源码
def test_start_and_recover_from_error(self):
    """A failed start_replication() does not prevent a later valid one."""
    conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
    if conn is None:
        # No connection was returned, so there is nothing to run.
        return

    def discard(msg):
        """Message consumer that does nothing."""

    stream_cur = conn.cursor()
    self.create_replication_slot(stream_cur, output_plugin='test_decoding')

    # Start with invalid options first.
    invalid = {'invalid_param': 'value'}
    stream_cur.start_replication(slot_name=self.slot, options=invalid)

    # The error from the server only shows up when the data is read.
    with self.assertRaises(psycopg2.DataError):
        stream_cur.consume_stream(discard)

    # Then retry with the correct command.
    stream_cur.start_replication(slot_name=self.slot)
def test_start_and_recover_from_error(self):
    """Replication can be started again after a start with bad options."""
    conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
    if conn is None:
        # repl_connect() gave us no connection: nothing to exercise.
        return

    def ignore_message(msg):
        """Consumer callback that discards every message it receives."""

    repl_cur = conn.cursor()
    self.create_replication_slot(repl_cur, output_plugin='test_decoding')

    # First attempt: start replication with an invalid option.
    bad_options = {'invalid_param': 'value'}
    repl_cur.start_replication(slot_name=self.slot, options=bad_options)

    # The server error is not visible until we try to read the data.
    with self.assertRaises(psycopg2.DataError):
        repl_cur.consume_stream(ignore_message)

    # Second attempt: the correct command on the same cursor.
    repl_cur.start_replication(slot_name=self.slot)
def test_start_and_recover_from_error(self):
    """A failed start_replication() does not prevent a later valid one."""
    conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
    if conn is None:
        # No connection was returned, so there is nothing to run.
        return

    def discard(msg):
        """Message consumer that does nothing."""

    stream_cur = conn.cursor()
    self.create_replication_slot(stream_cur, output_plugin='test_decoding')

    # Start with invalid options first.
    invalid = {'invalid_param': 'value'}
    stream_cur.start_replication(slot_name=self.slot, options=invalid)

    # The error from the server only shows up when the data is read.
    with self.assertRaises(psycopg2.DataError):
        stream_cur.consume_stream(discard)

    # Then retry with the correct command.
    stream_cur.start_replication(slot_name=self.slot)
def test_start_and_recover_from_error(self):
    """Replication can be started again after a start with bad options."""
    conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
    if conn is None:
        # repl_connect() gave us no connection: nothing to exercise.
        return

    def ignore_message(msg):
        """Consumer callback that discards every message it receives."""

    repl_cur = conn.cursor()
    self.create_replication_slot(repl_cur, output_plugin='test_decoding')

    # First attempt: start replication with an invalid option.
    bad_options = {'invalid_param': 'value'}
    repl_cur.start_replication(slot_name=self.slot, options=bad_options)

    # The server error is not visible until we try to read the data.
    with self.assertRaises(psycopg2.DataError):
        repl_cur.consume_stream(ignore_message)

    # Second attempt: the correct command on the same cursor.
    repl_cur.start_replication(slot_name=self.slot)
def test_varchar(self):
    """
    Flushing a varchar value with too many characters raises DataError.
    """
    model = self.db['no_pk']

    # A ten-character foo is short enough and flushes without error.
    model(foo='abcdefghij').flush()

    # An eleven-character foo must be rejected on flush.
    flush_too_long = model(foo='abcdefghijk').flush
    with self.assertRaises(psycopg2.DataError):
        flush_too_long()
def testFloatInf(self):
    """Infinite floats passed as query parameters come back as inf/-inf."""
    # Probe first; skip if the server or the platform cannot handle inf.
    try:
        self.execute("select 'inf'::float")
    except psycopg2.DataError:
        return self.skipTest("inf::float not available on the server")
    except ValueError:
        return self.skipTest("inf not available on this platform")

    query = "SELECT %s AS foo"

    pos = self.execute(query, (float("inf"),))
    pos_text = str(pos)
    self.assertTrue(pos_text == "inf", "wrong float quoting: " + pos_text)
    self.assertTrue(type(pos) == float, "wrong float conversion: " + repr(pos))

    neg = self.execute(query, (float("-inf"),))
    neg_text = str(neg)
    self.assertTrue(neg_text == "-inf", "wrong float quoting: " + neg_text)
def testArrayMalformed(self):
    """STRINGARRAY raises DataError for every malformed array literal."""
    cursor = self.conn.cursor()
    parse_array = psycopg2.extensions.STRINGARRAY
    # Empty input, an unclosed brace, an extra closing brace, 20-deep nesting.
    malformed = ('', '{', '{}}', '{' * 20 + '}' * 20)
    for literal in malformed:
        raw = literal.encode('utf8')
        with self.assertRaises(psycopg2.DataError):
            parse_array(raw, cursor)
def test_parse_incomplete_date(self):
    """DATE raises DataError for a year-only and a year-month string."""
    for partial in ('2007', '2007-01'):
        with self.assertRaises(psycopg2.DataError):
            self.DATE(partial, self.curs)
def test_parse_incomplete_time(self):
    """TIME raises DataError for an hour-only and an hour-minute string."""
    for partial in ('13', '13:30'):
        with self.assertRaises(psycopg2.DataError):
            self.TIME(partial, self.curs)
def test_copy_rowcount_error(self):
    """rowcount is -1 after copy_from() fails with DataError."""
    cursor = self.conn.cursor()

    # A successful insert reports one affected row.
    cursor.execute("insert into tcopy (data) values ('fff')")
    self.assertEqual(cursor.rowcount, 1)

    # The failing copy must replace that count with -1.
    payload = StringIO('aaa\nbbb\nccc\n')
    with self.assertRaises(psycopg2.DataError):
        cursor.copy_from(payload, 'tcopy')
    self.assertEqual(cursor.rowcount, -1)
def test_with_error_db(self):
    """A DataError inside ``with conn`` leaves the connection usable."""
    def insert_bad_row():
        with self.conn as conn:
            inner = conn.cursor()
            inner.execute("insert into test_with values ('a')")

    self.assertRaises(psycopg2.DataError, insert_bad_row)

    # The connection must be ready and still open after the failure.
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertTrue(not self.conn.closed)

    # The failed insert must not have left any row in the table.
    check = self.conn.cursor()
    check.execute("select * from test_with")
    self.assertEqual(check.fetchall(), [])
def test_exception_swallow(self):
    """The original DataError survives leaving the ``with`` blocks."""
    # bug #262: __exit__ calls cur.close() that hides the exception
    # with another error.
    pgcode = None
    try:
        with self.conn as conn, conn.cursor('named') as cur:
            cur.execute("select 1/0")
            cur.fetchone()
    except psycopg2.DataError as exc:
        pgcode = exc.pgcode
    else:
        self.fail("where is my exception?")

    self.assertEqual(pgcode, '22012')
def test_wrong_schema(self):
    """fetchone() fails when the caster lists more fields than the type."""
    oid = self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
    from psycopg2.extras import CompositeCaster

    # The caster is given a third attribute, 'c', that the type lacks.
    caster_attrs = [('a', 23), ('b', 23), ('c', 23)]
    caster = CompositeCaster('type_ii', oid, caster_attrs)

    cursor = self.conn.cursor()
    psycopg2.extensions.register_type(caster.typecaster, cursor)
    cursor.execute("select (1,2)::type_ii")
    fetch = cursor.fetchone
    with self.assertRaises(psycopg2.DataError):
        fetch()
def testFloatInf(self):
    """Infinite floats passed as query parameters come back as inf/-inf.

    The test is skipped when the probe query raises DataError (server) or
    ValueError (platform).
    """
    try:
        self.execute("select 'inf'::float")
    except psycopg2.DataError:
        return self.skipTest("inf::float not available on the server")
    except ValueError:
        return self.skipTest("inf not available on this platform")

    # assertTrue replaces the deprecated failUnless alias, which was removed
    # from unittest in Python 3.12.
    s = self.execute("SELECT %s AS foo", (float("inf"),))
    self.assertTrue(str(s) == "inf", "wrong float quoting: " + str(s))
    self.assertTrue(type(s) == float, "wrong float conversion: " + repr(s))

    s = self.execute("SELECT %s AS foo", (float("-inf"),))
    self.assertTrue(str(s) == "-inf", "wrong float quoting: " + str(s))
def testArrayMalformed(self):
    """STRINGARRAY raises DataError for every malformed array literal."""
    cursor = self.conn.cursor()
    parse_array = psycopg2.extensions.STRINGARRAY
    # Empty input, an unclosed brace, an extra closing brace, 20-deep nesting.
    malformed = ('', '{', '{}}', '{' * 20 + '}' * 20)
    for literal in malformed:
        raw = literal.encode('utf8')
        with self.assertRaises(psycopg2.DataError):
            parse_array(raw, cursor)
def test_parse_incomplete_date(self):
    """DATE raises DataError for a year-only and a year-month string."""
    for partial in ('2007', '2007-01'):
        with self.assertRaises(psycopg2.DataError):
            self.DATE(partial, self.curs)
def test_parse_incomplete_time(self):
    """TIME raises DataError for an hour-only and an hour-minute string."""
    for partial in ('13', '13:30'):
        with self.assertRaises(psycopg2.DataError):
            self.TIME(partial, self.curs)
def test_copy_rowcount_error(self):
    """rowcount is -1 after copy_from() fails with DataError."""
    cursor = self.conn.cursor()

    # A successful insert reports one affected row.
    cursor.execute("insert into tcopy (data) values ('fff')")
    self.assertEqual(cursor.rowcount, 1)

    # The failing copy must replace that count with -1.
    payload = StringIO('aaa\nbbb\nccc\n')
    with self.assertRaises(psycopg2.DataError):
        cursor.copy_from(payload, 'tcopy')
    self.assertEqual(cursor.rowcount, -1)
def test_with_error_db(self):
    """A DataError inside ``with conn`` leaves the connection usable.

    After the failing insert the connection must report STATUS_READY, must
    not be closed, and the target table must contain no rows.
    """
    def f():
        with self.conn as conn:
            curs = conn.cursor()
            curs.execute("insert into test_with values ('a')")

    self.assertRaises(psycopg2.DataError, f)
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    # assertFalse replaces the deprecated assert_ alias, which was removed
    # from unittest in Python 3.12.
    self.assertFalse(self.conn.closed)

    curs = self.conn.cursor()
    curs.execute("select * from test_with")
    self.assertEqual(curs.fetchall(), [])
def test_exception_swallow(self):
    """The original DataError survives leaving the ``with`` blocks.

    The query divides by zero; the test expects a DataError whose pgcode
    is '22012' and fails explicitly if no exception is raised.
    """
    # bug #262: __exit__ calls cur.close() that hides the exception
    # with another error.
    try:
        with self.conn as conn:
            with conn.cursor('named') as cur:
                cur.execute("select 1/0")
                cur.fetchone()
    # "except X as e" is valid on Python 2.6+ and Python 3; the original
    # "except X, e" form is a SyntaxError on Python 3.
    except psycopg2.DataError as e:
        self.assertEqual(e.pgcode, '22012')
    else:
        self.fail("where is my exception?")
def test_wrong_schema(self):
    """fetchone() fails when the caster lists more fields than the type."""
    oid = self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
    from psycopg2.extras import CompositeCaster

    # The caster is given a third attribute, 'c', that the type lacks.
    caster_attrs = [('a', 23), ('b', 23), ('c', 23)]
    caster = CompositeCaster('type_ii', oid, caster_attrs)

    cursor = self.conn.cursor()
    psycopg2.extensions.register_type(caster.typecaster, cursor)
    cursor.execute("select (1,2)::type_ii")
    fetch = cursor.fetchone
    with self.assertRaises(psycopg2.DataError):
        fetch()
def handle_listing(listing_object, results_item):
    """Update a listing from a results item and save it.

    Data is extracted from ``results_item``, combined with
    ``listing_object`` by ``update_core_listing_data`` and the resulting
    object is saved.

    Args:
        listing_object: the existing listing passed to
            ``update_core_listing_data``.
        results_item: the raw item passed to ``extract_listing_data``.

    Raises:
        ListingException: if any step raises ValueError, DataError or
            PG_DataError. The traceback is logged at debug level first.
    """
    try:
        new_listing_object = extract_listing_data(results_item)
        updated_listing_object = update_core_listing_data(listing_object,
                                                          new_listing_object)
        updated_listing_object.save()
    except (ValueError, DataError, PG_DataError):
        # traceback.print_exc() writes to stderr and returns None, so the
        # logger used to record only "None". format_exc() returns the
        # traceback text so that it reaches the log.
        logger.debug(traceback.format_exc())
        raise ListingException()
# Handle premature exits
def testFloatInf(self):
    """Infinite floats passed as query parameters come back as inf/-inf."""
    # Probe first; skip if the server or the platform cannot handle inf.
    try:
        self.execute("select 'inf'::float")
    except psycopg2.DataError:
        return self.skipTest("inf::float not available on the server")
    except ValueError:
        return self.skipTest("inf not available on this platform")

    query = "SELECT %s AS foo"

    pos = self.execute(query, (float("inf"),))
    pos_text = str(pos)
    self.assertTrue(pos_text == "inf", "wrong float quoting: " + pos_text)
    self.assertTrue(type(pos) == float, "wrong float conversion: " + repr(pos))

    neg = self.execute(query, (float("-inf"),))
    neg_text = str(neg)
    self.assertTrue(neg_text == "-inf", "wrong float quoting: " + neg_text)
def testArrayMalformed(self):
    """STRINGARRAY raises DataError for every malformed array literal."""
    cursor = self.conn.cursor()
    parse_array = psycopg2.extensions.STRINGARRAY
    # Empty input, an unclosed brace, an extra closing brace, 20-deep nesting.
    malformed = ('', '{', '{}}', '{' * 20 + '}' * 20)
    for literal in malformed:
        raw = literal.encode('utf8')
        with self.assertRaises(psycopg2.DataError):
            parse_array(raw, cursor)
def test_parse_incomplete_date(self):
    """DATE raises DataError for a year-only and a year-month string."""
    for partial in ('2007', '2007-01'):
        with self.assertRaises(psycopg2.DataError):
            self.DATE(partial, self.curs)
def test_parse_incomplete_time(self):
    """TIME raises DataError for an hour-only and an hour-minute string."""
    for partial in ('13', '13:30'):
        with self.assertRaises(psycopg2.DataError):
            self.TIME(partial, self.curs)
def test_copy_rowcount_error(self):
    """rowcount is -1 after copy_from() fails with DataError."""
    cursor = self.conn.cursor()

    # A successful insert reports one affected row.
    cursor.execute("insert into tcopy (data) values ('fff')")
    self.assertEqual(cursor.rowcount, 1)

    # The failing copy must replace that count with -1.
    payload = StringIO('aaa\nbbb\nccc\n')
    with self.assertRaises(psycopg2.DataError):
        cursor.copy_from(payload, 'tcopy')
    self.assertEqual(cursor.rowcount, -1)
def test_with_error_db(self):
    """A DataError inside ``with conn`` leaves the connection usable."""
    def insert_bad_row():
        with self.conn as conn:
            inner = conn.cursor()
            inner.execute("insert into test_with values ('a')")

    self.assertRaises(psycopg2.DataError, insert_bad_row)

    # The connection must be ready and still open after the failure.
    self.assertEqual(self.conn.status, ext.STATUS_READY)
    self.assertTrue(not self.conn.closed)

    # The failed insert must not have left any row in the table.
    check = self.conn.cursor()
    check.execute("select * from test_with")
    self.assertEqual(check.fetchall(), [])
def test_exception_swallow(self):
    """The original DataError survives leaving the ``with`` blocks."""
    # bug #262: __exit__ calls cur.close() that hides the exception
    # with another error.
    pgcode = None
    try:
        with self.conn as conn, conn.cursor('named') as cur:
            cur.execute("select 1/0")
            cur.fetchone()
    except psycopg2.DataError as exc:
        pgcode = exc.pgcode
    else:
        self.fail("where is my exception?")

    self.assertEqual(pgcode, '22012')