def test_close_when_done(self):
s, event = start_echo_server()
s.start_resend_event = threading.Event()
c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
# Only allow the server to start echoing data back to the client after
# the client has closed its connection. This prevents a race condition
# where the server echoes all of its data before we can check that it
# got any down below.
s.start_resend_event.set()
s.join()
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
评论列表
文章目录