def execute(self, db_name: str, query: str, values: List, _type: str):
"""
Execute SQL query in connection pool
"""
warnings.warn("Use single methods!", DeprecationWarning)
if _type not in ('select', 'insert', 'update', 'delete'):
raise RuntimeError(
'Wrong request type {}'.format(_type)
)
if not self.dbs[db_name]['master']:
raise RuntimeError(
'db {} master is not initialized'.format(db_name)
)
pool = self.dbs[db_name]['master']
if _type == 'select' and 'slave' in self.dbs[db_name]:
pool = self.dbs[db_name]['slave']
async with pool.acquire() as conn:
async with conn.cursor(cursor_factory=DictCursor) as cursor:
await cursor.execute(query, values)
if _type == 'select':
data = await cursor.fetchall()
else:
data = cursor.rowcount
return data
评论列表
文章目录