test_cancel.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:flask 作者: bobohope 项目源码 文件源码
def test_async_cancel(self):
        async_conn = psycopg2.connect(dsn, async_=True)
        self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
        extras.wait_select(async_conn)
        cur = async_conn.cursor()
        cur.execute("select pg_sleep(10)")
        time.sleep(1)
        self.assertTrue(async_conn.isexecuting())
        async_conn.cancel()
        self.assertRaises(psycopg2.extensions.QueryCanceledError,
                          extras.wait_select, async_conn)
        cur.execute("select 1")
        extras.wait_select(async_conn)
        self.assertEqual(cur.fetchall(), [(1, )])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号