def select_dict(conn, query: str, params=None, name=None, itersize=5000):
    """Lazily yield the rows produced by a select statement.

    Parameters
    ----------
    conn : database connection used to open the cursor.
    query : select query string.
    params : query parameters, handed to ``cursor.execute`` unchanged.
    name : cursor name forwarded to ``conn.cursor``; ``None`` (the default)
        requests a client-side cursor, a name requests a server-side one.
    itersize : value assigned to the cursor's ``itersize`` attribute
        (number of records fetched per round trip by a server-side cursor).

    Yields
    ------
    Each row produced by iterating the cursor, in cursor order.
    """
    with conn.cursor(name, cursor_factory=RealDictCursor) as dict_cursor:
        dict_cursor.itersize = itersize
        dict_cursor.execute(query, params)
        # Stream rows one by one; the cursor stays open until the consumer
        # exhausts or closes this generator.
        for row in dict_cursor:
            yield row
# Example source code using the Python class RealDictCursor()
def get_task_status(self, task_id):
    # type: (str)->Union[str,Dict[str,Any]]
    """Fetch the status row stored for ``task_id``.

    The query runs against the table named by ``self.table_task_status``
    using a ``RealDictCursor`` opened on ``self.db_connection``.

    :param task_id: value bound to the ``task_id=%s`` placeholder.
    :return: the result of ``cursor.fetchone()`` on success. If executing
        or fetching raises, the formatted traceback is returned as a string
        instead of propagating the exception.
    """
    # NOTE(review): "description updated_at" has no comma, so SQL treats
    # ``updated_at`` as an alias for ``description``. Confirm whether two
    # separate columns were intended before changing the statement.
    sql_select = "SELECT task_status, description updated_at FROM {} WHERE task_id=%s".format(self.table_task_status)
    cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor
    try:
        cur.execute(sql_select, (task_id, ))
        return cur.fetchone()
    except Exception:
        # Callers receive the traceback text rather than an exception.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return traceback.format_exc()
    finally:
        # Release the cursor on both the success and the failure path.
        cur.close()
def get_music(artist, album, title):
    '''Render the single track matching artist/album/title.

    The three arguments are URL-unquoted and used to build a
    ``MusicFilter``. The output format is taken from the ``format`` request
    argument (default ``html``).

    Returns the rendered ``music.html`` template or a JSON dump of the
    track. Returns a ``('Music not found', 404)`` tuple unless exactly one
    track matches, and a 400 tuple for an unknown format.
    '''
    page_format = request.args.get('format', 'html')
    artist, album, title = unquote(artist), unquote(album), unquote(title)
    collection = app.config['COLLECTION']
    music_filter = MusicFilter(artists=[artist], albums=[album], titles=[title])
    fetch_tracks = partial(collection.filter, cursor_factory=RealDictCursor)
    matches = webfilter(fetch_tracks, music_filter)
    # Anything other than exactly one hit is reported as "not found".
    if len(matches) != 1:
        return ('Music not found', 404)
    music = matches[0]
    if page_format == 'json':
        return dumps(music, sort_keys=True, indent=4, separators=(',', ': '))
    if page_format == 'html':
        return render_template("music.html", music=music)
    return ('Invalid format, available: json,html', 400)
def remove_redundant_delays(conn, delays):
    """Drop delays that the ``realtime_updates`` table already reflects.

    Parameters
    ----------
    conn : database connection used to look up the existing rows.
    delays : dict mapping a delay id to a delay record. Each record is
        indexed by ``'s3_path'``, ``'arrival_delay'`` and
        ``'departure_delay'``.

    Returns
    -------
    dict
        The subset of ``delays`` worth storing. An id that is not in the
        database is always kept. An id that is already stored is kept only
        when its ``s3_path`` sorts after the stored one and at least one of
        the two delay values differs. When ``delays`` is empty it is
        returned unchanged and the database is not queried.
    """
    if not delays:
        return delays  # no new delays to add

    query = """ SELECT * FROM realtime_updates WHERE id IN %(ids)s """
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        cur.execute(query, {'ids': tuple(delays)})
        results = cur.fetchall()
    finally:
        # Close the cursor even when the lookup fails.
        cur.close()

    existing_delays = {get_delay_id(r): r for r in results}
    new_delays = {}
    # ``items()`` works on both Python 2 and 3 (``iteritems()`` is 2-only).
    for key, delay in delays.items():
        if key not in existing_delays:
            # stop not yet registered in the database
            new_delays[key] = delay
            continue
        current = existing_delays[key]
        # Skip unless the file being read is newer than the stored one.
        if not current['s3_path'] < delay['s3_path']:
            continue
        # Store the change only when the timings actually differ.
        if current['arrival_delay'] != delay['arrival_delay'] \
                or current['departure_delay'] != delay['departure_delay']:
            new_delays[key] = delay
    return new_delays
def get_user(user_name: str):
    """Returns a user entry from PostgreSQL table.

    Parameters
    ----------
    user_name : str
        Value matched against the ``user_name`` column. It is passed to the
        driver as a bound parameter, never interpolated into the SQL text.

    Returns
    -------
    dict_user : dict
        A dictionary with user information, as returned by
        ``cursor.fetchone()`` on a ``RealDictCursor``.
    """
    # The table name comes from module configuration and cannot be bound as
    # a query parameter, so only it is formatted into the statement.
    query = """SELECT * FROM {} WHERE user_name = %s;""".format(PSQL_TABLE)
    conn = psycopg2.connect(cursor_factory=RealDictCursor, **PSQL_CONN)
    try:
        # ``with conn`` scopes the transaction only; it does not close the
        # connection, hence the explicit close in ``finally``.
        with conn:
            dict_cur = conn.cursor()
            dict_cur.execute(query, (user_name,))
            dict_user = dict_cur.fetchone()
    finally:
        conn.close()
    return dict_user
def execute(conn, query, query_vars=()):
    """Run ``query`` with ``query_vars`` bound and return all fetched rows.

    A ``RealDictCursor`` is opened on ``conn`` for the duration of the call.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    with dict_cursor as cur:
        cur.execute(query, query_vars)
        rows = cur.fetchall()
    return rows
def execute(conn, query):
    """Run ``query`` on ``conn`` and return all fetched rows.

    A ``RealDictCursor`` is opened on ``conn`` for the duration of the call.
    """
    dict_cursor = conn.cursor(cursor_factory=RealDictCursor)
    with dict_cursor as cur:
        cur.execute(query)
        rows = cur.fetchall()
    return rows
def connect_and_execute(query, database='postgres'):
    """Open a connection to ``database``, run ``query``, return all rows.

    The connection is switched to ``ISOLATION_LEVEL_AUTOCOMMIT`` before the
    query runs and is closed afterwards, whether or not the query succeeds.
    """
    connection = connect(database=database)
    try:
        connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        dict_cursor = connection.cursor(cursor_factory=RealDictCursor)
        dict_cursor.execute(query)
        rows = dict_cursor.fetchall()
    finally:
        connection.close()
    return rows
def get_processed_documents(self, task_id):
    # type: (str)->Dict[str,Any]
    """Fetch every result row stored for ``task_id``.

    The query runs against the table named by ``self.table_knp_result``
    using a ``RealDictCursor`` opened on ``self.db_connection``.

    :param task_id: value bound to the ``task_id=%s`` placeholder.
    :return: the result of ``cursor.fetchall()`` on success. If executing
        or fetching raises, the formatted traceback is returned as a string
        instead of propagating the exception.
    """
    sql_select = "SELECT text_id,sentence_index,task_id,knp_result,status FROM {} WHERE task_id=%s".format(self.table_knp_result)
    cur = self.db_connection.cursor(cursor_factory=RealDictCursor)  # type: cursor
    try:
        cur.execute(sql_select, (task_id, ))
        return cur.fetchall()
    except Exception:
        # Callers receive the traceback text rather than an exception.
        # ``Exception`` (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate.
        return traceback.format_exc()
    finally:
        # Release the cursor on both the success and the failure path.
        cur.close()
def get_stats():
    '''Render the music library statistics.

    The output format is taken from the ``format`` request argument
    (default ``html``). Returns the rendered ``stats.html`` template or a
    JSON dump of the statistics, and a 400 tuple for an unknown format.
    '''
    page_format = request.args.get('format', 'html')
    collection = app.config['COLLECTION']
    fetch_stats = partial(collection.stats, cursor_factory=RealDictCursor)
    stats = webfilter(fetch_stats)

    if page_format == 'json':
        return dumps(stats)

    if page_format == 'html':
        # Formatting helpers handed to the template under these exact names.
        def bytesToHuman(b):
            return humanfriendly.format_size(b)

        def secondsToHuman(s):
            import datetime
            return str(datetime.timedelta(seconds=s))

        return render_template("stats.html", stats=stats, bytesToHuman=bytesToHuman, secondsToHuman=secondsToHuman)

    return ('Invalid format, available: json,html', 400)
def test_stats(self):
    """Unfiltered collection statistics must equal ``teststats``."""
    actual = self.collection.stats(cursor_factory=RealDictCursor)
    self.assertEqual(actual, teststats)
def test_filtered_stats(self):
    """Statistics filtered on the 'rock' keyword must equal ``filtered_teststats``."""
    music_filter = lib.MusicFilter()
    music_filter.keywords = ['rock']
    actual = self.collection.stats(music_filter, cursor_factory=RealDictCursor)
    self.assertEqual(actual, filtered_teststats)
def get_next(self):
    '''Gets the next pending device.

    Selects the row of ``pending`` with the lowest ``pending_id`` whose
    ``working`` flag is FALSE, flags that row as being worked on, and
    returns it.

    Returns:
        Dict: The next pending device as a dictionary object
        with the names of the rows as keys, or None when no such row
        exists.
    '''
    # Use a special cursor which returns results as dicts
    with self.conn, self.conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute('''
            SELECT * FROM
                pending
            WHERE
                working= FALSE
            ORDER BY pending_id ASC LIMIT 1
            ''')
        output = cur.fetchone()
        if not output:
            return None
        # Mark the new entry as being worked on
        cur.execute('''
            UPDATE pending
            SET working= TRUE
            WHERE pending_id= %s
            ''',
            (output['pending_id'],))
        # Return the next device as a plain dict
        return dict(output)
def get_dir_db_connection(*, dict_cursor: bool = False):
    """Connect to the Directory DB and open a cursor on the connection.

    Args:
        dict_cursor: when true the cursor is created with
            ``cursor_factory=RealDictCursor``; otherwise the connection's
            default cursor is used.

    Returns:
        A ``(connection, cursor)`` tuple.

    Raises:
        psycopg2.OperationalError: re-raised after being logged when the
            connection attempt fails.
    """
    try:
        connection = psycopg2.connect(
            dbname=DIR_DB_NAME, user=DIR_DB_USER, password=DIR_DB_PASSWORD,
            host=DIR_DB_HOST, port=DIR_DB_PORT)
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to Directory DB!\n%s', e)
        raise
    logging.debug('Connected to Directory DB: %s!', DIR_DB_NAME)
    cursor_options = {'cursor_factory': RealDictCursor} if dict_cursor else {}
    cursor = connection.cursor(**cursor_options)
    return connection, cursor
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Connect to the SSO DB and open a cursor on the connection.

    Args:
        dict_cursor: when true the cursor is created with
            ``cursor_factory=RealDictCursor``; otherwise the connection's
            default cursor is used.

    Returns:
        A ``(connection, cursor)`` tuple.

    Raises:
        psycopg2.OperationalError: re-raised after being logged when the
            connection attempt fails.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME, user=SSO_DB_USER, password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST, port=SSO_DB_PORT)
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to SSO DB!\n%s', e)
        raise
    else:
        # Report the database actually connected to (the message previously
        # named the Directory DB and logged DIR_DB_NAME).
        logging.debug('Connected to SSO DB: %s!', SSO_DB_NAME)
    if dict_cursor:
        cursor = connection.cursor(cursor_factory=RealDictCursor)
    else:
        cursor = connection.cursor()
    return connection, cursor
def get_sso_db_connection(*, dict_cursor: bool = False):
    """Connect to the SSO DB and open a cursor on the connection.

    Args:
        dict_cursor: when true the cursor is created with
            ``cursor_factory=RealDictCursor``; otherwise the connection's
            default cursor is used.

    Returns:
        A ``(connection, cursor)`` tuple.

    Raises:
        psycopg2.OperationalError: re-raised after being logged when the
            connection attempt fails.
    """
    try:
        connection = psycopg2.connect(
            dbname=SSO_DB_NAME, user=SSO_DB_USER, password=SSO_DB_PASSWORD,
            host=SSO_DB_HOST, port=SSO_DB_PORT)
    except psycopg2.OperationalError as e:
        logging.error('Unable to connect to SSO DB!\n%s', e)
        raise
    cursor_options = {'cursor_factory': RealDictCursor} if dict_cursor else {}
    cursor = connection.cursor(**cursor_options)
    return connection, cursor
def query_cursor(self, q, lazy_fetch=False, commit=True):
    """Execute a query and yield a cursor.

    All execution performed by the Postgres object uses this method.

    Every call increments ``self.cursors_opened``. When ``self.verbose`` is
    set the query text is logged at debug level. When ``self.debug`` is set
    no query is sent: a stub object whose ``fetchmany``/``fetchall`` return
    empty lists is yielded instead.

    Args:
        q (str): SQL query
        lazy_fetch (bool): whether to use a server-side cursor (lazily fetches results).
            The cursor is then named ``server_side_<cursors_opened>``.
        commit (bool): whether to call ``self.commit()`` once the caller
            resumes the generator past the yielded cursor.
    """
    self.cursors_opened += 1
    if self.verbose:
        logging.debug(q)

    if self.debug:
        # Debug mode: hand back a stand-in that always reports no rows.
        stub = Bunch()
        stub.fetchmany = lambda size: []
        stub.fetchall = lambda: []
        yield stub
        return

    if lazy_fetch:
        cursor_name = 'server_side_{}'.format(self.cursors_opened)
    else:
        cursor_name = None

    with self.connection.cursor(cursor_name, cursor_factory=RealDictCursor) as cursor:
        cursor.execute(q)
        yield cursor
    if commit:
        self.commit()
def query_cursor(self, q, lazy_fetch=False, commit=True):
    """Execute a query and yield a cursor.

    All execution performed by the Postgres object uses this method.

    Every call increments ``self.cursors_opened``. When ``self.verbose`` is
    set the query text is logged at debug level. When ``self.debug`` is set
    no query is sent: a stub object whose ``fetchmany``/``fetchall`` return
    empty lists is yielded instead.

    Args:
        q (str): SQL query
        lazy_fetch (bool): whether to use a server-side cursor (lazily fetches results).
            The cursor is then named ``server_side_<cursors_opened>``.
        commit (bool): whether to call ``self.commit()`` once the caller
            resumes the generator past the yielded cursor.
    """
    self.cursors_opened += 1
    if self.verbose:
        logging.debug(q)

    if self.debug:
        # Debug mode: hand back a stand-in that always reports no rows.
        placeholder = Bunch()
        placeholder.fetchmany = lambda size: []
        placeholder.fetchall = lambda: []
        yield placeholder
        return

    # A named cursor is only requested for lazy fetching.
    cursor_name = None
    if lazy_fetch:
        cursor_name = 'server_side_{}'.format(self.cursors_opened)

    with self.connection.cursor(cursor_name, cursor_factory=RealDictCursor) as cursor:
        cursor.execute(q)
        yield cursor
    if commit:
        self.commit()
def zupc_show_temp():
    """Render ``zupc_show_temp.html`` with the selected ``zupc_temp`` rows.

    The rows are those with ``multiple=true`` whose ``parent_id`` equals
    their own ``id``. They are read through a ``RealDictCursor`` opened on
    the raw connection behind the SQLAlchemy session. The template also
    receives the current user's API key and the configured Mapbox token.
    """
    session = current_app.extensions['sqlalchemy'].db.session
    raw_connection = session.connection().connection
    cur = raw_connection.cursor(cursor_factory=RealDictCursor)
    cur.execute("""SELECT id, nom, insee FROM zupc_temp
WHERE multiple=true AND parent_id = id;""")
    zupc_rows = cur.fetchall()
    return render_template("zupc_show_temp.html",
                           list_zupc=zupc_rows,
                           apikey=current_user.apikey,
                           mapbox_token=current_app.config['MAPBOX_TOKEN'])
def execute_to_json(conn, query, params=None):
    """Run ``query`` and return its rows as a list of plain dicts.

    Each fetched row is passed through
    ``StringConverter().snake_case_to_camel_case`` (presumably renaming
    snake_case keys to camelCase; verify against that helper) and the
    result is copied into a regular ``dict``.
    """
    def _to_plain_dict(record):
        converted = StringConverter().snake_case_to_camel_case(record)
        return dict(zip(converted.keys(), converted.values()))

    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query, params)
        return [_to_plain_dict(record) for record in cursor.fetchall()]
def read_pg(sql, conn=None, **kwargs):
    ''' Read a SQL query and return it as a Table

    :param sql: query text passed to ``cursor.execute``.
    :param conn: database connection on which the cursor is opened; it must
        be supplied even though the parameter defaults to ``None``.
    :param kwargs: accepted but not used by this function.
    :return: a ``Table`` named ``'SQL Query'`` with dialect ``'postgres'``,
        filled from the query's rows via ``add_dicts``.
    '''
    cur = conn.cursor(cursor_factory=extras.RealDictCursor)
    try:
        cur.execute(sql)
        # Materialise the rows before the cursor is released.
        rows = list(cur)
    finally:
        # Close the cursor on both the success and the failure path.
        cur.close()
    # Error occurs if a function is used in SQL query
    # and column name is not explicitly provided
    new_table = Table(name='SQL Query', dialect='postgres')
    new_table.add_dicts(rows)
    return new_table