def __init__(self, items=None):
    r"""Creates a new SQLQuery.

    `items` may be:

    * None -- the query starts with no items;
    * a list -- its elements become the items (the list is copied, so the
      caller's list is never modified by this constructor);
    * a SQLParam -- it becomes the only item;
    * a SQLQuery -- its items are copied;
    * anything else -- the object becomes the only item.

    Any SQLParam whose value is a SQLLiteral is replaced by the literal's
    `v` attribute.

        >>> SQLQuery("x")
        <sql: 'x'>
        >>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)])
        >>> q
        <sql: 'SELECT * FROM test WHERE x=1'>
        >>> q.query(), q.values()
        ('SELECT * FROM test WHERE x=%s', [1])
        >>> SQLQuery(SQLParam(1))
        <sql: '1'>
    """
    if items is None:
        self.items = []
    elif isinstance(items, list):
        # Copy instead of aliasing: the loop below assigns into self.items,
        # and that must not leak into the list owned by the caller.
        self.items = list(items)
    elif isinstance(items, SQLParam):
        self.items = [items]
    elif isinstance(items, SQLQuery):
        self.items = list(items.items)
    else:
        self.items = [items]

    # Take care of SQLLiterals
    for i, item in enumerate(self.items):
        if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral):
            self.items[i] = item.value.v
# Example source code for the Python class method query()
def __len__(self):
return len(self.query())
def values(self):
    """
    Collects the parameter values carried by this query.

    Only items that are SQLParam instances contribute; their `value`
    attributes are returned in item order.

        >>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
        >>> q.values()
        ['joe']
    """
    collected = []
    for item in self.items:
        if isinstance(item, SQLParam):
            collected.append(item.value)
    return collected
def _str(self):
try:
return self.query() % tuple([sqlify(x) for x in self.values()])
except (ValueError, TypeError):
return self.query()
def sqlquote(a):
    """
    Returns `a` properly quoted for use in a SQL query.

    A list is handed to `_sqllist`; every other value is wrapped with
    `sqlparam` and converted through its `sqlquery()` method.

        >>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
        <sql: "WHERE x = 't' AND y = 3">
        >>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3])
        <sql: "WHERE x = 't' AND y IN (2, 3)">
    """
    if not isinstance(a, list):
        return sqlparam(a).sqlquery()
    return _sqllist(a)
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, params
def query(self, sql_query, vars=None, processed=False, _test=False):
    """
    Execute SQL query `sql_query` using dictionary `vars` to interpolate it.
    If `processed=True`, `vars` is a `reparam`-style list to use
    instead of interpolating.

    With `_test=True` the (possibly interpolated) query is returned without
    being executed. Otherwise the result is an `iterbetter` over `storage`
    rows when the cursor reports a description, or the cursor's `rowcount`
    when it does not. The context is committed unless
    `self.ctx.transactions` is truthy.

        >>> db = DB(None, {})
        >>> db.query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}

    # Only raw, unprocessed input needs to be interpolated.
    if not (processed or isinstance(sql_query, SQLQuery)):
        sql_query = reparam(sql_query, vars)

    if _test:
        return sql_query

    cursor = self._db_cursor()
    self._db_execute(cursor, sql_query)

    if cursor.description:
        columns = [column[0] for column in cursor.description]

        def as_record(raw_row):
            # Pair each column name with the matching value of the row.
            return storage(dict(zip(columns, raw_row)))

        def rows():
            # Pull rows lazily, one fetchone() at a time, until a falsy row.
            current = cursor.fetchone()
            while current:
                yield as_record(current)
                current = cursor.fetchone()

        result = iterbetter(rows())
        result.__len__ = lambda: int(cursor.rowcount)
        result.list = lambda: [as_record(raw_row)
                               for raw_row in cursor.fetchall()]
    else:
        result = cursor.rowcount

    if not self.ctx.transactions:
        self.ctx.commit()
    return result
def update(self, tables, where, vars=None, _test=False, **values):
    """
    Update `tables` with clause `where` (interpolated using `vars`)
    and setting `values`.

    The assignments are emitted sorted by column name. With `_test=True`
    the query is returned without being executed; otherwise the cursor's
    `rowcount` is returned, and the context is committed unless
    `self.ctx.transactions` is truthy.

        >>> db = DB(None, {})
        >>> name = 'Joseph'
        >>> q = db.update('foo', where='name = $name', name='bob', age=2,
        ...     created=SQLLiteral('NOW()'), vars=locals(), _test=True)
        >>> q
        <sql: "UPDATE foo SET age = 2, created = NOW(), name = 'bob' WHERE name = 'Joseph'">
        >>> q.query()
        'UPDATE foo SET age = %s, created = NOW(), name = %s WHERE name = %s'
        >>> q.values()
        [2, 'bob', 'Joseph']
    """
    if vars is None:
        vars = {}

    where_clause = self._where(where, vars)
    # Sort by column name so the generated SQL is deterministic.
    assignments = sorted(values.items(), key=lambda pair: pair[0])

    table_list = sqllist(tables)
    set_clause = sqlwhere(assignments, ', ')
    statement = ("UPDATE " + table_list +
                 " SET " + set_clause +
                 " WHERE " + where_clause)

    if _test:
        return statement

    cursor = self._db_cursor()
    self._db_execute(cursor, statement)
    if not self.ctx.transactions:
        self.ctx.commit()
    return cursor.rowcount
def _process_insert_query(self, query, tablename, seqname):
return query
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# when seqname is not provided guess the seqname and make sure it exists
seqname = tablename + "_id_seq"
if seqname not in self._get_all_sequences():
seqname = None
if seqname:
query += "; SELECT currval('%s')" % seqname
return query
def _get_all_sequences(self):
"""Query postgres to find names of all sequences used in this database."""
if self._sequences is None:
q = "SELECT c.relname FROM pg_class c WHERE c.relkind = 'S'"
self._sequences = set([c.relname for c in self.query(q)])
return self._sequences
def _process_insert_query(self, query, tablename, seqname):
    """Pair the insert `query` with a `SELECT last_insert_rowid();` query.

    Returns a 2-tuple `(query, SQLQuery)`; `tablename` and `seqname` are
    not used by this implementation.
    """
    last_rowid_query = SQLQuery('SELECT last_insert_rowid();')
    return query, last_rowid_query
def query(self, *a, **kw):
    """Run `DB.query` and strip `__len__` from an `iterbetter` result.

    All arguments are forwarded unchanged. Results that are not
    `iterbetter` instances are returned as they are.
    """
    result = DB.query(self, *a, **kw)
    if not isinstance(result, iterbetter):
        return result
    # NOTE(review): presumably the row count reported by this backend's
    # cursor is not reliable, so the length hook is removed -- confirm.
    del result.__len__
    return result
def _process_query(self, sql_query):
"""Takes the SQLQuery object and returns query string and parameters.
"""
# MSSQLDB expects params to be a tuple.
# Overwriting the default implementation to convert params to tuple.
paramstyle = getattr(self, 'paramstyle', 'pyformat')
query = sql_query.query(paramstyle)
params = sql_query.values()
return query, tuple(params)
def _process_insert_query(self, query, tablename, seqname):
if seqname is None:
# It is not possible to get seq name from table name in Oracle
return query
else:
return query + "; SELECT %s.currval FROM dual" % seqname
def __init__(self, items=None):
    r"""Creates a new SQLQuery.

    `items` may be:

    * None -- the query starts with no items;
    * a list -- its elements become the items (the list is copied, so the
      caller's list is never modified by this constructor);
    * a SQLParam -- it becomes the only item;
    * a SQLQuery -- its items are copied;
    * anything else -- the object becomes the only item.

    Any SQLParam whose value is a SQLLiteral is replaced by the literal's
    `v` attribute.

        >>> SQLQuery("x")
        <sql: 'x'>
        >>> q = SQLQuery(['SELECT * FROM ', 'test', ' WHERE x=', SQLParam(1)])
        >>> q
        <sql: 'SELECT * FROM test WHERE x=1'>
        >>> q.query(), q.values()
        ('SELECT * FROM test WHERE x=%s', [1])
        >>> SQLQuery(SQLParam(1))
        <sql: '1'>
    """
    if items is None:
        self.items = []
    elif isinstance(items, list):
        # Copy instead of aliasing: the loop below assigns into self.items,
        # and that must not leak into the list owned by the caller.
        self.items = list(items)
    elif isinstance(items, SQLParam):
        self.items = [items]
    elif isinstance(items, SQLQuery):
        self.items = list(items.items)
    else:
        self.items = [items]

    # Take care of SQLLiterals
    for i, item in enumerate(self.items):
        if isinstance(item, SQLParam) and isinstance(item.value, SQLLiteral):
            self.items[i] = item.value.v
def __len__(self):
return len(self.query())
def values(self):
    """
    Collects the parameter values carried by this query.

    Only items that are SQLParam instances contribute; their `value`
    attributes are returned in item order.

        >>> q = SQLQuery(["SELECT * FROM test WHERE name=", SQLParam('joe')])
        >>> q.values()
        ['joe']
    """
    collected = []
    for item in self.items:
        if isinstance(item, SQLParam):
            collected.append(item.value)
    return collected
def _str(self):
try:
return self.query() % tuple([sqlify(x) for x in self.values()])
except (ValueError, TypeError):
return self.query()
def sqlquote(a):
    """
    Returns `a` properly quoted for use in a SQL query.

    A list is handed to `_sqllist`; every other value is wrapped with
    `sqlparam` and converted through its `sqlquery()` method.

        >>> 'WHERE x = ' + sqlquote(True) + ' AND y = ' + sqlquote(3)
        <sql: "WHERE x = 't' AND y = 3">
        >>> 'WHERE x = ' + sqlquote(True) + ' AND y IN ' + sqlquote([2, 3])
        <sql: "WHERE x = 't' AND y IN (2, 3)">
    """
    if not isinstance(a, list):
        return sqlparam(a).sqlquery()
    return _sqllist(a)