def select(conn, query: str, params=None, name=None, itersize=5000):
    """Lazily yield the rows produced by a select statement.

    Rows come from a cursor created with ``cursor_factory=NamedTupleCursor``
    and are yielded one at a time while the cursor is still open.

    Parameters
    ----------
    conn : database connection used to open the cursor
    query : select query string
    params : query parameters handed to ``cursor.execute``
    name : server side cursor name. defaults to client side.
    itersize : value assigned to ``cursor.itersize`` (number of records
        fetched by server).
    """
    cursor_cm = conn.cursor(name, cursor_factory=NamedTupleCursor)
    with cursor_cm as cur:
        cur.itersize = itersize
        cur.execute(query, params)
        # Explicit loop (not ``yield from``) so closing the generator does
        # not get forwarded to the cursor object itself.
        for row in cur:
            yield row
# Example source code for the Python class NamedTupleCursor()
def __fetch(self, fetch_all, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
    """Run a filter query and return all rows or only the first one.

    Parameters
    ----------
    fetch_all : truthy to return ``cursor.fetchall()``, falsy to return
        ``cursor.fetchone()``.
    sql : query text; ``None`` selects ``do_filter(%s::filter)``.
    mf : filter whose ``tuple()`` is the single query parameter; ``None``
        is replaced by a fresh ``MusicFilter()``.
    cursor_factory : cursor class; ``None`` falls back to ``NamedTupleCursor``.
    """
    query = '''select * from do_filter(%s::filter)''' if sql is None else sql
    music_filter = MusicFilter() if mf is None else mf
    factory = NamedTupleCursor if cursor_factory is None else cursor_factory
    params = [music_filter.tuple()]
    with self.pg.cursor(cursor_factory=factory) as cur:
        # Log the fully rendered statement before it is executed.
        debug('Query: {}'.format(cur.mogrify(query, params)))
        with benchmark("Fetching:"):
            cur.execute(query, params)
            result = cur.fetchall() if fetch_all else cur.fetchone()
        return result
def get_records(conn, qualified_name):
    """Fetch every row of ``qualified_name`` through a ``NamedTupleCursor``.

    NOTE(review): ``qualified_name`` is interpolated directly into the SQL
    text, so it must never come from untrusted input.
    """
    with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cur:
        cur.execute('select * from %s;' % qualified_name)
        return cur.fetchall()
def insert(conn, qualified_name: str, column_names, records):
    """Insert a collection of namedtuple records.

    The statement is built once by ``create_insert_statement`` and then
    executed once per record inside a single ``with conn`` block.
    """
    statement = create_insert_statement(qualified_name, column_names)
    with conn, conn.cursor(cursor_factory=NamedTupleCursor) as cur:
        for row in records:
            cur.execute(statement, row)
def assert_db_anonymized(db):
    """Check that the first two customers and addresses were anonymized.

    Reads every row of ``customer`` and ``customer_address`` and hands the
    first two rows of each to the matching ``assert_*_anonymized`` helper.
    The cursor is opened in a ``with`` block so it is closed even when one
    of the assertions fails.
    """
    # NOTE(review): neither query has an ORDER BY, so indexing rows [0]/[1]
    # relies on the database returning them in insertion order - confirm.
    with db.cursor(cursor_factory=NamedTupleCursor) as cursor:
        cursor.execute('SELECT * FROM customer;')
        customers = cursor.fetchall()
        assert_customer_anonymized(customers[0], 'name_1', 'fr', 'LAT', '111.111.111.111')
        assert_customer_anonymized(customers[1], 'name_2', 'tlh', 'KR', '111.111.111.111')
        cursor.execute('SELECT * FROM customer_address;')
        addresses = cursor.fetchall()
        assert_address_anonymized(addresses[0], 1, '15', 'France')
        assert_address_anonymized(addresses[1], 2, '6', 'Klingon Empire')
def __executefile(self, filepath):
    """Execute the contents of a file as one statement.

    ``filepath`` is resolved relative to the directory of ``sys.argv[0]``.
    When ``self.dry_run`` is set, the file is only announced via ``info``
    and nothing is opened or executed.
    """
    if self.dry_run:
        info("[DRY-RUN] Executing {}".format(filepath))
        return
    script_dir = os.path.dirname(sys.argv[0])
    schema_path = os.path.join(script_dir, filepath)
    with open(schema_path, "r") as handle, \
            self.pg.cursor(cursor_factory=NamedTupleCursor) as cur:
        cur.execute(handle.read())
def folders(self):
    """Return the ``name`` of every row in the ``folders`` table as a list."""
    query = '''select name from folders'''
    with benchmark("DB Folders"), \
            self.pg.cursor(cursor_factory=NamedTupleCursor) as cur:
        cur.execute(query, [])
        rows = cur.fetchall()
        return [row.name for row in rows]
def filter(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
    """Return all rows of the default filter query for ``mf``.

    Delegates to ``__fetch`` with ``fetch_all=True`` and no custom SQL.
    """
    return self.__fetch(fetch_all=True, sql=None, mf=mf, cursor_factory=cursor_factory)
def __fetchone(self, sql=None, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
    """Return only the first row of ``sql`` evaluated for ``mf``.

    Delegates to ``__fetch`` with ``fetch_all=False``.
    """
    return self.__fetch(fetch_all=False, sql=sql, mf=mf, cursor_factory=cursor_factory)
def titles(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
    """Return the distinct track titles.

    With ``fast`` set, the plain ``select distinct`` query is handed to
    ``__fetchfast`` and its result is returned as is. Otherwise the titles
    are aggregated into one array (empty when there are none) and the
    ``titles`` field of that single row is returned; ``mf=None`` reads the
    whole ``musics`` table instead of ``do_filter``.

    NOTE(review): the ``mf is None`` query has no ``%s`` placeholder, yet
    ``__fetch`` still passes one parameter - confirm the driver accepts it.
    """
    if fast:
        return self.__fetchfast("""select distinct title as name from musics""")
    if mf is None:
        query = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from musics"""
    else:
        query = """select coalesce(array_agg(distinct title), array[]::text[]) as titles from do_filter(%s::filter)"""
    row = self.__fetchone(query, mf, cursor_factory)
    return row.titles
def albums(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
    """Return the distinct album names.

    With ``fast`` set, the plain ``select distinct`` query is handed to
    ``__fetchfast`` and its result is returned as is. Otherwise the names
    are aggregated into one array (empty when there are none) and the
    ``albums`` field of that single row is returned; ``mf=None`` reads the
    ``albums`` table instead of ``do_filter``.

    NOTE(review): the ``mf is None`` query has no ``%s`` placeholder, yet
    ``__fetch`` still passes one parameter - confirm the driver accepts it.
    """
    if fast:
        return self.__fetchfast("""select distinct name from albums""")
    if mf is None:
        query = """select coalesce(array_agg(distinct name), array[]::text[]) as albums from albums"""
    else:
        query = """select coalesce(array_agg(distinct album), array[]::text[]) as albums from do_filter(%s::filter)"""
    row = self.__fetchone(query, mf, cursor_factory)
    return row.albums
def genres(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
    """Return the distinct genre names.

    With ``fast`` set, the plain ``select distinct`` query is handed to
    ``__fetchfast`` and its result is returned as is. Otherwise the names
    are aggregated into one array (empty when there are none) and the
    ``genres`` field of that single row is returned; ``mf=None`` reads the
    ``genres`` table instead of ``do_filter``.

    NOTE(review): the ``mf is None`` query has no ``%s`` placeholder, yet
    ``__fetch`` still passes one parameter - confirm the driver accepts it.
    """
    if fast:
        return self.__fetchfast("""select distinct name from genres""")
    if mf is None:
        query = """select coalesce(array_agg(distinct name), array[]::text[]) as genres from genres"""
    else:
        query = """select coalesce(array_agg(distinct genre), array[]::text[]) as genres from do_filter(%s::filter)"""
    row = self.__fetchone(query, mf, cursor_factory)
    return row.genres
def keywords(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
    """Return the distinct keywords (tags).

    With ``fast`` set, the plain ``select distinct`` query on ``tags`` is
    handed to ``__fetchfast`` and its result is returned as is. Otherwise
    the keywords are aggregated into one array (empty when there are none)
    and the ``keywords`` field of that single row is returned; ``mf=None``
    reads the ``tags`` table, any other filter unnests the keyword arrays
    produced by ``do_filter``.

    NOTE(review): the ``mf is None`` query has no ``%s`` placeholder, yet
    ``__fetch`` still passes one parameter - confirm the driver accepts it.
    """
    if fast:
        return self.__fetchfast("""select distinct name from tags""")
    if mf is None:
        query = """select coalesce(array_agg(distinct name), array[]::text[]) as keywords from tags"""
    else:
        query = """select coalesce(array_agg(distinct keywords), array[]::text[]) as keywords from (select unnest(array_cat_agg(keywords)) as keywords from do_filter(%s::filter)) k"""
    row = self.__fetchone(query, mf, cursor_factory)
    return row.keywords
def artists(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor, fast=False):
    """Return the distinct artist names.

    With ``fast`` set, the plain ``select distinct`` query is handed to
    ``__fetchfast`` and its result is returned as is. Otherwise the names
    are aggregated into one array (empty when there are none) and the
    ``artists`` field of that single row is returned; ``mf=None`` reads the
    ``artists`` table instead of ``do_filter``.

    NOTE(review): the ``mf is None`` query has no ``%s`` placeholder, yet
    ``__fetch`` still passes one parameter - confirm the driver accepts it.
    """
    if fast:
        return self.__fetchfast("""select distinct name from artists""")
    if mf is None:
        query = """select coalesce(array_agg(distinct name), array[]::text[]) as artists from artists"""
    else:
        query = """select coalesce(array_agg(distinct artist), array[]::text[]) as artists from do_filter(%s::filter)"""
    row = self.__fetchone(query, mf, cursor_factory)
    return row.artists
def stats(self, mf=MusicFilter(), cursor_factory=NamedTupleCursor):
    """Return the single row produced by ``do_stats`` for the filter ``mf``."""
    return self.__fetchone('select * from do_stats(%s::filter)', mf, cursor_factory)
def __init__(self, dbname, username, password, host, port, cursor_type=None, verbose=False):
    """Store the connection settings, pick a cursor factory, and connect.

    Parameters
    ----------
    dbname, username, password, host : stored unchanged on the instance.
    port : converted with ``int``; ``None`` or ``""`` selects 5432.
    cursor_type : ``None`` or ``"namedtuple"`` (NamedTupleCursor),
        ``"standard"`` (no cursor factory), ``"dictlike"`` (DictCursor)
        or ``"dict"`` (RealDictCursor).
    verbose : stored on the instance as ``self.verbose``.

    Raises
    ------
    ValueError
        If ``cursor_type`` is not one of the values listed above.
    """
    self.sshtunnel = None
    self._psql_conn = None
    self.verbose = verbose

    self.dbname = dbname
    self.username = username
    self.password = password
    self.host = host
    # A missing port (None as well as the empty string) falls back to
    # 5432 instead of failing inside int().
    self.port = 5432 if port is None or port == "" else int(port)

    if cursor_type is None or cursor_type == "namedtuple":
        self.cursor_factory = NamedTupleCursor
    elif cursor_type == "standard":
        self.cursor_factory = None
    elif cursor_type == "dictlike":
        self.cursor_factory = DictCursor
    elif cursor_type == "dict":
        self.cursor_factory = RealDictCursor
    else:
        # ValueError is still caught by callers handling plain Exception.
        raise ValueError("'cursor_type' unknown: '{}'".format(cursor_type))

    self.connect(self.cursor_factory)