greenio_test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_timeout_and_final_write(self):
        """Check that a late write to an abandoned socket causes no bad switch.

        The client begins a read guarded by a 0.01s timeout while the sender
        greenthread sleeps 0.02s before writing, so the read is expected to
        time out first. The sender then writes and closes its side; that
        write, on a socket this greenthread has stopped listening on, must
        not result in an incorrect switch. The value delivered through the
        event shows that the sender ran to completion.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client = None
        try:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(('127.0.0.1', 0))
            server.listen(50)
            bound_port = server.getsockname()[1]

            def sender(evt):
                s2, addr = server.accept()
                wrap_wfile = s2.makefile('wb')

                # Sleep longer than the client's timeout so the write
                # happens only after the client has given up reading.
                eventlet.sleep(0.02)
                wrap_wfile.write(b'hi')
                s2.close()
                evt.send(b'sent via event')

            evt = event.Event()
            eventlet.spawn(sender, evt)
            # lets the socket enter accept mode, which
            # is necessary for connect to succeed on windows
            eventlet.sleep(0)

            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # try and get some data off of this pipe
                # but bail before any is sent.  The ``with`` form cancels
                # the timer on exit, so it cannot fire later in unrelated
                # code if the read unexpectedly returns.
                with eventlet.Timeout(0.01):
                    client.connect(('127.0.0.1', bound_port))
                    wrap_rfile = client.makefile()
                    wrap_rfile.read(1)
                    self.fail()
            except eventlet.Timeout:
                pass

            result = evt.wait()
            self.assertEqual(result, b'sent via event')
        finally:
            # Always release the sockets, even when an assertion fails.
            server.close()
            if client is not None:
                client.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号