def _downstream_thread(self, stream, downstream_boundary):
data_buff = io.BytesIO()
multipart_parser = MultipartParser(data_buff, downstream_boundary)
while not self._stop_threads_event.is_set():
if stream.data:
current_buffer_pos = data_buff.tell()
data_buff.seek(0, io.SEEK_END)
data_buff.write(b''.join(stream.data))
data_buff.seek(current_buffer_pos)
stream.data = []
message_part = multipart_parser.get_next_part()
if not message_part:
time.sleep(0.5)
continue
self._process_message_parts([message_part])
评论列表
文章目录