def line_terminator_check(self, term, server_chunk):
    """Send two terminated lines through the echo server and verify them.

    An echo_server is started with its ``chunk_size`` set to
    *server_chunk*, and an echo_client created with *term* connects to
    the server's port.  The client pushes two lines, each ending in
    *term*, followed by SERVER_QUIT.  After the asyncore loop has run
    and the server thread has been joined, the client's ``contents``
    must equal the two lines with the terminators removed.
    """
    ready = threading.Event()
    server = echo_server(ready)
    server.chunk_size = server_chunk
    server.start()

    # Wait on the event handed to the server (presumably set once the
    # server thread is up -- confirm in echo_server), then reset it.
    ready.wait()
    ready.clear()
    time.sleep(0.01)  # Give server time to start accepting.

    client = echo_client(term, server.port)
    push = client.push
    # The first line is pushed in two pieces; only the second piece
    # carries the terminator.
    push("hello ")
    push("world%s" % term)
    push("I'm not dead yet!%s" % term)
    push(SERVER_QUIT)

    asyncore.loop(use_poll=self.usepoll, count=300, timeout=0.01)
    server.join()

    expected_lines = ["hello world", "I'm not dead yet!"]
    self.assertEqual(client.contents, expected_lines)
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
评论列表
文章目录