test_blobxfer_operations_download.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:blobxfer 作者: Azure 项目源码 文件源码
def _create_downloader_for_start(td):
    """Return an ``ops.Downloader`` pre-wired with mocks and fixed options.

    The downloader is constructed from three ``MagicMock`` arguments, has
    several of its private methods replaced by mocks, and is configured
    with single-worker concurrency, a resume file and destination located
    under ``td``, all skip-on checks disabled, and exactly one source path
    (``/cont/remote/path`` on storage account ``sa``).

    :param td: directory object supporting ``td.join(name)`` and
        ``str(td)`` -- presumably a pytest ``tmpdir`` fixture; confirm
        against the callers
    :return: the configured downloader instance
    """
    downloader = ops.Downloader(
        mock.MagicMock(), mock.MagicMock(), mock.MagicMock())

    # replace the private hooks with mocks so the tests can observe them
    for hook in (
            '_cleanup_temporary_files',
            '_initialize_transfer_threads',
            '_pre_md5_skip_on_check',
            '_check_download_conditions'):
        setattr(downloader, hook, mock.MagicMock())

    # bookkeeping state
    downloader._download_start = datetime.datetime.now(
        tz=dateutil.tz.tzlocal())
    downloader._download_start_time = util.datetime_now()
    downloader._all_remote_files_processed = False

    # general options: one worker of every kind, resume file under td
    concurrency = downloader._general_options.concurrency
    for worker_kind in (
            'crypto_processes',
            'md5_processes',
            'disk_threads',
            'transfer_threads'):
        setattr(concurrency, worker_kind, 1)
    downloader._general_options.resume_file = pathlib.Path(
        str(td.join('rf')))

    # download options
    options = mock.MagicMock()
    options.chunk_size_bytes = 1
    options.mode = azmodels.StorageModes.Auto
    options.overwrite = True
    options.rename = False

    # every skip-on condition is switched off
    skip_on = mock.MagicMock()
    for condition in ('md5_match', 'lmt_ge', 'filesize_match'):
        setattr(skip_on, condition, False)

    # destination is the directory handed in by the caller
    destination = mock.MagicMock()
    destination.path = pathlib.Path(str(td))

    # the single remote source to download from
    source = azops.SourcePath()
    source.add_path_with_storage_account('/cont/remote/path', 'sa')

    spec = downloader._spec
    spec.options = options
    spec.skip_on = skip_on
    spec.destination = destination
    spec.sources = [source]

    return downloader
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号