def test_simple_local_copy_with_callback(tmpdir):
    """Copy a local file with a progress callback and count its invocations.

    The file ``alpha`` is copied to ``beta`` twice, first with ``size=1``
    and then with ``size=2``. After each pass the destination must exist,
    match the source byte-for-byte, and the callback must have been
    invoked exactly ``estimate_nb_cycles(len(data), chunk_size)`` times.
    """
    invocations = []

    def record_call(size):
        # Only the number of invocations is asserted on below.
        invocations.append(size)

    payload = b'some data'
    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')
    src.write(payload)

    for pass_index, chunk_size in enumerate((1, 2)):
        if pass_index:
            # Remove the previous pass's output so the "destination does
            # not exist yet" precondition holds again.
            dst.remove()
        invocations.clear()

        assert src.check()
        assert not dst.check()

        source = Resource('file://' + src.strpath)
        target = Resource('file://' + dst.strpath)
        # NOTE(review): callback_freq=1 presumably means "call back on
        # every cycle" -- confirm against copy()'s documentation.
        copy(source,
             target,
             size=chunk_size,
             callback_freq=1,
             callback=record_call)

        assert src.check()
        assert dst.check()
        assert filecmp.cmp(src.strpath, dst.strpath)
        expected_calls = estimate_nb_cycles(len(payload), chunk_size)
        assert len(invocations) == expected_calls
# (web-page residue) Comment list
# (web-page residue) Table of contents