def test_worker_thread_transfer():
s = ops.SyncCopy(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
s._transfer_queue.put(mock.MagicMock())
s._transfer_queue.put(mock.MagicMock())
s._process_synccopy_descriptor = mock.MagicMock()
s._process_synccopy_descriptor.side_effect = [None, Exception()]
with mock.patch(
'blobxfer.operations.synccopy.SyncCopy.termination_check',
new_callable=mock.PropertyMock) as patched_tc:
patched_tc.side_effect = [False, False, True]
s._worker_thread_transfer()
assert s._process_synccopy_descriptor.call_count == 2
assert len(s._exceptions) == 1
test_blobxfer_operations_synccopy.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录