def main_loop(self, concurrency=4, worker_class=UploadWorker):
chunk_index = 0
self._begin_upload()
self._workers = self._start_workers(concurrency, worker_class=worker_class)
while self._pending_chunks or not self.stream_handler.finished:
self._check_workers() # raise exception and stop everything if any worker has crashed
# print "main_loop p:{} o:{} i:{}".format(
# self._pending_chunks, self.outbox.qsize(), self.inbox.qsize())
# consume results first as this is a quick operation
self._handle_results()
chunk = self.stream_handler.get_chunk()
if chunk:
# s3 multipart index is 1 based, increment before sending
chunk_index += 1
self._send_chunk(chunk_index, chunk)
self._finish_upload()
self.results.sort()
return multipart_etag(r[1] for r in self.results)
评论列表
文章目录