db.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:djaio 作者: Sberned 项目源码 文件源码
def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchall()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号