# Imports needed by this worker. FileChunkIO comes from the third-party
# `filechunkio` package; `log` and `get_connection` are assumed to be defined
# elsewhere in the surrounding module.
import multiprocessing
import time
import traceback

from filechunkio import FileChunkIO


def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
                            filename, offset, bytes, num_retries, queue):
    """
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it's important to keep
    the chunk size (the ``bytes`` argument) reasonably small.
    """

    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
        if queue is not None:
            queue.put([uploaded, total])  # Push progress data to the consumer thread

    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url, storage_uri)
            bucket = conn.lookup(bucket_name)

            # Find the in-progress multipart upload this worker belongs to and
            # upload our slice of the file as part number offset + 1.
            for mp in bucket.list_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
                        start = time.time()
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
                                                 num_cb=max(2, 8 * bytes // (1024 * 1024))
                                                 )  # At most 8 progress callbacks per MiB of chunk, never fewer than 2
                        end = time.time()
                        log.Debug(("{name}: Uploaded chunk {chunk} "
                                   "at roughly {speed} bytes/second").format(
                                       name=worker_name,
                                       chunk=offset + 1,
                                       speed=bytes / max(1, abs(end - start))))
                    break
            conn.close()
            conn = None
            bucket = None
            del conn
        except Exception as e:
            traceback.print_exc()
            if num_retries:
                log.Debug("%s: Upload of chunk %d failed. Retrying %d more times..." % (
                    worker_name, offset + 1, num_retries - 1))
                return _upload(num_retries - 1)
            log.Debug("%s: Upload of chunk %d failed. Aborting..." % (
                worker_name, offset + 1))
            raise e
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))

    return _upload(num_retries)
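

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original code):
# one way a caller might split a local file into fixed-size chunks and hand
# each chunk to multipart_upload_worker through a process pool, while a small
# consumer thread drains the progress queue fed by _upload_callback above.
# The helper name _example_multipart_upload, the 25 MiB default chunk size and
# the pool size are assumptions made for this sketch; get_connection() and
# log are the module-level helpers already used by the worker.
# ---------------------------------------------------------------------------
def _example_multipart_upload(scheme, parsed_url, storage_uri, bucket_name,
                              filename, chunk_size=25 * 1024 * 1024,
                              num_retries=3):
    import math
    import os
    import threading

    conn = get_connection(scheme, parsed_url, storage_uri)
    bucket = conn.lookup(bucket_name)
    mp = bucket.initiate_multipart_upload(os.path.basename(filename))

    # Progress queue shared with the worker processes; a daemon thread logs
    # the [uploaded, total] pairs pushed by _upload_callback.
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    def _consume():
        while True:
            item = queue.get()
            if item is None:  # sentinel: all workers have finished
                break
            uploaded, total = item
            log.Debug("Progress: %s/%s bytes" % (uploaded, total))

    consumer = threading.Thread(target=_consume)
    consumer.daemon = True
    consumer.start()

    # One task per chunk; each worker converts its zero-based chunk index into
    # a byte offset (index * chunk_size) and a 1-based S3 part number.
    chunks = int(math.ceil(os.path.getsize(filename) / float(chunk_size)))
    pool = multiprocessing.Pool(processes=max(1, min(chunks, 4)))
    results = [pool.apply_async(multipart_upload_worker,
                                (scheme, parsed_url, storage_uri, bucket_name,
                                 mp.id, filename, i, chunk_size, num_retries,
                                 queue))
               for i in range(chunks)]
    pool.close()
    pool.join()
    for r in results:
        r.get()  # re-raise any worker failure before completing the upload

    queue.put(None)  # stop the progress consumer
    mp.complete_upload()
    conn.close()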