def test_connect_tcp(self):
def accept_once(listenfd):
try:
conn, addr = listenfd.accept()
fd = conn.makefile(mode='wb')
conn.close()
fd.write(b'hello\n')
fd.close()
finally:
listenfd.close()
server = eventlet.listen(('0.0.0.0', 0))
eventlet.spawn_n(accept_once, server)
client = eventlet.connect(('127.0.0.1', server.getsockname()[1]))
fd = client.makefile('rb')
client.close()
assert fd.readline() == b'hello\n'
assert fd.read() == b''
fd.close()
check_hub()
评论列表
文章目录