def test_timeout_and_final_write(self):
# This test verifies that a write on a socket that we've
# stopped listening for doesn't result in an incorrect switch
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 0))
server.listen(50)
bound_port = server.getsockname()[1]
def sender(evt):
s2, addr = server.accept()
wrap_wfile = s2.makefile('wb')
eventlet.sleep(0.02)
wrap_wfile.write(b'hi')
s2.close()
evt.send(b'sent via event')
evt = event.Event()
eventlet.spawn(sender, evt)
# lets the socket enter accept mode, which
# is necessary for connect to succeed on windows
eventlet.sleep(0)
try:
# try and get some data off of this pipe
# but bail before any is sent
eventlet.Timeout(0.01)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', bound_port))
wrap_rfile = client.makefile()
wrap_rfile.read(1)
self.fail()
except eventlet.Timeout:
pass
result = evt.wait()
self.assertEqual(result, b'sent via event')
server.close()
client.close()
评论列表
文章目录