def test_streaming_body(self):
    """Send a chunked request body piece by piece and check each piece.

    Two 4-byte chunks are written one at a time to ``/stream_body``. After
    each write the test waits on ``self.data`` and checks the value it
    resolves to. After the zero-length chunk it waits on ``self.finished``,
    reads the response until the connection closes, and checks that the
    response ends with ``{}``.

    ``self.prepared``, ``self.data`` and ``self.finished`` are created here;
    they are presumably resolved by the handler serving ``/stream_body``
    (not visible in this block -- confirm against the handler).
    """
    self.prepared = Future()
    self.data = Future()
    self.finished = Future()
    stream = self.connect(b"/stream_body", connection_close=True)
    # Do not send any body bytes until the "prepared" future has resolved.
    yield self.prepared
    stream.write(b"4\r\nasdf\r\n")
    # Ensure the first chunk is received before we send the second.
    data = yield self.data
    self.assertEqual(data, b"asdf")
    # A future resolves only once, so install a fresh one for the next chunk.
    self.data = Future()
    stream.write(b"4\r\nqwer\r\n")
    data = yield self.data
    self.assertEqual(data, b"qwer")
    # NOTE(review): only the last-chunk line is sent, without the final CRLF
    # that terminates a chunked body -- confirm the server accepts this.
    stream.write(b"0\r\n")
    yield self.finished
    data = yield gen.Task(stream.read_until_close)
    # This would ideally use an HTTP1Connection to read the response.
    self.assertTrue(data.endswith(b"{}"))
    stream.close()
评论列表
文章目录