def test_execute(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
(10, 'a', 'b', 'c'))
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])
python类sql()的实例源码
def test_executemany(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
def test_execute(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
(10, 'a', 'b', 'c'))
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])
def test_executemany(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
def test_execute(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
(10, 'a', 'b', 'c'))
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])
def test_executemany(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
def test_copy(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
s = StringIO("10\ta\tb\tc\n20\td\te\tf\n")
cur.copy_expert(
sql.SQL("copy {t} (id, foo, bar, {f}) from stdin").format(
t=sql.Identifier("test_compose"), f=sql.Identifier("ba'z")), s)
s1 = StringIO()
cur.copy_expert(
sql.SQL("copy (select {f} from {t} order by id) to stdout").format(
t=sql.Identifier("test_compose"), f=sql.Identifier("ba'z")), s1)
s1.seek(0)
self.assertEqual(s1.read(), 'c\nf\n')
def test_execute(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
(10, 'a', 'b', 'c'))
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])
def test_executemany(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
def test_execute(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.execute(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
(10, 'a', 'b', 'c'))
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(), [(10, 'a', 'b', 'c')])
def test_executemany(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
cur.executemany(
sql.SQL("insert into {0} (id, {1}) values (%s, {2})").format(
sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.Placeholder() * 3).join(', ')),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose")
self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
def test_copy(self):
cur = self.conn.cursor()
cur.execute("""
create table test_compose (
id serial primary key,
foo text, bar text, "ba'z" text)
""")
s = StringIO("10\ta\tb\tc\n20\td\te\tf\n")
cur.copy_expert(
sql.SQL("copy {t} (id, foo, bar, {f}) from stdin").format(
t=sql.Identifier("test_compose"), f=sql.Identifier("ba'z")), s)
s1 = StringIO()
cur.copy_expert(
sql.SQL("copy (select {f} from {t} order by id) to stdout").format(
t=sql.Identifier("test_compose"), f=sql.Identifier("ba'z")), s1)
s1.seek(0)
self.assertEqual(s1.read(), 'c\nf\n')
def test_pos(self):
s = sql.SQL("select {} from {}").format(
sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_pos_spec(self):
s = sql.SQL("select {0} from {1}").format(
sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
s = sql.SQL("select {1} from {0}").format(
sql.Identifier('table'), sql.Identifier('field'))
s1 = s.as_string(self.conn)
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_dict(self):
s = sql.SQL("select {f} from {t}").format(
f=sql.Identifier('field'), t=sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_unicode(self):
s = sql.SQL("select {0} from {1}").format(
sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_compose_literal(self):
s = sql.SQL("select {0};").format(sql.Literal(dt.date(2016, 12, 31)))
s1 = s.as_string(self.conn)
self.assertEqual(s1, "select '2016-12-31'::date;")
def test_percent_escape(self):
s = sql.SQL("42 % {0}").format(sql.Literal(7))
s1 = s.as_string(self.conn)
self.assertEqual(s1, "42 % 7")
def test_braces_escape(self):
s = sql.SQL("{{{0}}}").format(sql.Literal(7))
self.assertEqual(s.as_string(self.conn), "{7}")
s = sql.SQL("{{1,{0}}}").format(sql.Literal(7))
self.assertEqual(s.as_string(self.conn), "{1,7}")
def test_compose_badnargs(self):
self.assertRaises(IndexError, sql.SQL("select {0};").format)
def test_compose_badnargs_auto(self):
self.assertRaises(IndexError, sql.SQL("select {};").format)
self.assertRaises(ValueError, sql.SQL("select {} {1};").format, 10, 20)
self.assertRaises(ValueError, sql.SQL("select {0} {};").format, 10, 20)
def test_compose_bad_args_type(self):
self.assertRaises(IndexError, sql.SQL("select {0};").format, a=10)
self.assertRaises(KeyError, sql.SQL("select {x};").format, 10)
def test_no_modifiers(self):
self.assertRaises(ValueError, sql.SQL("select {a!r};").format, a=10)
self.assertRaises(ValueError, sql.SQL("select {a:<};").format, a=10)
def test_must_be_adaptable(self):
class Foo(object):
pass
self.assertRaises(psycopg2.ProgrammingError,
sql.SQL("select {0};").format(sql.Literal(Foo())).as_string, self.conn)
def test_class(self):
self.assertTrue(issubclass(sql.Identifier, sql.Composable))
def test_string(self):
self.assertEqual(sql.Identifier('foo').string, 'foo')
def test_repr(self):
obj = sql.Identifier("fo'o")
self.assertEqual(repr(obj), 'Identifier("fo\'o")')
self.assertEqual(repr(obj), str(obj))
def test_eq(self):
self.assertTrue(sql.Identifier('foo') == sql.Identifier('foo'))
self.assertTrue(sql.Identifier('foo') != sql.Identifier('bar'))
self.assertTrue(sql.Identifier('foo') != 'foo')
self.assertTrue(sql.Identifier('foo') != sql.SQL('foo'))
def test_as_str(self):
self.assertEqual(sql.Identifier('foo').as_string(self.conn), '"foo"')
self.assertEqual(sql.Identifier("fo'o").as_string(self.conn), '"fo\'o"')
def test_join(self):
self.assertTrue(not hasattr(sql.Identifier('foo'), 'join'))