def line_terminator_check(self, term, server_chunk):
event = threading.Event()
s = echo_server(event)
s.chunk_size = server_chunk
s.start()
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join(timeout=TIMEOUT)
if s.is_alive():
self.fail("join() timed out")
self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
评论列表
文章目录