db.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:djaio 作者: Sberned 项目源码 文件源码
def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default',
                       returning: bool = False):
        pool = self.dbs[db_name]['master']
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                if returning:
                    return await cursor.fetchone()
                else:
                    return cursor.rowcount
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号