def test_batch(backend, M, N, K, density, alpha, beta, batch):
    """Compare a batched sparse matrix with a host-side reference product.

    Both the operator and its ``.H`` counterpart are evaluated on the backend
    and checked against ``beta * y + alpha * op * x`` computed from the host
    matrix.
    """
    dev = backend()
    host_mat = indigo.util.randM(M, N, density)
    dev_mat = dev.SpMatrix(host_mat, batch=batch)

    # Forward direction: input has N rows, output has M rows.
    src = dev.rand_array((N, K))
    dst = dev.rand_array((M, K))
    expected = beta * dst.to_host() + alpha * host_mat * src.to_host()
    dev_mat.eval(dst, src, alpha=alpha, beta=beta)
    npt.assert_allclose(dst.to_host(), expected, rtol=1e-5)

    # Adjoint direction: the roles of M and N are swapped.
    src = dev.rand_array((M, K))
    dst = dev.rand_array((N, K))
    expected = beta * dst.to_host() + alpha * host_mat.H * src.to_host()
    dev_mat.H.eval(dst, src, alpha=alpha, beta=beta)
    npt.assert_allclose(dst.to_host(), expected, rtol=1e-5)
# Example source code using Python's assert_allclose()
def test_One(backend, M, N, K, alpha, beta, forward):
    """Check the backend's ``One`` operator against a NumPy reference.

    The expected result is ``beta * v`` plus ``alpha`` times the sum of ``u``
    over axis 0, broadcast to the shape of ``v``.  ``(u, v)`` is ``(x, y)``
    when ``forward`` is true and ``(y, x)`` otherwise.  The test is skipped
    when the backend has no concrete ``onemm`` implementation.
    """
    x = indigo.util.rand64c(K, N)
    y = indigo.util.rand64c(M, N)
    B = backend()

    # Test for the attribute *before* inspecting it: probing ``B.onemm``
    # first would raise AttributeError on a backend without ``onemm`` and
    # turn the intended skip into an error.
    if not hasattr(B, 'onemm'):
        pytest.skip("backend doesn't implement onemm")
    if getattr(B.onemm, '__isabstractmethod__', False):
        pytest.skip("backend <%s> doesn't implement onemm" % backend.__name__)

    O = B.One((M, K), dtype=np.complex64)

    # u is the operand being summed, v is the array updated in place.
    if forward:
        u, v = x, y
    else:
        v, u = x, y
    u_d = B.copy_array(u)
    v_d = B.copy_array(v)

    exp = beta * v + \
        np.broadcast_to(alpha * u.sum(axis=0, keepdims=True), v.shape)
    O.eval(v_d, u_d, alpha=alpha, beta=beta, forward=forward)
    act = v_d.to_host()
    np.testing.assert_allclose(act, exp, rtol=1e-5)
def test_dl_dG():
    """Compare ``dG`` from ``get_grads`` with a numerical gradient.

    The numerical gradient is taken with ``nd.Gradient`` of the loss
    ``0.5 * sum((zhat - truez)**2)`` with respect to the flattened ``G``.
    """
    nz, neq, nineq = 10, 0, 3
    problem, grads = get_grads(nz=nz, neq=neq, nineq=nineq)
    p, Q, G, h, A, b, truez = problem
    dQ, dp, dG, dh, dA, db = grads

    def loss(G_flat):
        # The differentiator works on a flat vector; restore the 2-D shape.
        G_mat = G_flat.reshape(nineq, nz)
        _, zhat, _, _, _ = qp_cvxpy.forward_single_np(Q, p, G_mat, h, A, b)
        residual = zhat - truez
        return 0.5 * np.sum(np.square(residual))

    dG_fd = nd.Gradient(loss)(G.ravel()).reshape(nineq, nz)
    if verbose:
        print('dG_fd: ', dG_fd)
        print('dG: ', dG)
    npt.assert_allclose(dG_fd, dG, rtol=RTOL, atol=ATOL)
def test_dl_dA():
    """Compare ``dA`` from ``get_grads`` with a numerical gradient.

    The numerical gradient is taken with ``nd.Gradient`` of the loss
    ``0.5 * sum((zhat - truez)**2)`` with respect to the flattened ``A``.
    """
    nz, neq, nineq = 10, 3, 1
    problem, grads = get_grads(
        nz=nz, neq=neq, nineq=nineq, Qscale=100., Gscale=100., Ascale=100.)
    p, Q, G, h, A, b, truez = problem
    dQ, dp, dG, dh, dA, db = grads

    def loss(A_flat):
        # The differentiator works on a flat vector; restore the 2-D shape.
        A_mat = A_flat.reshape(neq, nz)
        _, zhat, _, _, _ = qp_cvxpy.forward_single_np(Q, p, G, h, A_mat, b)
        residual = zhat - truez
        return 0.5 * np.sum(np.square(residual))

    dA_fd = nd.Gradient(loss)(A.ravel()).reshape(neq, nz)
    if verbose:
        print('dA_fd: ', dA_fd)
        print('dA: ', dA)
    npt.assert_allclose(dA_fd, dA, rtol=RTOL, atol=ATOL)
def make_func(varname, wrfnc, timeidx, method, squeeze, meta):
    """Build a test method comparing a direct routine call with ``getvar``.

    The returned function looks up the routine for ``varname`` in
    ``ROUTINE_MAP``, calls it with the arguments from ``get_args``, and
    checks the result against ``getvar``.  When ``meta`` is truthy the
    ``dims`` of both results must also match.
    """
    def func(self):
        try:
            call_args = get_args(varname, wrfnc, timeidx, method, squeeze)
        except ProjectionError:
            # A projection failure is tolerated: the check is simply skipped.
            return

        computed = ROUTINE_MAP[varname](*call_args, meta=meta)
        reference = getvar(wrfnc, varname, timeidx, method, squeeze,
                           cache=None, meta=meta)

        nt.assert_allclose(to_np(computed), to_np(reference))
        if meta:
            self.assertEqual(computed.dims, reference.dims)

    return func
def test_RectangularFinExchanger():
    """Check stored and derived attributes of ``RectangularFinExchanger``."""
    exchanger = RectangularFinExchanger(0.03, 0.001, 0.012)
    basic_expectations = (
        # constructor inputs echoed back
        ('fin_height', 0.03),
        ('fin_thickness', 0.001),
        ('fin_spacing', 0.012),
        # calculated values
        ('channel_height', 0.029),
        ('blockage_ratio', 0.8861111111111111),
        ('fin_count', 83.33333333333333),
        ('Dh', 0.01595),
        ('channel_width', 0.011),
    )
    for attr, expected in basic_expectations:
        assert_allclose(getattr(exchanger, attr), expected)

    # with layers, plate thickness, width, and length (fully defined)
    exchanger = RectangularFinExchanger(
        0.03, 0.001, 0.012,
        length=1.2, width=2.401, plate_thickness=.005, layers=40)
    full_expectations = (
        ('A_HX_layer', 19.2),
        ('layer_fin_count', 200),
        ('A_HX', 768.0),
        ('height', 1.4 + .005),
        ('volume', 4.048085999999999),
        ('A_specific_HX', 189.71928956054794),
    )
    for attr, expected in full_expectations:
        assert_allclose(getattr(exchanger, attr), expected)
def test_RectangularOffsetStripFinExchanger():
    """Check stored and derived attributes of the offset-strip fin exchanger."""
    exchanger = RectangularOffsetStripFinExchanger(
        fin_length=.05, fin_height=.01, fin_thickness=.003, fin_spacing=.05)
    expectations = (
        ('fin_length', 0.05),
        ('fin_height', 0.01),
        ('fin_thickness', 0.003),
        ('fin_spacing', 0.05),
        ('blockage_ratio', 0.348),
        ('blockage_ratio_Kim', 0.34199999999999997),
        ('alpha', 5),
        ('delta', 0.06),
        ('gamma', 0.06),
        ('A_channel', 0.000329),
        # Disabled check: ('SA_fin', 0.005574)
        ('Dh', 0.011804808037316112),
        ('Dh_Kays_London', 0.012185185185185186),
        ('Dh_Joshi_Webb', 0.011319367879456085),
    )
    for attr, expected in expectations:
        assert_allclose(getattr(exchanger, attr), expected)

    # With layers, plate thickness, width (fully defined) -- currently disabled:
    # ROSFE = RectangularOffsetStripFinExchanger(fin_length=.05, fin_height=.01, fin_thickness=.003, fin_spacing=.05, length=1.2, width=2.401, plate_thickness=.005, layers=40)
    # assert_allclose(ROSFE.A_HX_layer, 0.267552)
def test_create_Toeplitz():
    """Check ``_create_Toeplitz`` against ``np.convolve(..., 'same')``.

    For an even-length and an odd-length pair ``(d1, d2)``, multiplying the
    matrix built from ``d2`` by ``d1`` must match the 'same'-mode convolution
    of the two vectors.
    """
    from stfinv.utils.inversion import _create_Toeplitz

    cases = (
        # even length
        (np.array([1., 0., 0., 0., 1., 2., 1., 0., 0., 1]),
         np.array([0., 0., 1., 3., 2., 1., 0., 0., 0., 0])),
        # odd length
        (np.array([1., 0., 0., 0., 1., 2., 1., 0., 0.]),
         np.array([0., 0., 1., 3., 2., 1., 0., 0., 0.])),
    )
    for signal, kernel in cases:
        toeplitz = _create_Toeplitz(kernel)
        via_matrix = np.matmul(toeplitz, signal)
        via_convolve = np.convolve(signal, kernel, 'same')
        npt.assert_allclose(via_matrix, via_convolve, atol=1e-7, rtol=1e-7)
def test_invert_STF():
    """Check that ``invert_STF`` recovers a known source time function.

    Two synthetic traces are convolved ('same' mode) with a reference STF to
    produce the data traces; the STF returned by ``invert_STF`` is then
    compared with the reference after swapping its two halves.
    """
    from stfinv.utils.inversion import invert_STF

    tr = obspy.Trace(data=np.array([0., 0., 0., 0., 1., 2., 1., 0., 0.]))
    st_synth = obspy.Stream(tr)
    tr = obspy.Trace(data=np.array([0., 0., 1., 3., 2., 1., 0., 0., 0.]))
    st_synth.append(tr)

    stf_ref = np.array([0., 0., 1., 1., 0., 1., 1., 0., 0.])

    tr = obspy.Trace(data=np.convolve(st_synth[0].data, stf_ref, 'same'))
    st_data = obspy.Stream(tr)
    tr = obspy.Trace(data=np.convolve(st_synth[1].data, stf_ref, 'same'))
    st_data.append(tr)

    stf = invert_STF(st_data, st_synth)

    # Floor division keeps the split index an int; with true division the
    # index is a float under Python 3 and the slices below raise TypeError.
    halflen = (len(stf) + 1) // 2
    # Swap the two halves before comparing.
    # NOTE(review): presumably invert_STF returns the STF wrapped around the
    # array ends -- confirm against its implementation.
    stf = np.r_[stf[halflen:], stf[0:halflen]]
    npt.assert_allclose(stf, stf_ref, rtol=1e-7, atol=1e-10)
def test_invert_STF_dampened():
    """Check ``invert_STF`` with ``method='dampened'`` against a known STF.

    Two synthetic traces are convolved ('same' mode) with a reference STF to
    produce the data traces; the inverted STF must match the reference to a
    relative tolerance of 1e-2.
    """
    from stfinv.utils.inversion import invert_STF

    first_synth = obspy.Trace(
        data=np.array([0., 0., 0., 0., 1., 2., 1., 0., 0.]))
    st_synth = obspy.Stream(first_synth)
    second_synth = obspy.Trace(
        data=np.array([0., 0., 1., 3., 2., 1., 0., 0., 0.]))
    st_synth.append(second_synth)

    stf_ref = np.array([0., 0., 1., 1., 0., 1., 1., 0., 0.])

    # Data traces are the synthetics convolved with the reference STF.
    first_data = obspy.Trace(
        data=np.convolve(st_synth[0].data, stf_ref, 'same'))
    st_data = obspy.Stream(first_data)
    second_data = obspy.Trace(
        data=np.convolve(st_synth[1].data, stf_ref, 'same'))
    st_data.append(second_data)

    recovered = invert_STF(st_data, st_synth, method='dampened', eps=1e-4)
    npt.assert_allclose(recovered, stf_ref, rtol=1e-2, atol=1e-10)
def test_stats2():
    """Test stats2 func from fluxpart.util"""

    def load_records():
        # Build a fresh structured array for each stats2 call.
        text = "7 8 4\n6 1 3\n10 6 6\n6 7 3\n8 2 4"
        fields = [('v0', int), ('v1', int), ('v2', int)]
        return np.genfromtxt(io.BytesIO(text.encode()), dtype=fields)

    # All three variables.
    full = stats2(load_records())
    for attr, expected in (
            ('ave_v0', 37 / 5),
            ('ave_v1', 24 / 5),
            ('ave_v2', 4),
            ('var_v0', 14 / 5),
            ('var_v1', 97 / 10),
            ('var_v2', 3 / 2),
            ('cov_v0_v1', 3 / 5),
            ('cov_v0_v2', 2)):
        npt.assert_allclose(getattr(full, attr), expected)
    npt.assert_allclose(full.cov_v1_v0, full.cov_v0_v1)
    npt.assert_allclose(full.cov_v1_v2, 1)
    npt.assert_allclose(full.cov_v2_v0, full.cov_v0_v2)
    npt.assert_allclose(full.cov_v2_v1, full.cov_v1_v2)

    # Restricted to two of the variables via ``names``.
    subset = stats2(load_records(), names=('v0', 'v2'))
    for attr, expected in (
            ('ave_v0', 37 / 5),
            ('ave_v2', 4),
            ('var_v0', 14 / 5),
            ('var_v2', 3 / 2),
            ('cov_v0_v2', 2)):
        npt.assert_allclose(getattr(subset, attr), expected)
    npt.assert_allclose(subset.cov_v2_v0, subset.cov_v0_v2)

    # Nothing involving the excluded variable may be present.
    for excluded in ('ave_v1', 'var_v1', 'cov_v0_v1',
                     'cov_v1_v0', 'cov_v1_v2', 'cov_v2_v1'):
        assert not hasattr(subset, excluded)
def assert_flux_components(calc, desired):
    """Assert that each flux component of ``calc`` is close to ``desired``.

    Every attribute is compared with ``npt.assert_allclose`` using its own
    absolute tolerance (and the default relative tolerance).
    """
    tolerances = (
        ('Fcp', 0.5e-6),
        ('Fcr', 0.1e-6),
        ('Fqe', 0.01e-3),
        ('Fqt', 0.05e-3),
        ('Fcp_mol', 10e-6),
        ('Fcr_mol', 10e-6),
        ('LEe', 10),
        ('LEt', 50),
    )
    for attr, atol in tolerances:
        npt.assert_allclose(
            getattr(calc, attr), getattr(desired, attr), atol=atol)
def assert_partition(rsoln, fluxes, desired):
    """Assert that a partitioning result matches the desired values.

    ``rsoln.validroot`` must be truthy; otherwise the check fails at once.
    The root-solution attributes (``var_cp``, ``corr_cp_cr``) and the flux
    attributes (``wqt``, ``wqe``, ``wcp``, ``wcr``) are then compared with
    ``desired`` using per-attribute absolute tolerances.

    Raises:
        AssertionError: if the root is not valid or any value is not close.
    """
    if not rsoln.validroot:
        # Raise explicitly rather than ``assert False``: a bare assert is
        # removed under ``python -O`` and would let an invalid root pass.
        raise AssertionError("partition root solution is not valid")

    npt.assert_allclose(rsoln.var_cp, desired.var_cp, atol=1e-14)
    npt.assert_allclose(rsoln.corr_cp_cr, desired.corr_cp_cr, atol=1e-2)
    npt.assert_allclose(fluxes.wqt, desired.wqt, atol=1e-7)
    npt.assert_allclose(fluxes.wqe, desired.wqe, atol=1e-7)
    npt.assert_allclose(fluxes.wcp, desired.wcp, atol=1e-9)
    npt.assert_allclose(fluxes.wcr, desired.wcr, atol=1e-10)
def test_scalar(self):
    """Check value and first two derivatives of a single polynomial."""
    # Coefficients describe 1 + 2s + 3s^2.
    interp = PolynomialInterpolator([1, 2, 3])
    assert interp.dof == 1

    values = interp.eval([0, 0.5, 1])
    npt.assert_allclose(values, [1, 2.75, 6])

    first_deriv = interp.evald([0, 0.5, 1])
    npt.assert_allclose(first_deriv, [2, 5, 8])

    second_deriv = interp.evaldd([0, 0.5, 1])
    npt.assert_allclose(second_deriv, [6, 6, 6])
def test_2_dof(self):
    """Check value and first two derivatives of a two-polynomial interpolator."""
    # Row 0: 1 + 2s + 3s^2
    # Row 1: -2 + 3s + 4s^2 + 5s^3
    interp = PolynomialInterpolator([[1, 2, 3], [-2, 3, 4, 5]])
    assert interp.dof == 2

    values = interp.eval([0, 0.5, 1])
    npt.assert_allclose(values, [[1, -2], [2.75, 1.125], [6, 10]])

    first_deriv = interp.evald([0, 0.5, 1])
    npt.assert_allclose(first_deriv, [[2, 3], [5, 10.75], [8, 26]])

    second_deriv = interp.evaldd([0, 0.5, 1])
    npt.assert_allclose(second_deriv, [[6, 8], [6, 23], [6, 38]])
def test_slack_bounds(self, rave_re_torque_data):
    """Test the pc.l and pc.h vectors.

    For every index ``0 .. pc.N`` the lower bound must equal the negated
    DOF torque limits of the robot and the upper bound the limits themselves.
    """
    data, pc = rave_re_torque_data
    # Only the robot is needed; the other three entries are unused here.
    _path, _ss, robot, _J_lc = data
    torque_limit = robot.GetDOFTorqueLimits()

    n_points = pc.N + 1
    for idx in range(n_points):
        npt.assert_allclose(pc.l[idx], -torque_limit)
        npt.assert_allclose(pc.h[idx], torque_limit)
def AR1(constrained=False):
    """Compare the OASIS AR(1) solver with foopsi and with generated data.

    With ``constrained`` true the constrained solver variants are used,
    otherwise the variants taking ``lam=2.4``.  Agreement is measured by the
    correlation coefficient of the first two outputs, which must be close
    to 1.
    """
    def corr(a, b):
        # Pearson correlation of two 1-D sequences.
        return np.corrcoef(a, b)[0, 1]

    g = .95
    sn = .3
    y, c, s = [trace[0] for trace in gen_data([g], sn, N=1)]

    if constrained:
        result = constrained_oasisAR1(y, g, sn)
        result_foopsi = constrained_foopsi(y, [g], sn)
    else:
        result = oasisAR1(y, g, lam=2.4)
        result_foopsi = foopsi(y, [g], lam=2.4)

    # Agreement between the two solvers (default tolerance).
    npt.assert_allclose(corr(result[0], result_foopsi[0]), 1)
    npt.assert_allclose(corr(result[1], result_foopsi[1]), 1)
    # Agreement with the generated c and s traces (looser tolerances).
    npt.assert_allclose(corr(result[0], c), 1, rtol=.03)
    npt.assert_allclose(corr(result[1], s), 1, rtol=.2)
def AR2(constrained=False):
    """Compare the AR(2) solvers with foopsi and with generated data.

    With ``constrained`` true the constrained solver variants are used,
    otherwise the variants taking ``lam=25``.  Both the ``onnls`` and the
    ``oasisAR2`` families are checked; agreement is measured by the
    correlation coefficient of the first two outputs.
    """
    def corr(a, b):
        # Pearson correlation of two 1-D sequences.
        return np.corrcoef(a, b)[0, 1]

    g = [1.7, -.712]
    sn = .3
    y, c, s = [trace[0] for trace in gen_data(g, sn, N=1, seed=3)]

    if constrained:
        result = constrained_onnlsAR2(y, g, sn)
        result_foopsi = constrained_foopsi(y, g, sn)
    else:
        result = onnls(y, g, lam=25)
        result_foopsi = foopsi(y, g, lam=25)

    # Agreement between the two solvers.
    npt.assert_allclose(corr(result[0], result_foopsi[0]), 1, rtol=1e-3)
    npt.assert_allclose(corr(result[1], result_foopsi[1]), 1, rtol=1e-2)
    # Agreement with the generated c and s traces.
    npt.assert_allclose(corr(result[0], c), 1, rtol=.03)
    npt.assert_allclose(corr(result[1], s), 1, rtol=.2)

    if constrained:
        result2 = constrained_oasisAR2(y, g[0], g[1], sn)
    else:
        result2 = oasisAR2(y, g[0], g[1], lam=25)
    npt.assert_allclose(corr(result2[0], c), 1, rtol=.03)
    npt.assert_allclose(corr(result2[1], s), 1, rtol=.2)
def test_rotate_certain_angle():
    """Check ``rotate_certain_angle`` on the unit vectors with an angle of 30.

    The expected components are ``sqrt(3)/2`` and ``0.5``, i.e. the cosine
    and sine of 30 degrees.
    """
    cos30 = np.sqrt(3) / 2.0
    sin30 = 0.5

    first = np.array([1.0, 0.0])
    second = np.array([0.0, 1.0])
    rotated_first, rotated_second = rotate.rotate_certain_angle(
        first, second, 30.0)

    npt.assert_allclose(rotated_first, np.array([cos30, sin30]))
    npt.assert_allclose(rotated_second, np.array([-sin30, cos30]))
def test_rotate_12_ne():
    """Check ``rotate_12_ne`` on the unit vectors with arguments 30 and 120.

    The expected components are built from ``sqrt(3)/2`` and ``0.5``.
    """
    cos30 = np.sqrt(3) / 2.0
    sin30 = 0.5

    first = np.array([1.0, 0.0])
    second = np.array([0.0, 1.0])
    north, east = rotate.rotate_12_ne(first, second, 30, 120)

    npt.assert_allclose(north, np.array([cos30, -sin30]))
    npt.assert_allclose(east, np.array([sin30, cos30]))