_boto_multi.py 文件源码

python
阅读 14 收藏 0 点赞 0 评论 0

项目:alicloud-duplicity 作者: aliyun 项目源码 文件源码
def multipart_upload_worker(scheme, parsed_url, storage_uri, bucket_name, multipart_id,
                            filename, offset, bytes, num_retries, queue):
    """
    Worker method for uploading a file chunk to S3 using multipart upload.
    Note that the file chunk is read into memory, so it's important to keep
    this number reasonably small.
    """

    def _upload_callback(uploaded, total):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
        if queue is not None:
            queue.put([uploaded, total])  # Push data to the consumer thread

    def _upload(num_retries):
        worker_name = multiprocessing.current_process().name
        log.Debug("%s: Uploading chunk %d" % (worker_name, offset + 1))
        try:
            conn = get_connection(scheme, parsed_url, storage_uri)
            bucket = conn.lookup(bucket_name)

            for mp in bucket.list_multipart_uploads():
                if mp.id == multipart_id:
                    with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
                        start = time.time()
                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
                                                 )  # Max num of callbacks = 8 times x megabyte
                        end = time.time()
                        log.Debug(("{name}: Uploaded chunk {chunk}"
                                  "at roughly {speed} bytes/second").format(name=worker_name,
                                                                            chunk=offset + 1,
                                                                            speed=(bytes / max(1, abs(end - start)))))
                    break
            conn.close()
            conn = None
            bucket = None
            del conn
        except Exception as e:
            traceback.print_exc()
            if num_retries:
                log.Debug("%s: Upload of chunk %d failed. Retrying %d more times..." % (
                    worker_name, offset + 1, num_retries - 1))
                return _upload(num_retries - 1)
            log.Debug("%s: Upload of chunk %d failed. Aborting..." % (
                worker_name, offset + 1))
            raise e
        log.Debug("%s: Upload of chunk %d complete" % (worker_name, offset + 1))

    return _upload(num_retries)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号