test_socket.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:web_ctp 作者: molebot 项目源码 文件源码
def _testCongestion(self):
        # test the behavior in case of congestion
        self.data = b'fill'
        self.cli.setblocking(False)
        try:
            # try to lower the receiver's socket buffer size
            self.cli.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16384)
        except OSError:
            pass
        with self.assertRaises(OSError) as cm:
            try:
                # fill the receiver's socket buffer
                while True:
                    self.cli.sendto(self.data, 0, (HOST, self.port))
            finally:
                # signal the receiver we're done
                self.evt.set()
        # sendto() should have failed with ENOBUFS
        self.assertEqual(cm.exception.errno, errno.ENOBUFS)
        # and we should have received a congestion notification through poll
        r, w, x = select.select([self.serv], [], [], 3.0)
        self.assertIn(self.serv, r)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号