def test_parallel_beta(self):
t = Table(np.array([[0, 1, 3], [1, 1, 2]]),
['O1', 'O2'],
['S1', 'S2', 'S3'])
parallel = beta(table=t, metric='braycurtis', n_jobs=-1)
single_thread = beta(table=t, metric='braycurtis', n_jobs=1)
# expected computed with scipy.spatial.distance.braycurtis
expected = skbio.DistanceMatrix([[0.0000000, 0.3333333, 0.6666667],
[0.3333333, 0.0000000, 0.4285714],
[0.6666667, 0.4285714, 0.0000000]],
ids=['S1', 'S2', 'S3'])
self.assertEqual(parallel.ids, expected.ids)
self.assertEqual(single_thread.ids, expected.ids)
for id1 in parallel.ids:
for id2 in parallel.ids:
npt.assert_almost_equal(parallel[id1, id2], expected[id1, id2])
for id1 in single_thread.ids:
for id2 in single_thread.ids:
npt.assert_almost_equal(single_thread[id1, id2],
expected[id1, id2])
python类assert_almost_equal()的实例源码
def test_beta_phylogenetic(self):
t = Table(np.array([[0, 1, 3], [1, 1, 2]]),
['O1', 'O2'],
['S1', 'S2', 'S3'])
tree = skbio.TreeNode.read(io.StringIO(
'((O1:0.25, O2:0.50):0.25, O3:0.75)root;'))
actual = beta_phylogenetic(
table=t, phylogeny=tree, metric='unweighted_unifrac')
# expected computed with skbio.diversity.beta_diversity
expected = skbio.DistanceMatrix([[0.00, 0.25, 0.25],
[0.25, 0.00, 0.00],
[0.25, 0.00, 0.00]],
ids=['S1', 'S2', 'S3'])
self.assertEqual(actual.ids, expected.ids)
for id1 in actual.ids:
for id2 in actual.ids:
npt.assert_almost_equal(actual[id1, id2], expected[id1, id2])
def test_multiple_calls(self):
"""Tests that the results are the same after calling multiple times
"""
bayes = Bayesian(simulations={'Gun': [self.sim1, self.exp1]},
models={'eos': self.eos_model},
opt_keys='eos')
sol1, hist1, sens1, fisher1 = bayes()
sol2, hist2, sens2, fisher2 = bayes()
npt.assert_almost_equal(hist1[0], hist2[0], decimal=4,
err_msg='Histories not equal for subsequent'
'runs')
npt.assert_almost_equal(sol1.models['eos'].get_dof() /
sol2.models['eos'].get_dof(),
np.ones(bayes.shape()[1]),
decimal=10,
err_msg='DOF not equal for subsequent runs')
npt.assert_almost_equal(np.fabs(sens1['Gun'] - sens2['Gun']),
np.zeros(sens1['Gun'].shape),
decimal=10)
def test_params_regression():
"""
Test for regressions in model parameter values from provided data
"""
model = mli.Model()
ortho_x, ortho_y, ortho_n = mli.transform_data(op.join(data_path,
'ortho.csv'))
para_x, para_y, para_n = mli.transform_data(op.join(data_path,
'para.csv'))
ortho_fit = model.fit(ortho_x, ortho_y)
para_fit = model.fit(para_x, para_y)
npt.assert_almost_equal(ortho_fit.params[0], 0.46438638)
npt.assert_almost_equal(ortho_fit.params[1], 0.13845926)
npt.assert_almost_equal(para_fit.params[0], 0.57456788)
npt.assert_almost_equal(para_fit.params[1], 0.13684096)
def test_get_receiver_weights():
center = SpherePoint(0, 0, tag="source")
rec_counts, _ = ww.calculate_receiver_window_counts(windows)
points = ww.assign_receiver_to_points(rec_counts["BHZ"], stations)
ref_distance, cond_number = ww.get_receiver_weights(
"BHZ", center, points, 0.35, plot=False)
for p in points:
npt.assert_almost_equal(p.weight, 1.0)
npt.assert_almost_equal(cond_number, 1.0)
points = ww.assign_receiver_to_points(rec_counts["BHT"], stations)
ref_distance, cond_number = ww.get_receiver_weights(
"BHZ", center, points, 0.35, plot=False)
for p in points:
npt.assert_almost_equal(p.weight, 1.0)
npt.assert_almost_equal(cond_number, 1.0)
def test_normalize_receiver_weights():
rec_counts, cat_wcounts = ww.calculate_receiver_window_counts(windows)
comp = "BHZ"
channels = rec_counts[comp].keys()
channels.sort()
points = ww.assign_receiver_to_points(channels, stations)
weights = ww.normalize_receiver_weights(points, rec_counts[comp])
assert len(weights) == 3
for v in weights.itervalues():
npt.assert_almost_equal(v, 1.0)
points[0].weight = 0.5
points[1].weight = 0.75
points[2].weight = 1.0
weights = ww.normalize_receiver_weights(points, rec_counts[comp])
assert len(weights) == 3
npt.assert_almost_equal(weights["II.AAK..BHZ"], 0.625)
npt.assert_almost_equal(weights["II.ABKT..BHZ"], 0.9375)
npt.assert_almost_equal(weights["IU.BCD..BHZ"], 1.25)
def test_convert_adjs_to_trace():
array = np.array([1., 2., 3., 4., 5.])
starttime = UTCDateTime(1990, 1, 1)
adj = AdjointSource(
"cc_traveltime_misfit", 0, 1.0, 17, 40, "BHZ", adjoint_source=array,
network="II", station="AAK", location="",
starttime=starttime)
tr, meta = pa.convert_adj_to_trace(adj)
npt.assert_allclose(tr.data, array)
assert tr.stats.starttime == starttime
npt.assert_almost_equal(tr.stats.delta, 1.0)
assert tr.id == "II.AAK..BHZ"
assert meta["adj_src_type"] == "cc_traveltime_misfit"
npt.assert_almost_equal(meta["misfit"], 0.0)
npt.assert_almost_equal(meta["min_period"], 17.0)
npt.assert_almost_equal(meta["max_period"], 40.0)
def test_convert_adjs_to_stream():
array = np.array([1., 2., 3., 4., 5.])
starttime = UTCDateTime(1990, 1, 1)
adjsrcs = get_sample_adjsrcs(array, starttime)
true_keys = ["II.AAK..BHZ", "II.AAK..BHR", "II.AAK..BHT"]
st, meta = pa.convert_adjs_to_stream(adjsrcs)
assert len(meta) == 3
keys = meta.keys()
assert set(keys) == set(true_keys)
for m in meta.itervalues():
assert m["adj_src_type"] == "cc_traveltime_misfit"
npt.assert_almost_equal(m["misfit"], 0.0)
npt.assert_almost_equal(m["min_period"], 17.0)
npt.assert_almost_equal(m["max_period"], 40.0)
for tr, trid in zip(st, true_keys):
assert tr.id == trid
npt.assert_allclose(tr.data, array)
npt.assert_almost_equal(tr.stats.delta, 1.0)
assert tr.stats.starttime == starttime
def test_dump_adjsrc():
array = np.array([1., 2., 3., 4., 5.])
adj = AdjointSource(
"cc_traveltime_misfit", 2.0, 1.0, 17, 40, "BHZ",
adjoint_source=array, network="II", station="AAK",
location="", starttime=UTCDateTime(1990, 1, 1))
station_info = {"latitude": 1.0, "longitude": 2.0, "depth_in_m": 3.0,
"elevation_in_m": 4.0}
adj_array, adj_path, parameters = sa.dump_adjsrc(adj, station_info)
npt.assert_array_almost_equal(adj_array, array)
for key in station_info:
npt.assert_almost_equal(station_info[key], parameters[key])
assert adj_path == "II_AAK_BHZ"
npt.assert_almost_equal(parameters["misfit"], 2.0)
npt.assert_almost_equal(parameters["dt"], 1.0)
npt.assert_almost_equal(parameters["min_period"], 17.0)
npt.assert_almost_equal(parameters["max_period"], 40.0)
assert parameters["adjoint_source_type"] == "cc_traveltime_misfit"
assert parameters["station_id"] == "II.AAK"
assert parameters["component"], "BHZ"
assert UTCDateTime(parameters["starttime"]) == UTCDateTime(1990, 1, 1)
assert parameters["units"] == "m"
def test_load_to_adjsrc():
array = np.array([1., 2., 3., 4., 5.])
adj = AdjointSource(
"cc_traveltime_misfit", 2.0, 1.0, 17, 40, "BHZ",
adjoint_source=array, network="II", station="AAK",
location="", starttime=UTCDateTime(1990, 1, 1))
station_info = {"latitude": 1.0, "longitude": 2.0, "depth_in_m": 3.0,
"elevation_in_m": 4.0}
adj_array, adj_path, parameters = sa.dump_adjsrc(adj, station_info)
# ensemble a faked adjoint source from hdf5
hdf5_adj = namedtuple("HDF5Adj", ['data', 'parameters'])
hdf5_adj.data = array
hdf5_adj.parameters = parameters
# load and check
loaded_adj, loaded_station_info = sa.load_to_adjsrc(hdf5_adj)
adjoint_equal(loaded_adj, adj)
for k in station_info:
npt.assert_almost_equal(station_info[k], loaded_station_info[k])
assert loaded_station_info["station"] == "AAK"
assert loaded_station_info["network"] == "II"
assert loaded_station_info["location"] == ""
def test_prop_replay_distribution():
priors = [20000.0, 30000.0, 1000.0, 49000.0, 0.0]
cap = 256
batch_size = 32
sample_amount = 2000
replay = ProportionalReplay(capacity=cap, min_size=cap, batch_size=batch_size, alpha=1, beta=1)
s = int(np.sum(priors))
expected_priors = np.asarray(priors) / s
received_priors = [0] * len(priors)
for o, p in enumerate(priors):
replay.add(obs=o, action=0, reward=0, obs_next=0, term=False, priority=p)
for i in range(sample_amount):
obs, a, r, obs_next, terms, idxs, importance = replay.sample()
for o in obs:
received_priors[o] += 1
received_priors = np.asarray(received_priors) / (sample_amount*batch_size)
npt.assert_almost_equal(expected_priors, received_priors, decimal=2)
def test_prop_replay_update():
priors = np.array([20000.0, 30000.0, 1000.0, 49000.0, 0.0])
cap = 2048
batch_size = 32
sample_amount = 2000
replay = ProportionalReplay(capacity=cap, min_size=cap, batch_size=batch_size, alpha=1, beta=1)
s = int(np.sum(priors))
expected_priors = priors / s
received_priors = [0] * len(priors)
for o, p in enumerate(priors):
replay.add(obs=o, action=0, reward=0, obs_next=0, term=False)
replay.update(list(range(len(priors))), priors)
for i in range(sample_amount):
obs, a, r, obs_next, terms, idxs, importance = replay.sample()
for o in obs:
received_priors[o] += 1
received_priors = np.asarray(received_priors) / (sample_amount*batch_size)
npt.assert_almost_equal(expected_priors, received_priors, decimal=2)
def test_log_prod_students_t():
np.random.seed(1)
# Prior
D = 10
m_0 = 5*np.random.rand(D) - 2
k_0 = np.random.randint(15)
v_0 = D + np.random.randint(5)
S_0 = 2*np.random.rand(D) + 3
prior = NIW(m_0=m_0, k_0=k_0, v_0=v_0, S_0=S_0)
# GMM we will use to access `_log_prod_students_t`
x = 3*np.random.rand(D) + 4
gmm = GaussianComponentsDiag(np.array([x]), prior)
expected_prior = np.sum(
[students_t(x[i], m_0[i], S_0[i]*(k_0 + 1)/(k_0 * v_0), v_0) for i in range(len(x))]
)
npt.assert_almost_equal(gmm.log_prior(0), expected_prior)
def test_log_prod_norm():
np.random.seed(1)
# Prior
D = 10
var = 1*np.random.rand(D)
mu_0 = 5*np.random.rand(D) - 2
var_0 = 2*np.random.rand(D)
prior = FixedVarPrior(var, mu_0, var_0)
# GMM will be used to access `_log_prod_norm`
x = 3*np.random.rand(D) + 4
gmm = GaussianComponentsFixedVar(np.array([x]), prior)
expected_prior = np.sum([log_norm_pdf(x[i], mu_0[i], var_0[i]) for i in range(len(x))])
npt.assert_almost_equal(gmm.log_prior(0), expected_prior)
def test_log_post_pred():
np.random.seed(1)
# Generate data
X = np.random.rand(11, 10)
N, D = X.shape
# Prior
var = 1*np.random.rand(D)
mu_0 = 5*np.random.rand(D) - 2
var_0 = 2*np.random.rand(D)
prior = FixedVarPrior(var, mu_0, var_0)
# Setup GMM
assignments = [0, 0, 0, 1, 0, 1, 3, 4, 3, 2, -1]
gmm = GaussianComponentsFixedVar(X, prior, assignments=assignments)
expected_log_post_pred = log_post_pred_unvectorized(gmm, 10)
npt.assert_almost_equal(gmm.log_post_pred(10), expected_log_post_pred)
def test_log_prior_3d():
# Data
X = np.array([[-0.3406, -0.0593, -0.0686]])
N, D = X.shape
# Setup densities
m_0 = np.zeros(D)
k_0 = 0.05
v_0 = D + 1
S_0 = 0.001*np.eye(D)
prior = NIW(m_0, k_0, v_0, S_0)
gmm = GaussianComponents(X, prior)
# Calculate log predictave under prior alone
lp = gmm.log_prior(0)
lp_expected = -0.472067277015
npt.assert_almost_equal(lp, lp_expected)
def test_map():
# Setup densities
prior = NIW(m_0=np.array([0.0, 0.0]), k_0=2.0, v_0=5.0, S_0=5.0*np.eye(2))
gmm = GaussianComponents(np.array([
[1.2, 0.9],
[-0.1, 0.8]
]), prior)
gmm.add_item(0, 0)
gmm.add_item(1, 0)
mu_expected = np.array([0.275, 0.425])
sigma_expected = np.array([
[0.55886364, 0.04840909],
[0.04840909, 0.52068182]
])
# Calculate the posterior MAP of the parameters
mu, sigma = gmm.map(0)
npt.assert_almost_equal(mu, mu_expected)
npt.assert_almost_equal(sigma, sigma_expected)
def test_log_marg_k():
# Data
X = np.array([
[-0.3406, -0.3593, -0.0686],
[-0.3381, 0.2993, 0.925],
[-0.5, -0.101, 0.75]
])
N, D = X.shape
# Setup densities
m_0 = np.zeros(D)
k_0 = 0.05
v_0 = D + 3
S_0 = 0.5*np.eye(D)
prior = NIW(m_0, k_0, v_0, S_0)
gmm = GaussianComponents(X, prior, [0, 0, 0])
log_marg_expected = -8.42365141729
# Calculate log marginal of data
log_marg = gmm.log_marg_k(0)
npt.assert_almost_equal(log_marg, log_marg_expected)
def testIndexedSlicesGradIsClippedCorrectly(self):
sparse_grad_indices = np.array([0, 1, 4])
sparse_grad_dense_shape = [self._grad_vec.size]
values = tf.constant(self._grad_vec, dtype=tf.float32)
indices = tf.constant(sparse_grad_indices, dtype=tf.int32)
dense_shape = tf.constant(sparse_grad_dense_shape, dtype=tf.int32)
gradient = tf.IndexedSlices(values, indices, dense_shape)
variable = tf.Variable(self._zero_vec, dtype=tf.float32)
gradients_to_variables = (gradient, variable)
gradients_to_variables = slim.learning.clip_gradient_norms(
[gradients_to_variables], self._max_norm)[0]
# Ensure the built IndexedSlice has the right form.
self.assertEqual(gradients_to_variables[1], variable)
self.assertEqual(gradients_to_variables[0].indices, indices)
self.assertEqual(gradients_to_variables[0].dense_shape, dense_shape)
with tf.Session() as sess:
actual_gradient = sess.run(gradients_to_variables[0].values)
np_testing.assert_almost_equal(actual_gradient, self._clipped_grad_vec)
def testMultipleGradientsWithVariables(self):
gradient = tf.constant(self._grad_vec, dtype=tf.float32)
variable = tf.Variable(tf.zeros_like(gradient))
grad_to_var = (gradient, variable)
gradient_multipliers = {variable: self._multiplier}
[grad_to_var] = slim.learning.multiply_gradients(
[grad_to_var],
gradient_multipliers)
# Ensure the variable passed through.
self.assertEqual(grad_to_var[1], variable)
with self.test_session() as sess:
actual_gradient = sess.run(grad_to_var[0])
np_testing.assert_almost_equal(actual_gradient,
self._multiplied_grad_vec, 5)
def testIndexedSlicesGradIsMultiplied(self):
values = tf.constant(self._grad_vec, dtype=tf.float32)
indices = tf.constant([0, 1, 2], dtype=tf.int32)
dense_shape = tf.constant([self._grad_vec.size], dtype=tf.int32)
gradient = tf.IndexedSlices(values, indices, dense_shape)
variable = tf.Variable(tf.zeros((1, 3)))
grad_to_var = (gradient, variable)
gradient_multipliers = {variable: self._multiplier}
[grad_to_var] = slim.learning.multiply_gradients(
[grad_to_var],
gradient_multipliers)
# Ensure the built IndexedSlice has the right form.
self.assertEqual(grad_to_var[1], variable)
self.assertEqual(grad_to_var[0].indices, indices)
self.assertEqual(grad_to_var[0].dense_shape, dense_shape)
with self.test_session() as sess:
actual_gradient = sess.run(grad_to_var[0].values)
np_testing.assert_almost_equal(actual_gradient,
self._multiplied_grad_vec, 5)
def testMultipleGradientsWithVariables(self):
gradient = tf.constant(self._grad_vec, dtype=tf.float32)
variable = tf.Variable(tf.zeros_like(gradient))
grad_to_var = (gradient, variable)
gradient_multipliers = {variable: self._multiplier}
[grad_to_var] = slim.learning.multiply_gradients(
[grad_to_var],
gradient_multipliers)
# Ensure the variable passed through.
self.assertEqual(grad_to_var[1], variable)
with self.test_session() as sess:
actual_gradient = sess.run(grad_to_var[0])
np_testing.assert_almost_equal(actual_gradient,
self._multiplied_grad_vec, 5)
def testIndexedSlicesGradIsMultiplied(self):
values = tf.constant(self._grad_vec, dtype=tf.float32)
indices = tf.constant([0, 1, 2], dtype=tf.int32)
dense_shape = tf.constant([self._grad_vec.size], dtype=tf.int32)
gradient = tf.IndexedSlices(values, indices, dense_shape)
variable = tf.Variable(tf.zeros((1, 3)))
grad_to_var = (gradient, variable)
gradient_multipliers = {variable: self._multiplier}
[grad_to_var] = slim.learning.multiply_gradients(
[grad_to_var],
gradient_multipliers)
# Ensure the built IndexedSlice has the right form.
self.assertEqual(grad_to_var[1], variable)
self.assertEqual(grad_to_var[0].indices, indices)
self.assertEqual(grad_to_var[0].dense_shape, dense_shape)
with self.test_session() as sess:
actual_gradient = sess.run(grad_to_var[0].values)
np_testing.assert_almost_equal(actual_gradient,
self._multiplied_grad_vec, 5)
def isrot(rot, dtest=False):
"""
ISROT Test if SO(2) or SO(3) rotation matrix
ISROT(rot) is true if the argument if of dimension 2x2, 2x2xN, 3x3, or 3x3xN, else false (0).
ISROT(rot, 'valid') as above, but also checks the validity of the rotation.
See also ISHOMOG, ISROT2, ISVEC.
"""
if type(rot) is np.matrix:
rot = [rot]
if type(rot) is list:
for each in rot:
try:
assert type(each) is np.matrix
assert each.shape == (3, 3)
npt.assert_almost_equal(np.linalg.det(each), 1)
except AssertionError:
return False
return True
def test_gamma():
tsample = 0.005 / 1000
with pytest.raises(ValueError):
t, g = utils.gamma(0, 0.1, tsample)
with pytest.raises(ValueError):
t, g = utils.gamma(2, -0.1, tsample)
with pytest.raises(ValueError):
t, g = utils.gamma(2, 0.1, -tsample)
for tau in [0.001, 0.01, 0.1]:
for n in [1, 2, 5]:
t, g = utils.gamma(n, tau, tsample)
npt.assert_equal(np.arange(0, t[-1] + tsample / 2.0, tsample), t)
if n > 1:
npt.assert_equal(g[0], 0.0)
# Make sure area under the curve is normalized
npt.assert_almost_equal(np.trapz(np.abs(g), dx=tsample), 1.0,
decimal=2)
# Make sure peak sits correctly
npt.assert_almost_equal(g.argmax() * tsample, tau * (n - 1))
def test_pair_gradX_Y(self):
# sample
n = 11
d = 3
with util.NumpySeedContext(seed=20):
X = np.random.randn(n, d)*4
Y = np.random.randn(n, d)*2
k = kernel.KGauss(sigma2=2.1)
# n x d
pair_grad = k.pair_gradX_Y(X, Y)
loop_grad = np.zeros((n, d))
for i in range(n):
for j in range(d):
loop_grad[i, j] = k.gradX_Y(X[[i], :], Y[[i], :], j)
testing.assert_almost_equal(pair_grad, loop_grad)
def test_gradX_y(self):
n = 10
with util.NumpySeedContext(seed=10):
for d in [1, 3]:
y = np.random.randn(d)*2
X = np.random.rand(n, d)*3
sigma2 = 1.3
k = kernel.KGauss(sigma2=sigma2)
# n x d
G = k.gradX_y(X, y)
# check correctness
K = k.eval(X, y[np.newaxis, :])
myG = -K/sigma2*(X-y)
self.assertEqual(G.shape, myG.shape)
testing.assert_almost_equal(G, myG)
def test_gradXY_sum(self):
n = 11
with util.NumpySeedContext(seed=12):
for d in [3, 1]:
X = np.random.randn(n, d)
sigma2 = 1.4
k = kernel.KGauss(sigma2=sigma2)
# n x n
myG = np.zeros((n, n))
K = k.eval(X, X)
for i in range(n):
for j in range(n):
diffi2 = np.sum( (X[i, :] - X[j, :])**2 )
#myG[i, j] = -diffi2*K[i, j]/(sigma2**2)+ d*K[i, j]/sigma2
myG[i, j] = K[i, j]/sigma2*(d - diffi2/sigma2)
# check correctness
G = k.gradXY_sum(X, X)
self.assertEqual(G.shape, myG.shape)
testing.assert_almost_equal(G, myG)
def test_log_bf():
import numpy.testing as test
sep = numpy.array([0., 0.1, 0.2, 0.3, 0.4, 0.5])
for psi in sep:
print(psi)
print(' ', log_bf2(psi, 0.1, 0.2), )
print(' ', log_bf([[None, psi]], [0.1, 0.2]), )
test.assert_almost_equal(log_bf2(psi, 0.1, 0.2), log_bf([[None, psi]], [0.1, 0.2]))
for psi in sep:
print(psi)
bf3 = log_bf3(psi, psi, psi, 0.1, 0.2, 0.3)
print(' ', bf3)
g = log_bf([[None, psi, psi], [psi, None, psi], [psi, psi, None]], [0.1, 0.2, 0.3])
print(' ', g)
test.assert_almost_equal(bf3, g)
q = numpy.zeros(len(sep))
print(log_bf(numpy.array([[numpy.nan + sep, sep, sep], [sep, numpy.nan + sep, sep], [sep, sep, numpy.nan + sep]]),
[0.1 + q, 0.2 + q, 0.3 + q]))
def test_log_bf():
import numpy.testing as test
sep = numpy.array([0., 0.1, 0.2, 0.3, 0.4, 0.5])
for psi in sep:
print(psi)
print(' ', log_bf2(psi, 0.1, 0.2), )
print(' ', log_bf([[None, psi]], [0.1, 0.2]), )
test.assert_almost_equal(log_bf2(psi, 0.1, 0.2), log_bf([[None, psi]], [0.1, 0.2]))
for psi in sep:
print(psi)
bf3 = log_bf3(psi, psi, psi, 0.1, 0.2, 0.3)
print(' ', bf3)
g = log_bf([[None, psi, psi], [psi, None, psi], [psi, psi, None]], [0.1, 0.2, 0.3])
print(' ', g)
test.assert_almost_equal(bf3, g)
q = numpy.zeros(len(sep))
print(log_bf(numpy.array([[numpy.nan + sep, sep, sep], [sep, numpy.nan + sep, sep], [sep, sep, numpy.nan + sep]]),
[0.1 + q, 0.2 + q, 0.3 + q]))