test_async.py 文件源码

python
阅读 37 收藏 0 点赞 0 评论 0

项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码
def test_flush_on_write(self):
        # a very large query requires a flush loop to be sent to the backend
        curs = self.conn.cursor()
        for mb in 1, 5, 10, 20, 50:
            size = mb * 1024 * 1024
            stub = PollableStub(self.conn)
            curs.execute("select %s;", ('x' * size,))
            self.wait(stub)
            self.assertEqual(size, len(curs.fetchone()[0]))
            if stub.polls.count(ext.POLL_WRITE) > 1:
                return

        # This is more a testing glitch than an error: it happens
        # on high load on linux: probably because the kernel has more
        # buffers ready. A warning may be useful during development,
        # but an error is bad during regression testing.
        import warnings
        warnings.warn("sending a large query didn't trigger block on write.")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号