def test_worker_thread_disk():
with mock.patch(
'blobxfer.operations.download.Downloader.termination_check',
new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.disk_threads = 1
d._disk_queue = mock.MagicMock()
d._disk_queue.get.side_effect = [
(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()),
]
d._process_data = mock.MagicMock()
patched_tc.side_effect = [False, True]
d._worker_thread_disk()
assert d._process_data.call_count == 1
with mock.patch(
'blobxfer.operations.download.Downloader.termination_check',
new_callable=mock.PropertyMock) as patched_tc:
d = ops.Downloader(
mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
d._general_options.concurrency.disk_threads = 1
d._disk_queue = mock.MagicMock()
d._disk_queue.get.side_effect = [
(mock.MagicMock(), mock.MagicMock(), mock.MagicMock()),
]
d._process_data = mock.MagicMock()
d._process_data.side_effect = Exception()
patched_tc.side_effect = [False, True]
d._worker_thread_disk()
assert len(d._exceptions) == 1
test_blobxfer_operations_download.py 文件源码
python
阅读 39
收藏 0
点赞 0
评论 0
评论列表
文章目录