greenio_test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_closure(self):
        """Close a connected socket from one greenthread while another reads it.

        A sender greenthread writes to the connection in a loop until a write
        fails with EPIPE. A reader greenthread receives on the accepted socket
        in a loop until a separate greenthread closes that socket, which the
        reader expects to see as EBADF. Any other socket error is re-raised
        by the greenthread that hit it.
        """
        def spam_to_me(address):
            sock = eventlet.connect(address)
            try:
                while True:
                    try:
                        sock.sendall(b'hello world')
                        # Arbitrary delay to not use all available CPU, keeps the test
                        # running quickly and reliably under a second
                        time.sleep(0.001)
                    except socket.error as e:
                        # EPIPE is the expected way for the sender to stop;
                        # anything else is a genuine failure.
                        if get_errno(e) == errno.EPIPE:
                            return
                        raise
            finally:
                # Release the sender's socket on every exit path (expected
                # EPIPE return as well as an unexpected error) instead of
                # leaving it to garbage collection.
                sock.close()

        server = eventlet.listen(('127.0.0.1', 0))
        sender = eventlet.spawn(spam_to_me, server.getsockname())
        client, address = server.accept()
        # Only the single accepted connection is needed from here on.
        server.close()

        def reader():
            try:
                while True:
                    data = client.recv(1024)
                    assert data
                    # Arbitrary delay to not use all available CPU, keeps the test
                    # running quickly and reliably under a second
                    time.sleep(0.001)
            except socket.error as e:
                # we get an EBADF because client is closed in the same process
                # (but a different greenthread)
                if get_errno(e) != errno.EBADF:
                    raise

        def closer():
            client.close()

        # Keep the greenthread handle under its own name rather than rebinding
        # the ``reader`` function it was spawned from.
        reader_gt = eventlet.spawn(reader)
        eventlet.spawn_n(closer)
        reader_gt.wait()
        sender.wait()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号