greenio_test.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_send_timeout(self):
        self.reset_timeout(2)
        listener = bufsized(eventlet.listen(('', 0)))

        evt = event.Event()

        def server():
            # accept the connection in another greenlet
            sock, addr = listener.accept()
            sock = bufsized(sock)
            evt.wait()

        gt = eventlet.spawn(server)

        addr = listener.getsockname()

        client = bufsized(greenio.GreenSocket(socket.socket()))
        client.connect(addr)

        client.settimeout(0.00001)
        msg = b"A" * 100000  # large enough number to overwhelm most buffers

        # want to exceed the size of the OS buffer so it'll block in a
        # single send
        def send():
            for x in range(10):
                client.send(msg)

        expect_socket_timeout(send)

        evt.send()
        gt.wait()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号