def test_concurrency(self):
evt = eventlet.Event()
def waiter(sock, addr):
sock.sendall(b'hi')
evt.wait()
l = eventlet.listen(('localhost', 0))
eventlet.spawn(eventlet.serve, l, waiter, 5)
def test_client():
c = eventlet.connect(('localhost', l.getsockname()[1]))
# verify the client is connected by getting data
self.assertEqual(b'hi', c.recv(2))
return c
[test_client() for i in range(5)]
# very next client should not get anything
x = eventlet.with_timeout(
0.01,
test_client,
timeout_value="timed out")
self.assertEqual(x, "timed out")
评论列表
文章目录