def insert_one(self, table_name: str, row: dict, retcol: str):
""" inserts a single row into the specified table and returns the value of the specified
column.
row - a dict containing all row values to insert
returns: new id
"""
if not self.__conn or not row:
raise psycopg2.InterfaceError("null connection")
try:
columns = "(" + ",".join(row.keys()) + ")"
holders = "(" + ",".join(["%s"] * len(row.keys())) + ")"
fillers = tuple(row.values())
query = "insert into {table_name} {columnnames} values {values} returning {retcol}".format(**{
"table_name": table_name,
"columnnames": columns,
"values": holders,
"retcol": retcol
})
with self.__conn.cursor() as curs:
curs.execute(query, fillers)
return curs.fetchone()[0]
except Exception as e:
tb.print_exc()
raise(e)
python类InterfaceError()的实例源码
def insert(self, table_name: str, rows: list) -> int:
""" inserts the given data into the specified table.
An individual insert statement is generated per row. The query is handled as a prepared
statement so multi-inserts are more efficient.
table_name --
rows -- list of dicts. each dict representing a row to be inserted
returns: the number of rows inserted
"""
n_inserted = 0
if not self.__conn or not rows:
raise psycopg2.InterfaceError("null connection")
try:
for row in rows:
columns = "(" + ",".join(row.keys()) + ")" # "(col1, col2, col3...)"
params = ",".join(["%s"] * len(row.keys())) # "%s, %s, %s..."
query = "insert into {table_name} {columns} select {params}".format(**{
"table_name": table_name,
"columns": columns,
"params": params,
})
n_inserted += self.execute_write_query(query, tuple(row.values()))
return n_inserted
except Exception as e:
tb.print_exc()
raise(e)
def test_isolation_level_closed(self):
cnn = self.connect()
cnn.close()
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 0)
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1)
def test_closed(self):
self.conn.close()
self.assertRaises(psycopg2.InterfaceError,
self.conn.set_session,
ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_closed(self):
self.conn.close()
self.assertRaises(psycopg2.InterfaceError,
setattr, self.conn, 'autocommit', True)
# The getter doesn't have a guard. We may change this in future
# to make it consistent with other methods; meanwhile let's just check
# it doesn't explode.
try:
self.assertTrue(self.conn.autocommit in (True, False))
except psycopg2.InterfaceError:
pass
def test_with_closed(self):
def f():
with self.conn:
pass
self.conn.close()
self.assertRaises(psycopg2.InterfaceError, f)
def test_parse(self):
from psycopg2.extras import HstoreAdapter
def ok(s, d):
self.assertEqual(HstoreAdapter.parse(s, None), d)
ok(None, None)
ok('', {})
ok('"a"=>"1", "b"=>"2"', {'a': '1', 'b': '2'})
ok('"a" => "1" , "b" => "2"', {'a': '1', 'b': '2'})
ok('"a"=>NULL, "b"=>"2"', {'a': None, 'b': '2'})
ok(r'"a"=>"\"", "\""=>"2"', {'a': '"', '"': '2'})
ok('"a"=>"\'", "\'"=>"2"', {'a': "'", "'": '2'})
ok('"a"=>"1", "b"=>NULL', {'a': '1', 'b': None})
ok(r'"a\\"=>"1"', {'a\\': '1'})
ok(r'"a\""=>"1"', {'a"': '1'})
ok(r'"a\\\""=>"1"', {r'a\"': '1'})
ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})
def ko(s):
self.assertRaises(psycopg2.InterfaceError,
HstoreAdapter.parse, s, None)
ko('a')
ko('"a"')
ko(r'"a\\""=>"1"')
ko(r'"a\\\\""=>"1"')
ko('"a=>"1"')
ko('"a"=>"1", "b"=>NUL')
def test_async_cursor_gone(self):
import gc
cur = self.conn.cursor()
cur.execute("select 42;")
del cur
gc.collect()
self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn)
# The connection is still usable
cur = self.conn.cursor()
cur.execute("select 42;")
self.wait(self.conn)
self.assertEqual(cur.fetchone(), (42,))
def test_write_after_close(self):
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.write, b"some data")
def test_read_after_close(self):
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.read, 5)
def test_seek_after_close(self):
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.seek, 0)
def test_tell_after_close(self):
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.tell)
def test_seek_larger_than_2gb(self):
lo = self.conn.lobject()
offset = 1 << 32 # 4gb
self.assertRaises(
(OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
lo.seek, offset, 0)
def test_truncate_larger_than_2gb(self):
lo = self.conn.lobject()
length = 1 << 32 # 4gb
self.assertRaises(
(OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
lo.truncate, length)
def parse(self, s, cur, _bsdec=_re.compile(r"\\(.)")):
"""Parse an hstore representation in a Python string.
The hstore is represented as something like::
"a"=>"1", "b"=>"2"
with backslash-escaped strings.
"""
if s is None:
return None
rv = {}
start = 0
for m in self._re_hstore.finditer(s):
if m is None or m.start() != start:
raise psycopg2.InterfaceError(
"error parsing hstore pair at char %d" % start)
k = _bsdec.sub(r'\1', m.group(1))
v = m.group(2)
if v is not None:
v = _bsdec.sub(r'\1', v)
rv[k] = v
start = m.end()
if start < len(s):
raise psycopg2.InterfaceError(
"error parsing hstore: unparsed data after char %d" % start)
return rv
def tokenize(self, s):
rv = []
for m in self._re_tokenize.finditer(s):
if m is None:
raise psycopg2.InterfaceError("can't parse type: %r" % s)
if m.group(1) is not None:
rv.append(None)
elif m.group(2) is not None:
rv.append(self._re_undouble.sub(r"\1", m.group(2)))
else:
rv.append(m.group(3))
return rv
def test_isolation_level_closed(self):
cnn = self.connect()
cnn.close()
self.assertRaises(psycopg2.InterfaceError, getattr,
cnn, 'isolation_level')
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 0)
self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1)
def test_closed(self):
self.conn.close()
self.assertRaises(psycopg2.InterfaceError,
self.conn.set_session,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
def test_closed(self):
self.conn.close()
self.assertRaises(psycopg2.InterfaceError,
setattr, self.conn, 'autocommit', True)
# The getter doesn't have a guard. We may change this in future
# to make it consistent with other methods; meanwhile let's just check
# it doesn't explode.
try:
self.assert_(self.conn.autocommit in (True, False))
except psycopg2.InterfaceError:
pass
def test_with_closed(self):
def f():
with self.conn:
pass
self.conn.close()
self.assertRaises(psycopg2.InterfaceError, f)