def test_zip(n, n2, kwargs, expected):
source = Stream()
source2 = Stream()
L = es.zip(source, source2, **kwargs).sink_to_list()
s = list(to_event_model(
[np.random.random((10, 10)) for _ in range(n)],
output_info=[('pe1_image', {'dtype': 'array'})]
))
s2 = list(to_event_model(
[np.random.random((10, 10)) for _ in range(n2)],
output_info=[('pe1_image', {'dtype': 'array'})]
))
for _ in range(2):
L.clear()
for b in s2:
source2.emit(b)
for a in s:
source.emit(a)
assert_docs = set()
for name, (l1, l2) in L:
assert_docs.add(name)
assert_raises(AssertionError, assert_equal, l1, l2)
assert set(assert_docs) == {'start', 'descriptor', 'event', 'stop'}
python类assert_equal()的实例源码
def test_fit_dki():
with nbtmp.InTemporaryDirectory() as tmpdir:
fbval = op.join(tmpdir, 'dki.bval')
fbvec = op.join(tmpdir, 'dki.bvec')
fdata = op.join(tmpdir, 'dki.nii.gz')
make_dki_data(fbval, fbvec, fdata)
cmd = ["pyAFQ_dki", "-d", fdata, "-l", fbval, "-c", fbvec,
"-o", tmpdir]
out = runner.run_command(cmd)
npt.assert_equal(out[0], 0)
# Get expected values
names = ['FA', 'MD', 'AD', 'RD', 'MK', 'AK', 'RK']
for n in names:
fname = op.join(tmpdir, "dki_%s.nii.gz" % n)
img = nib.load(fdata)
affine = img.get_affine()
shape = img.shape[:-1]
assert_image_shape_affine(fname, shape, affine)
def test_predict_dki():
with nbtmp.InTemporaryDirectory() as tmpdir:
fbval = op.join(tmpdir, 'dki.bval')
fbvec = op.join(tmpdir, 'dki.bvec')
fdata = op.join(tmpdir, 'dki.nii.gz')
make_dki_data(fbval, fbvec, fdata)
cmd1 = ["pyAFQ_dki", "-d", fdata, "-l", fbval, "-c", fbvec,
"-o", tmpdir]
out = runner.run_command(cmd1)
npt.assert_equal(out[0], 0)
# Get expected values
fparams = op.join(tmpdir, "dki_params.nii.gz")
cmd2 = ["pyAFQ_dki_predict", "-p", fparams, "-l", fbval, "-c", fbvec,
"-o", tmpdir, '-b', '0']
out = runner.run_command(cmd2)
npt.assert_equal(out[0], 0)
pred = nib.load(op.join(tmpdir, "dki_prediction.nii.gz")).get_data()
data = nib.load(op.join(tmpdir, "dki.nii.gz")).get_data()
npt.assert_array_almost_equal(pred, data)
def test_fit_dti():
with nbtmp.InTemporaryDirectory() as tmpdir:
fbval = op.join(tmpdir, 'dti.bval')
fbvec = op.join(tmpdir, 'dti.bvec')
fdata = op.join(tmpdir, 'dti.nii.gz')
make_dti_data(fbval, fbvec, fdata)
cmd = ["pyAFQ_dti", "-d", fdata, "-l", fbval, "-c", fbvec,
"-o", tmpdir, '-b', '0']
out = runner.run_command(cmd)
npt.assert_equal(out[0], 0)
# Get expected values
names = ['FA', 'MD', 'AD', 'RD']
for n in names:
fname = op.join(tmpdir, "dti_%s.nii.gz" % n)
img = nib.load(fdata)
affine = img.get_affine()
shape = img.shape[:-1]
assert_image_shape_affine(fname, shape, affine)
def test_predict_dti():
with nbtmp.InTemporaryDirectory() as tmpdir:
fbval = op.join(tmpdir, 'dti.bval')
fbvec = op.join(tmpdir, 'dti.bvec')
fdata = op.join(tmpdir, 'dti.nii.gz')
make_dti_data(fbval, fbvec, fdata)
cmd1 = ["pyAFQ_dti", "-d", fdata, "-l", fbval, "-c", fbvec,
"-o", tmpdir]
out = runner.run_command(cmd1)
npt.assert_equal(out[0], 0)
# Get expected values
fparams = op.join(tmpdir, "dti_params.nii.gz")
cmd2 = ["pyAFQ_dti_predict", "-p", fparams, "-l", fbval, "-c", fbvec,
"-o", tmpdir, '-b', '0']
out = runner.run_command(cmd2)
npt.assert_equal(out[0], 0)
pred = nib.load(op.join(tmpdir, "dti_prediction.nii.gz")).get_data()
data = nib.load(op.join(tmpdir, "dti.nii.gz")).get_data()
npt.assert_array_almost_equal(pred, data)
def test_fit_csd():
fdata, fbval, fbvec = dpd.get_data('small_64D')
with nbtmp.InTemporaryDirectory() as tmpdir:
# Convert from npy to txt:
bvals = np.load(fbval)
bvecs = np.load(fbvec)
np.savetxt(op.join(tmpdir, 'bvals.txt'), bvals)
np.savetxt(op.join(tmpdir, 'bvecs.txt'), bvecs)
for sh_order in [4, 6]:
fname = csd.fit_csd(fdata, op.join(tmpdir, 'bvals.txt'),
op.join(tmpdir, 'bvecs.txt'),
out_dir=tmpdir, sh_order=sh_order)
npt.assert_(op.exists(fname))
sh_coeffs_img = nib.load(fname)
npt.assert_equal(sh_order,
calculate_max_order(sh_coeffs_img.shape[-1]))
def test_read_write_trk():
sl = [np.array([[0, 0, 0], [0, 0, 0.5], [0, 0, 1], [0, 0, 1.5]]),
np.array([[0, 0, 0], [0, 0.5, 0.5], [0, 1, 1]])]
with nbtmp.InTemporaryDirectory() as tmpdir:
fname = op.join(tmpdir, 'sl.trk')
aus.write_trk(fname, sl)
new_sl = aus.read_trk(fname)
npt.assert_equal(list(new_sl), sl)
# What happens if this set of streamlines has some funky affine
# associated with it?
aff = np.eye(4) * np.random.rand()
aff[:3, 3] = np.array([1, 2, 3])
aff[3, 3] = 1
# We move the streamlines, and report the inverse of the affine:
aus.write_trk(fname, move_streamlines(sl, aff),
affine=np.linalg.inv(aff))
# When we read this, we get back what we put in:
new_sl = aus.read_trk(fname)
# Compare each streamline:
for new, old in zip(new_sl, sl):
npt.assert_almost_equal(new, old, decimal=4)
def test_parfor():
my_array = np.arange(100).reshape(10, 10)
i, j = np.random.randint(0, 9, 2)
my_list = list(my_array.ravel())
for engine in ["joblib", "dask", "serial"]:
for backend in ["threading", "multiprocessing"]:
npt.assert_equal(para.parfor(power_it,
my_list,
engine=engine,
backend=backend,
out_shape=my_array.shape)[i, j],
power_it(my_array[i, j]))
# If it's not reshaped, the first item should be the item 0, 0:
npt.assert_equal(para.parfor(power_it,
my_list,
engine=engine,
backend=backend)[0],
power_it(my_array[0, 0]))
def test_safe_binop():
# Test checked arithmetic routines
ops = [
(operator.add, 1),
(operator.sub, 2),
(operator.mul, 3)
]
with exc_iter(ops, INT64_VALUES, INT64_VALUES) as it:
for xop, a, b in it:
pyop, op = xop
c = pyop(a, b)
if not (INT64_MIN <= c <= INT64_MAX):
assert_raises(OverflowError, mt.extint_safe_binop, a, b, op)
else:
d = mt.extint_safe_binop(a, b, op)
if c != d:
# assert_equal is slow
assert_equal(d, c)
def test_divmod_128_64():
with exc_iter(INT128_VALUES, INT64_POS_VALUES) as it:
for a, b in it:
if a >= 0:
c, cr = divmod(a, b)
else:
c, cr = divmod(-a, b)
c = -c
cr = -cr
d, dr = mt.extint_divmod_128_64(a, b)
if c != d or d != dr or b*d + dr != a:
assert_equal(d, c)
assert_equal(dr, cr)
assert_equal(b*d + dr, a)
def check_may_share_memory_exact(a, b):
got = np.may_share_memory(a, b, max_work=MAY_SHARE_EXACT)
assert_equal(np.may_share_memory(a, b),
np.may_share_memory(a, b, max_work=MAY_SHARE_BOUNDS))
a.fill(0)
b.fill(0)
a.fill(1)
exact = b.any()
err_msg = ""
if got != exact:
err_msg = " " + "\n ".join([
"base_a - base_b = %r" % (a.__array_interface__['data'][0] - b.__array_interface__['data'][0],),
"shape_a = %r" % (a.shape,),
"shape_b = %r" % (b.shape,),
"strides_a = %r" % (a.strides,),
"strides_b = %r" % (b.strides,),
"size_a = %r" % (a.size,),
"size_b = %r" % (b.size,)
])
assert_equal(got, exact, err_msg=err_msg)
def test_to_128():
with exc_iter(INT64_VALUES) as it:
for a, in it:
b = mt.extint_to_128(a)
if a != b:
assert_equal(b, a)
def test_to_64():
with exc_iter(INT128_VALUES) as it:
for a, in it:
if not (INT64_MIN <= a <= INT64_MAX):
assert_raises(OverflowError, mt.extint_to_64, a)
else:
b = mt.extint_to_64(a)
if a != b:
assert_equal(b, a)
def test_mul_64_64():
with exc_iter(INT64_VALUES, INT64_VALUES) as it:
for a, b in it:
c = a * b
d = mt.extint_mul_64_64(a, b)
if c != d:
assert_equal(d, c)
def test_add_128():
with exc_iter(INT128_VALUES, INT128_VALUES) as it:
for a, b in it:
c = a + b
if not (INT128_MIN <= c <= INT128_MAX):
assert_raises(OverflowError, mt.extint_add_128, a, b)
else:
d = mt.extint_add_128(a, b)
if c != d:
assert_equal(d, c)
def test_neg_128():
with exc_iter(INT128_VALUES) as it:
for a, in it:
b = -a
c = mt.extint_neg_128(a)
if b != c:
assert_equal(c, b)
def test_shl_128():
with exc_iter(INT128_VALUES) as it:
for a, in it:
if a < 0:
b = -(((-a) << 1) & (2**128-1))
else:
b = (a << 1) & (2**128-1)
c = mt.extint_shl_128(a)
if b != c:
assert_equal(c, b)
def test_shr_128():
with exc_iter(INT128_VALUES) as it:
for a, in it:
if a < 0:
b = -((-a) >> 1)
else:
b = a >> 1
c = mt.extint_shr_128(a)
if b != c:
assert_equal(c, b)
def test_gt_128():
with exc_iter(INT128_VALUES, INT128_VALUES) as it:
for a, b in it:
c = a > b
d = mt.extint_gt_128(a, b)
if c != d:
assert_equal(d, c)
def test_ceildiv_128_64():
with exc_iter(INT128_VALUES, INT64_POS_VALUES) as it:
for a, b in it:
c = (a + b - 1) // b
d = mt.extint_ceildiv_128_64(a, b)
if c != d:
assert_equal(d, c)