def test_import_file_to_dir(self):
    """Import a temporary file into a data directory and check the results.

    Runs ``CmdImportFile`` with a freshly written temporary file and a
    target directory under ``data``, then checks that the data file and
    the cache file exist with the written content and that the state
    file exists.
    """
    with tempfile.NamedTemporaryFile(mode='w') as temp:
        content = 'Some data'
        temp.write(content)
        # Flush so the command reading the file by name sees the content.
        temp.flush()

        target_dir = os.path.join('data', self.dir1)
        settings = copy(self.settings)
        settings.parse_args('import {} {}'.format(temp.name, target_dir))
        cmd = CmdImportFile(settings)
        cmd.run()

        dvc_name = os.path.join(target_dir, os.path.basename(temp.name))
        data_item = self.path_factory.existing_data_item(dvc_name)

        self.assertTrue(os.path.exists(data_item.data.relative))
        self.assertTrue(filecmp.cmp(data_item.data.relative,
                                    data_item.cache.relative))
        # Read through context managers so the handles are closed promptly
        # instead of being left to the garbage collector.
        with open(data_item.data.relative) as data_file:
            self.assertEqual(data_file.read(), content)

        self.assertTrue(os.path.exists(data_item.cache.relative))
        with open(data_item.cache.relative) as cache_file:
            self.assertEqual(cache_file.read(), content)

        self.assertTrue(os.path.exists(data_item.state.relative))
# Example source code for Python's cmp() (filecmp.cmp)
def test_copyfile(self):
    """Check ``utils.copyfile`` for a file target and a directory target.

    Copies ``file1`` to ``file2`` and then into ``testdir`` and verifies
    with ``filecmp.cmp`` that each copy matches the source.
    """
    src = 'file1'
    dest = 'file2'
    dest_dir = 'testdir'

    with open(src, 'w+') as f:
        f.write('file1contents')
    # Register each cleanup as soon as the artifact exists so that a failing
    # assertion does not leave files behind (a leftover 'testdir' would make
    # the next run fail at os.mkdir).
    self.addCleanup(os.remove, src)

    os.mkdir(dest_dir)
    self.addCleanup(shutil.rmtree, dest_dir)

    utils.copyfile(src, dest)
    self.addCleanup(os.remove, dest)
    self.assertTrue(filecmp.cmp(src, dest))

    utils.copyfile(src, dest_dir)
    self.assertTrue(filecmp.cmp(src, os.path.join(dest_dir, src)))
def test_restore_disk_in_domain(self, get_uncompressed_complete_backup,
                                build_stopped_mock_domain, tmpdir):
    """Restore the "vda" disk of a backup into a domain and verify it.

    After ``restore_and_replace_disk_of`` the domain's "vda" image must
    compare equal to the backup image, and the disk type reported by the
    domain XML must match the one recorded in the backup XML.
    """
    disk = "vda"
    backup = get_uncompressed_complete_backup
    domain = build_stopped_mock_domain

    backup_image = backup.get_complete_path_of(backup.disks[disk])

    # Point the domain storage at the pytest tmpdir before looking up
    # where its disk image lives.
    domain.set_storage_basedir(str(tmpdir))
    domain_image = get_domain_disks_of(domain.XMLDesc(), disk)[disk]["src"]

    backup.restore_and_replace_disk_of(disk, domain, disk)

    assert filecmp.cmp(backup_image, domain_image)

    domain_disk_type = get_domain_disks_of(domain.XMLDesc())[disk]["type"]
    backup_disk_type = get_domain_disks_of(backup.dom_xml)[disk]["type"]
    assert domain_disk_type == backup_disk_type
def test_fetch(self):
    """Fetch a file through ``self._copyFile`` and verify the copy.

    Writes a source file under ``self._copyFile.copy_from_prefix``, calls
    ``fetch`` with an interpolation dict, and checks the single returned
    data file (name, location, equipment) and that the copied file
    compares equal to the source.
    """
    prefix = tempfile.gettempprefix()
    tmp_dir = tempfile.gettempdir()
    source_name = "%sbigFileXYZA" % prefix
    copied_name = "%sbigFileXYZB" % prefix
    # NOTE(review): the source is written under copy_from_prefix but later
    # compared and removed under the system temp dir, as in the original;
    # presumably both are the same directory - confirm against the fixture.
    source_in_tmp = os.path.join(tmp_dir, source_name)
    copied_in_tmp = os.path.join(tmp_dir, copied_name)

    # Create temp file to copy
    source_path = os.path.join(self._copyFile.copy_from_prefix, source_name)
    with open(source_path, 'w') as source_file:
        source_file.write("Lots of interesting stuff")

    # Perform copy
    interpolate_dict = {"project_identifier": prefix,
                        "product_identifier": "bigFile",
                        "run_identifier": "XYZ"}
    result_paths = self._copyFile.fetch(interpolate_dict=interpolate_dict)

    # Test copied file exists and DataFile matches
    self.assertEqual(len(result_paths), 1)
    df = result_paths[0]
    self.assertEqual(df.file_name, copied_name)
    self.assertEqual(df.location, copied_in_tmp)
    self.assertEqual(df.equipment, self._equipmentSequencer)
    self.assertIs(filecmp.cmp(source_in_tmp, copied_in_tmp), True)

    # Clean up
    os.remove(source_in_tmp)
    os.remove(copied_in_tmp)
def test_upload_success(repository_upload_fixture):
    """Upload a built package once, then verify a second upload is refused.

    The first upload must mark the package as uploaded and place a copy of
    every distributed file in the incoming directory; uploading again must
    raise ``AlreadyUploadedException``.
    """
    fixture = repository_upload_fixture
    fixture.package.build()

    # First upload: expected to succeed.
    assert not fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.is_built

    fixture.debian_repository.upload(fixture.package, [])

    assert fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.files_to_distribute

    for distributed_path in fixture.package.files_to_distribute:
        incoming_path = os.path.join(fixture.incoming_directory.name,
                                     os.path.basename(distributed_path))
        assert filecmp.cmp(distributed_path, incoming_path)

    # Second upload of the same package: expected to be rejected.
    assert fixture.debian_repository.is_uploaded(fixture.package)
    assert fixture.package.is_built

    with expected(AlreadyUploadedException):
        fixture.debian_repository.upload(fixture.package, [])
def test_create_less_simple_xls(self):
    """Extend the simple workbook with 50 rows and compare to the reference.

    Rows are written starting at row index 3, saved as
    ``less_simple.xls`` in the test output directory, and compared
    byte-for-byte (``shallow=False``) with the file in the test directory.
    """
    wb, ws = self.create_simple_xls()

    # NOTE(review): the second literal contains '?' where non-ASCII letters
    # may have been lost in an encoding step; it is left untouched because
    # the reference workbook may have been produced from this exact text.
    more_content = [
        [
            u'A{0}'.format(i),
            u'Za?ó?? g??l? ja?? {0} {1}'.format(i, LOREM_IPSUM),
        ]
        for i in range(1000, 1050)
    ]

    for r_idx, content_row in enumerate(more_content, 3):
        for c_idx, cell in enumerate(content_row):
            ws.write(r_idx, c_idx, cell)

    output_path = in_tst_output_dir('less_simple.xls')
    wb.save(output_path)
    self.assertTrue(filecmp.cmp(in_tst_dir('less_simple.xls'),
                                output_path,
                                shallow=False))
def test_create_less_simple_xls(self):
    """Write 50 extra rows into the simple workbook and check the saved file.

    The saved ``less_simple.xls`` in the output directory must be
    content-identical (``shallow=False``) to the reference file of the
    same name in the test directory.
    """
    first_row = 3
    wb, ws = self.create_simple_xls()

    # NOTE(review): the '?' characters in the second literal may be the
    # residue of an encoding loss; kept as-is since the reference workbook
    # may depend on this exact text.
    more_content = [
        [
            u'A{0}'.format(number),
            u'Za?ó?? g??l? ja?? {0} {1}'.format(number, LOREM_IPSUM),
        ]
        for number in range(1000, 1050)
    ]

    for r_idx, content_row in enumerate(more_content, first_row):
        for c_idx, cell in enumerate(content_row):
            ws.write(r_idx, c_idx, cell)

    saved_path = in_tst_output_dir('less_simple.xls')
    wb.save(saved_path)

    reference_path = in_tst_dir('less_simple.xls')
    self.assertTrue(filecmp.cmp(reference_path, saved_path, shallow=False))
def compare(fname1, fname2, binary_mode=False):
    """Return True when two files are considered equal.

    :param fname1: path of the first file.
    :param fname2: path of the second file.
    :param binary_mode: when true, delegate to ``cmp(..., shallow=False)``
        (the module-level ``cmp`` name, which takes a ``shallow`` keyword
        like ``filecmp.cmp``). Otherwise the files are read as text and
        compared line by line, ignoring leading and trailing whitespace
        on each line.
    :return: True if the files match under the selected mode, else False.
    """
    if binary_mode:
        return cmp(fname1, fname2, shallow=False)

    # The context manager closes both handles on every return path.
    with open(fname1, 'r') as f1, open(fname2, 'r') as f2:
        while True:
            line1 = f1.readline()
            line2 = f2.readline()
            # readline() returns '' only at end of file (a blank line is
            # '\n'). Equal only if both files ended at the same time.
            if not line1 or not line2:
                return not line1 and not line2
            # if given line doesn't match then files are not equal
            if line1.strip() != line2.strip():
                return False
def test_open_foam_write_comparison(self):
    """Write modified mesh points and compare with the stored reference.

    Parses ``tests/test_datasets/test_openFOAM``, overwrites nine points,
    writes the result with ``OpenFoamHandler.write`` and checks that the
    output file matches ``test_openFOAM_out_true``.
    """
    open_foam_handler = ofh.OpenFoamHandler()
    mesh_points = open_foam_handler.parse(
        'tests/test_datasets/test_openFOAM'
    )
    mesh_points[0] = [-14., 1.55, 0.2]
    mesh_points[1] = [-14.3, 2.55, 0.3]
    mesh_points[2] = [-14.3, 2.55, 0.3]
    mesh_points[2000] = [7.8, -42.8, .0]
    mesh_points[2001] = [8.8, -41.8, .1]
    mesh_points[2002] = [9.8, -40.8, .0]
    mesh_points[-3] = [236.3, 183.7, 0.06]
    mesh_points[-2] = [237.3, 183.7, 0.06]
    mesh_points[-1] = [236.3, 185.7, 0.06]

    outfilename = 'tests/test_datasets/test_openFOAM_out'
    outfilename_expected = 'tests/test_datasets/test_openFOAM_out_true'

    open_foam_handler.write(mesh_points, outfilename)
    # Register the cleanup before asserting: a failed assertion raises, so
    # a cleanup registered afterwards would leave the output file behind.
    self.addCleanup(os.remove, outfilename)
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_write_parameters_filename_default(self):
    """Write RBF parameters to ``test.prm`` and compare with the reference.

    Sets basis, radius, power and two identical 8x3 control-point arrays
    on an ``RBFParameters`` instance, writes them out and checks the file
    against ``tests/test_datasets/parameters_rbf_default.prm``.
    """
    params = rbfp.RBFParameters()
    params.basis = 'gaussian_spline'
    params.radius = 0.5
    params.n_control_points = 8
    params.power = 2
    params.original_control_points = np.array([
        0., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0.,
        0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1., 1.,
    ]).reshape((8, 3))
    params.deformed_control_points = np.array([
        0., 0., 0., 0., 0., 1., 0., 1., 0., 1., 0., 0.,
        0., 1., 1., 1., 0., 1., 1., 1., 0., 1., 1., 1.,
    ]).reshape((8, 3))

    outfilename = 'test.prm'
    outfilename_expected = 'tests/test_datasets/parameters_rbf_default.prm'

    params.write_parameters(outfilename)
    # Registered before the assertion so the file is removed even when the
    # comparison fails.
    self.addCleanup(os.remove, outfilename)
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_unv_write_comparison_2(self):
    """Write modified UNV mesh points and compare with the reference file.

    Parses ``test_square.unv``, changes one coordinate of six points,
    writes the result with ``UnvHandler.write`` and checks that the output
    matches ``test_square_out_true.unv``.
    """
    unv_handler = uh.UnvHandler()
    mesh_points = unv_handler.parse('tests/test_datasets/test_square.unv')
    mesh_points[0][0] = 2.2
    mesh_points[5][1] = 4.3
    mesh_points[9][2] = 0.5
    mesh_points[45][0] = 7.2
    mesh_points[132][1] = -1.2
    mesh_points[255][2] = -3.6

    outfilename = 'tests/test_datasets/test_square_out.unv'
    outfilename_expected = 'tests/test_datasets/test_square_out_true.unv'

    unv_handler.write(mesh_points, outfilename)
    # Register the cleanup before asserting: a failed assertion raises, so
    # a cleanup registered afterwards would leave the output file behind.
    self.addCleanup(os.remove, outfilename)
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_stl_write_comparison(self):
    """Write modified STL mesh points and compare with the reference file.

    Parses ``test_sphere.stl``, overwrites nine points, writes the result
    with ``StlHandler.write`` and checks that the output matches
    ``test_sphere_out_true.stl``.
    """
    stl_handler = sh.StlHandler()
    mesh_points = stl_handler.parse('tests/test_datasets/test_sphere.stl')
    mesh_points[0] = [-40.2, -20.5, 60.9]
    mesh_points[1] = [-40.2, -10.5, 60.9]
    mesh_points[2] = [-40.2, -10.5, 60.9]
    mesh_points[500] = [-40.2, -20.5, 60.9]
    mesh_points[501] = [-40.2, -10.5, 60.9]
    mesh_points[502] = [-40.2, -10.5, 60.9]
    mesh_points[1000] = [-40.2, -20.5, 60.9]
    mesh_points[1001] = [-40.2, -10.5, 60.9]
    mesh_points[1002] = [-40.2, -10.5, 60.9]

    outfilename = 'tests/test_datasets/test_sphere_out.stl'
    outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'

    stl_handler.write(mesh_points, outfilename)
    # Register the cleanup before asserting: a failed assertion raises, so
    # a cleanup registered afterwards would leave the output file behind.
    self.addCleanup(os.remove, outfilename)
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_stl_write_ascii_from_binary(self):
    """Parse ``test_sphere_bin.stl`` and write it with ``write_bin=False``.

    After overwriting nine points, the file written by
    ``StlHandler.write(..., write_bin=False)`` must match
    ``test_sphere_out_true.stl``.
    """
    stl_handler = sh.StlHandler()
    mesh_points = stl_handler.parse(
        'tests/test_datasets/test_sphere_bin.stl'
    )
    mesh_points[0] = [-40.2, -20.5, 60.9]
    mesh_points[1] = [-40.2, -10.5, 60.9]
    mesh_points[2] = [-40.2, -10.5, 60.9]
    mesh_points[500] = [-40.2, -20.5, 60.9]
    mesh_points[501] = [-40.2, -10.5, 60.9]
    mesh_points[502] = [-40.2, -10.5, 60.9]
    mesh_points[1000] = [-40.2, -20.5, 60.9]
    mesh_points[1001] = [-40.2, -10.5, 60.9]
    mesh_points[1002] = [-40.2, -10.5, 60.9]

    outfilename = 'tests/test_datasets/test_sphere_out.stl'
    outfilename_expected = 'tests/test_datasets/test_sphere_out_true.stl'

    stl_handler.write(mesh_points, outfilename, write_bin=False)
    # Register the cleanup before asserting: a failed assertion raises, so
    # a cleanup registered afterwards would leave the output file behind.
    self.addCleanup(os.remove, outfilename)
    self.assertTrue(filecmp.cmp(outfilename, outfilename_expected))
def test_get_file():
    """
    Tests retrieving file from localfile storage.

    Stores ``cave.png`` through ``media.set_file``, fetches it back with
    ``media.get_file`` and checks that the retrieved file compares equal
    to the original resource.
    """
    resource_path = 'examples/telco/resources/cave.png'
    setup()
    # open() instead of file(): the file() builtin exists only on Python 2,
    # while open() behaves the same here on both Python 2 and 3.
    with open(resource_path, 'rb') as infile:
        stored_filename = media.set_file(infile)
    assert stored_filename == 'cave.png'
    retrieved_file = media.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name, resource_path)
def test_get_file():
    """
    Tests retrieving file from localfile storage.

    Stores ``cave.png`` through ``localfile.set_file``, fetches it back
    with ``localfile.get_file`` and checks that the retrieved file compares
    equal to the original resource.
    """
    resource_path = 'examples/telco/resources/cave.png'
    setup()
    # open() instead of file(): the file() builtin exists only on Python 2,
    # while open() behaves the same here on both Python 2 and 3.
    with open(resource_path, 'rb') as infile:
        stored_filename = localfile.set_file(infile)
    assert stored_filename == 'cave.png'
    retrieved_file = localfile.get_file('cave.png')
    assert filecmp.cmp(retrieved_file.name, resource_path)
def test_simple_local_copy(tmpdir):
    """Copy a file between two ``file://`` resources and compare the result.

    ``alpha`` is created in ``tmpdir``; after ``copy`` both ``alpha`` and
    ``beta`` must exist and compare equal.
    """
    source = tmpdir.join('alpha')
    target = tmpdir.join('beta')
    source.write('some data')

    # Precondition: only the source exists.
    assert source.check()
    assert not target.check()

    source_resource = Resource('file://' + source.strpath)
    target_resource = Resource('file://' + target.strpath)
    copy(source_resource, target_resource)

    # Postcondition: both exist and hold the same content.
    assert source.check()
    assert target.check()
    assert filecmp.cmp(source.strpath, target.strpath)
def test_simple_local_copy_with_callback(tmpdir):
    """Copy with a progress callback and check how often it is invoked.

    The copy is performed twice, with chunk sizes 1 and 2 and
    ``callback_freq=1``; each time the number of callback invocations must
    equal ``estimate_nb_cycles(len(data), chunk_size)``.
    """
    calls = 0

    def wrapper(size):
        # Callback handed to copy(); only counts how many times it runs.
        nonlocal calls
        calls += 1

    def copy_and_verify(chunk_size):
        # One full pass: reset the counter, copy, then check files and count.
        nonlocal calls
        calls = 0
        assert src.check()
        assert not dst.check()
        copy(Resource('file://' + src.strpath),
             Resource('file://' + dst.strpath),
             size=chunk_size,
             callback_freq=1,
             callback=wrapper)
        assert src.check()
        assert dst.check()
        assert filecmp.cmp(src.strpath, dst.strpath)
        assert calls == estimate_nb_cycles(len(data), chunk_size)

    src = tmpdir.join('alpha')
    dst = tmpdir.join('beta')
    data = b'some data'
    src.write(data)

    copy_and_verify(1)
    # The destination must be gone again before the second pass.
    dst.remove()
    copy_and_verify(2)
def build():
"""Build and apply a user configuration."""
# NOTE(review): this listing has lost its indentation, so which of the
# statements below are nested under `if not diff:` cannot be determined
# from here - confirm against the original file.
# Environment object returned by _get_vars for "/etc/environment"; it is
# validated with full=True before anything else happens.
env = _get_vars("/etc/environment")
env.validate(full=True)
# Change into env.net_config and run compose(env) there (presumably this
# produces FILE_NAME in that directory - TODO confirm).
os.chdir(env.net_config)
compose(env)
new_config = os.path.join(env.net_config, FILE_NAME)
run_config = os.path.join(env.freeradius_repo, PYTHON_MODS, FILE_NAME)
# filecmp.cmp returns True when the files compare EQUAL, so despite its
# name `diff` is truthy for identical files and `not diff` means "changed".
# With the default shallow=True, files whose os.stat signatures match are
# taken to be equal without their contents being read.
diff = filecmp.cmp(new_config, run_config)
if not diff:
print('change detected')
# Keep a ".prev" copy of the current file, then overwrite it with the
# newly composed one.
shutil.copyfile(run_config, run_config + ".prev")
shutil.copyfile(new_config, run_config)
# Give the installed file the uid/gid of the "radiusd" account.
u = pwd.getpwnam("radiusd")
os.chown(run_config, u.pw_uid, u.pw_gid)
update_wiki(env, run_config)
# FILE_NAME is passed as a bare name, i.e. relative to the current
# directory set by os.chdir above.
hashed = get_file_hash(FILE_NAME)
# Default label; replaced by the stripped contents of the file at
# env.working_dir + "git" when that path exists. Plain string
# concatenation: no path separator is inserted between the two parts.
git = "latest commit"
git_indicator = env.working_dir + "git"
if os.path.exists(git_indicator):
with open(git_indicator, 'r') as f:
git = f.read().strip()
status = "ready"
# Message has the form "<status> -> <git> (<hashed>)".
_smirc("{} -> {} ({})".format(status, git, hashed))
_feed(env, "radius configuration updated")
daily_report(env, run_config)
def test_main(self):
    """Run ``main`` as ``snapshot HTML_FILE`` and compare the produced image.

    The fake popen's stdout is fed with ``get_base64_image()``; afterwards
    ``output.png`` must compare equal to the ``sample.png`` fixture.
    """
    self.fake_popen.return_value.stdout = BytesIO(get_base64_image())

    # Patch sys.argv only for the duration of the main() call.
    with patch.object(sys, 'argv', ['snapshot', HTML_FILE]):
        main()

    expected_png = get_fixture('sample.png')
    assert filecmp.cmp('output.png', expected_png)