convenience_test.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_concurrency(self):
        evt = eventlet.Event()

        def waiter(sock, addr):
            sock.sendall(b'hi')
            evt.wait()
        l = eventlet.listen(('localhost', 0))
        eventlet.spawn(eventlet.serve, l, waiter, 5)

        def test_client():
            c = eventlet.connect(('localhost', l.getsockname()[1]))
            # verify the client is connected by getting data
            self.assertEqual(b'hi', c.recv(2))
            return c
        [test_client() for i in range(5)]
        # very next client should not get anything
        x = eventlet.with_timeout(
            0.01,
            test_client,
            timeout_value="timed out")
        self.assertEqual(x, "timed out")
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号