def test_create_ai_resistance_chan(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name
with nidaqmx.Task() as task:
ai_channel = task.ai_channels.add_ai_resistance_chan(
ai_phys_chan, name_to_assign_to_channel="ResistanceChannel",
min_val=-1000.0, max_val=1000.0, units=ResistanceUnits.OHMS,
resistance_config=ResistanceConfiguration.TWO_WIRE,
current_excit_source=ExcitationSource.EXTERNAL,
current_excit_val=0.002, custom_scale_name="")
assert ai_channel.physical_channel.name == ai_phys_chan
assert ai_channel.name == "ResistanceChannel"
assert numpy.isclose(ai_channel.ai_min, -1000.0, atol=1)
assert numpy.isclose(ai_channel.ai_max, 1000.0, atol=1)
assert ai_channel.ai_resistance_units == ResistanceUnits.OHMS
assert (ai_channel.ai_resistance_cfg ==
ResistanceConfiguration.TWO_WIRE)
assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
assert ai_channel.ai_excit_val == 0.002
python类isclose()的实例源码
def calc_norm_lp_div_scores(
log_prob_scores: List[float],
unigram_scores: List[float]) -> List[Union[None, float]]:
r"""
.. math:
\frac{%
\log P_\text{model}\left(\xi\right)
}{%
\log P_\text{unigram}\left(\xi\right)
}
>>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
'-0.414'
"""
results = []
for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
x = None
else:
x = (-1.0) * float(log_prob) / float(unigram_score)
results.append(x)
return results
def calc_norm_lp_sub_scores(
log_prob_scores: List[float],
unigram_scores: List[float]) -> List[Union[None, float]]:
r"""
.. math:
\log P_\text{model}\left(\xi\right)
- \log P_\text{unigram}\left(\xi\right)
>>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
'20.875'
"""
results = []
for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
x = None
else:
x = float(log_prob) - float(unigram_score)
results.append(x)
return results
def reshapeWeights(self, weights, normalize=True, modifier=None):
# reshape the weights matrix to a grid for visualization
n_rows = int(np.sqrt(weights.shape[1]))
n_cols = int(np.sqrt(weights.shape[1]))
kernel_size = int(np.sqrt(weights.shape[0]/3))
weights_grid = np.zeros((int((np.sqrt(weights.shape[0]/3)+1)*n_rows), int((np.sqrt(weights.shape[0]/3)+1)*n_cols), 3), dtype=np.float32)
for i in range(weights_grid.shape[0]/(kernel_size+1)):
for j in range(weights_grid.shape[1]/(kernel_size+1)):
index = i * (weights_grid.shape[0]/(kernel_size+1))+j
if not np.isclose(np.sum(weights[:, index]), 0):
if normalize:
weights_grid[i * (kernel_size + 1):i * (kernel_size + 1) + kernel_size, j * (kernel_size + 1):j * (kernel_size + 1) + kernel_size]=\
(weights[:, index].reshape(kernel_size, kernel_size, 3) - np.min(weights[:, index])) / ((np.max(weights[:, index]) - np.min(weights[:, index])) + 1.e-6)
else:
weights_grid[i * (kernel_size + 1):i * (kernel_size + 1) + kernel_size, j * (kernel_size + 1):j * (kernel_size + 1) + kernel_size] =\
(weights[:, index].reshape(kernel_size, kernel_size, 3))
if modifier is not None:
weights_grid[i * (kernel_size + 1):i * (kernel_size + 1) + kernel_size, j * (kernel_size + 1):j * (kernel_size + 1) + kernel_size] *= modifier[index]
return weights_grid
def ani_update(arg, ii=[0]):
i = ii[0] # don't ask...
if np.isclose(t_arr[i], np.around(t_arr[i], 1)):
fig2.suptitle('Evolution (Time: {})'.format(t_arr[i]), fontsize=24)
graphic_floor[0].set_data([-floor_lim*np.cos(incline_history[i]) + radius*np.sin(incline_history[i]), floor_lim*np.cos(incline_history[i]) + radius*np.sin(incline_history[i])], [-floor_lim*np.sin(incline_history[i])-radius*np.cos(incline_history[i]), floor_lim*np.sin(incline_history[i])-radius*np.cos(incline_history[i])])
graphic_wheel.center = (x_history[i], y_history[i])
graphic_ind[0].set_data([x_history[i], x_history[i] + radius*np.sin(w_history[i])],
[y_history[i], y_history[i] + radius*np.cos(w_history[i])])
graphic_pend[0].set_data([x_history[i], x_history[i] - cw_to_cm[1]*np.sin(q_history[i, 2])],
[y_history[i], y_history[i] + cw_to_cm[1]*np.cos(q_history[i, 2])])
graphic_dist[0].set_data([x_history[i] - cw_to_cm[1]*np.sin(q_history[i, 2]), x_history[i] - cw_to_cm[1]*np.sin(q_history[i, 2]) - pscale*p_history[i]*np.cos(q_history[i, 2])],
[y_history[i] + cw_to_cm[1]*np.cos(q_history[i, 2]), y_history[i] + cw_to_cm[1]*np.cos(q_history[i, 2]) - pscale*p_history[i]*np.sin(q_history[i, 2])])
ii[0] += int(1 / (timestep * framerate))
if ii[0] >= len(t_arr):
print("Resetting animation!")
ii[0] = 0
return [graphic_floor, graphic_wheel, graphic_ind, graphic_pend, graphic_dist]
# Run animation
TDM_NUPACK_tests.py 文件源码
项目:piperine
作者: DNA-and-Natural-Algorithms-Group
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_NUPACKIntScore(self, tmpdir=tmpdir):
seq_names = ['hairpin1', 'hairpin2']
seq_values = ["TTTTTTTTTTCCCAAAAAAAAAA",
"AAAAAAAAAAGGGTTTTTTTTTT"]
seq_dict = dict(zip(seq_names, seq_values))
true_score = 100 * sum([7.289922e-13, 1.146297e-14, 1.156180e-12]) / 2e-6
out = tdm.NUPACKIntScore(
seq_names[0],
seq_names[1],
seq_dict,
clean=clean,
quiet=True,
tmpdir=tmpdir
)
self.assertTrue(np.isclose(out, true_score, rtol=1e-6),
msg="true:{}\ntest:{}\n".format(true_score, out))
TDM_NUPACK_tests.py 文件源码
项目:piperine
作者: DNA-and-Natural-Algorithms-Group
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_NUPACKSSScore(self, tmpdir=tmpdir):
strand = 'r1-d'
t_regi = self.h_inputs[3][strand]
true_sstu = 0.9028 #0.9059
true_sstu_sum = 9.5695 # 9.5847
true_ssu = 0.8524 # 0.8579
true_ssu_sum = 42.0609 # 42.1328
trues = [true_ssu, true_ssu_sum,
true_sstu, true_sstu_sum, 44]
score_names = ["min_Unpaired", "sum_Unpaired", "min_Unpaired_toe", \
"sum_Unpaired_toe", "seq_len"]
out = tdm.NUPACKSSScore(strand,
self.sequence_dicts[0],
toe_region=t_regi,
clean=clean,
tmpdir=tmpdir)
for i in range(4):
self.assertTrue(np.isclose(out[i], trues[i]),
msg="Score {} Test:{} True:{}".format(score_names[i], out[i], trues[i]))
def test_bootstrap_replicate_1d(data, seed):
np.random.seed(seed)
x = dcst.bootstrap_replicate_1d(data, np.mean)
np.random.seed(seed)
x_correct = original.bootstrap_replicate_1d(data[~np.isnan(data)], np.mean)
assert (np.isnan(x) and np.isnan(x_correct, atol=atol, equal_nan=True)) \
or np.isclose(x, x_correct, atol=atol, equal_nan=True)
np.random.seed(seed)
x = dcst.bootstrap_replicate_1d(data, np.median)
np.random.seed(seed)
x_correct = original.bootstrap_replicate_1d(data[~np.isnan(data)], np.median)
assert (np.isnan(x) and np.isnan(x_correct, atol=atol, equal_nan=True)) \
or np.isclose(x, x_correct, atol=atol, equal_nan=True)
np.random.seed(seed)
x = dcst.bootstrap_replicate_1d(data, np.std)
np.random.seed(seed)
x_correct = original.bootstrap_replicate_1d(data[~np.isnan(data)], np.std)
assert (np.isnan(x) and np.isnan(x_correct, atol=atol, equal_nan=True)) \
or np.isclose(x, x_correct, atol=atol, equal_nan=True)
def lp_fir_type(h):
"""
Determine if FIR filter impulse response *h* is symmetric or
antisymmetric. Return {1, 2, 3, 4} depending on FIR filter type or
None if the FIR filter is not linear phase.
"""
M = len(h) - 1
n_range = range(M + 1)
if M % 2 == 0:
if all([NP.isclose(h[n], h[M - n]) for n in n_range]):
return 1
elif all([NP.isclose(h[n], -h[M - n]) for n in n_range]):
return 3
else:
return None
else:
if all([NP.isclose(h[n], h[M - n]) for n in n_range]):
return 2
elif all([NP.isclose(h[n], -h[M - n]) for n in n_range]):
return 4
else:
return None
assert False
def lp_fir_type(h):
"""
Determine if FIR filter impulse response *h* is symmetric or
antisymmetric. Return {1, 2, 3, 4} depending on FIR filter type or
None if the FIR filter is not linear phase.
"""
M = len(h) - 1
n_range = range(M + 1)
if M % 2 == 0:
if all([NP.isclose(h[n], h[M - n]) for n in n_range]):
return 1
elif all([NP.isclose(h[n], -h[M - n]) for n in n_range]):
return 3
else:
return None
else:
if all([NP.isclose(h[n], h[M - n]) for n in n_range]):
return 2
elif all([NP.isclose(h[n], -h[M - n]) for n in n_range]):
return 4
else:
return None
assert False
def view_live(live, lang='EN'):
df_head = pd.DataFrame({'Cover': ['<img src="{0}" width=100 />'.format(live.cover)]})
df_head['Song Name'] = '<p style="color:{0};">{1}</p>'.format(attr_color[live.attr], live.name)
df_head['Group'] = live.group
df_head['Difficulty'] = live.difficulty
df_head['Total Note'] = live.note_number
df_head['Duration'] = live.duration
df_head.columns = ['<p>{0}</p>'.format(x) for x in list(df_head.columns)]
df = live.summary.copy()
pos_name = ['<p>{0}</p>'.format(x) for x in ['L1', 'L2', 'L3', 'L4', 'C', 'R4', 'R3', 'R2', 'R1']]
df.index = [pos_name[9-x] if type(x)==int else x for x in list(df.index)]
df = df.loc[pos_name+['total']]
df = df.applymap(lambda x: '<p>{0}</p>'.format(str(int(x)) if np.isclose(2*x,round(2*x)) else '{0:.3f}'.format(x))).transpose()
if lang=='CN':
df_head.columns = ['<p>{0}</p>'.format(x) for x in ['????', '????', '??', '??', 'Note??', '??']]
df.columns = list(df.columns)[:-1] + ['<p>??</p>']
df.index = ['<p>{0}</p>'.format(x) for x in ['??', '??', '??', '??', '??', '????', '????', '??????']]
elif lang=='EN':
df.index = ['<p>{0}</p>'.format(x) for x in ['tap', 'hold', 'swing', 'star', 'token', 'type weight', 'combo weight', 'weight fraction']]
return HTML(html_template.format(df_head.to_html(escape=False, index=False) + df.to_html(escape=False)))
def group_data(data, grouping):
"""
Group the data with respect to the supplied `grouping' specification
(i.e., "GROUPING" columns of a spectrum). The channel counts of the
same group are summed up and assigned to the FIRST channel of this
group, while the OTHRE channels are all set to ZERO.
"""
data_grp = np.array(data)
for i in reversed(range(len(data))):
if grouping[i] == 1:
# the beginning channel of a group
continue
else:
# other channels of a group
data_grp[i-1] += data_grp[i]
data_grp[i] = 0
assert np.isclose(sum(data_grp), sum(data))
return data_grp
def test_sentiment_comprehensive(self):
sentiment = 0.0
for t in self.mock_tweets:
if t['sentiment']!=0:
sentiment = 0.99*sentiment + 0.01*t['sentiment']
# calc = 0*0.99**2 + 0.01*0.99*-0.7531 + 0.01*-0.5719
# = -0.01299649
self.mock_feels._latest_calc = self.mock_feels._feels.start
self.assertTrue(np.isclose(self.mock_feels.sentiment.value, sentiment))
# first observation is at 2017-2-19 19:14:18 and we are using default
# 60 second bins, therefore the observation at 2017-2-21 19:14:20 will
# never get saved but will always be recalculated.
self.assertEqual(self.mock_feels._latest_calc,
datetime(2017, 2, 21, 19, 14, 0))
# repeat the calculation, nothing changes
self.assertTrue(np.isclose(self.mock_feels.sentiment.value, sentiment))
self.assertEqual(self.mock_feels._latest_calc,
datetime(2017, 2, 21, 19, 14, 0))
self.assertEqual(self.mock_feels.sentiment.start,
self.mock_feels._latest_calc)
def face_index(vertices):
"""Takes an MxNx3 array and returns a 2D vertices and MxN face_indices arrays"""
new_verts = []
face_indices = []
for wall in vertices:
face_wall = []
for vert in wall:
if new_verts:
if not np.isclose(vert, new_verts).all(axis=1).any():
new_verts.append(vert)
else:
new_verts.append(vert)
face_index = np.where(np.isclose(vert, new_verts).all(axis=1))[0][0]
face_wall.append(face_index)
face_indices.append(face_wall)
return np.array(new_verts), np.array(face_indices)
def test_E_1(self):
"""Tests the electric field 1."""
for E in [self.charge1a.E, self.charge1b.E]:
self.assertTrue(isclose(E([0, 1]), [0, sqrt(2)]).all())
self.assertTrue(isclose(E([0, -1]), [0, -sqrt(2)]).all())
self.assertTrue(isclose(E([1, 2]),
array([1-1/sqrt(2), 1/sqrt(2)])/2).all())
self.assertTrue(isclose(E([1, -2]),
array([1-1/sqrt(2), -1/sqrt(2)])/2).all())
self.assertTrue(isclose(E([-1, 2]),
array([-1+1/sqrt(2), 1/sqrt(2)])/2).all())
self.assertTrue(isclose(E([-1, -2]),
array([-1+1/sqrt(2), -1/sqrt(2)])/2).all())
self.assertTrue(isclose(E([2, 0]), [2/3, 0]).all())
self.assertTrue(isclose(E([-2, 0]), [-2/3, 0]).all())
self.assertTrue(isclose(E([[0, 1], [0, -1], [1, 2]]),
[[0, sqrt(2)],
[0, -sqrt(2)],
array([1-1/sqrt(2), 1/sqrt(2)])/2]).all())
def test_E_2(self):
"""Tests the electric field 2."""
E = self.charge2.E
self.assertTrue(isclose(E([1, 0]), [3*sqrt(2), 0]).all())
self.assertTrue(isclose(E([-1, 0]), [-3*sqrt(2), 0]).all())
self.assertTrue(isclose(E([2, 1]),
array([1/sqrt(2), 1-1/sqrt(2)])*1.5).all())
self.assertTrue(isclose(E([-2, 1]),
array([-1/sqrt(2), 1-1/sqrt(2)])*1.5).all())
self.assertTrue(isclose(E([2, -1]),
array([1/sqrt(2), -1+1/sqrt(2)])*1.5).all())
self.assertTrue(isclose(E([-2, -1]),
array([-1/sqrt(2), -1+1/sqrt(2)])*1.5).all())
self.assertTrue(isclose(E([0, 2]), [0, 2]).all())
self.assertTrue(isclose(E([0, -2]), [0, -2]).all())
def test_projection(self):
"""Tests the electric field projection."""
projection = self.field.projection
# Top-right quadrant
a = radians(45)
self.assertTrue(isclose(projection([0, 0], a), -4*cos(a)))
self.assertTrue(isclose(projection([3, 0], a), 0.375*cos(a)))
self.assertTrue(isclose(projection([0, 1], a), -sqrt(2)*cos(a)))
self.assertTrue(isclose(projection([[0, 0], [3, 0], [0, 1]], a),
array([-4, 0.375, -sqrt(2)])*cos(a)).all())
# Bottom-left quadrant
a1 = radians(-135)
a2 = radians(45)
self.assertTrue(isclose(projection([0, 0], a1), 4*cos(a2)))
self.assertTrue(isclose(projection([3, 0], a1), -0.375*cos(a2)))
self.assertTrue(isclose(projection([0, 1], a1),
sqrt(2)*cos(a2)))
self.assertTrue(isclose(projection([[0, 0], [3, 0], [0, 1]], a1),
array([4, -0.375, sqrt(2)])*cos(a2)).all())
def test_dipole_fluxpoints(self):
"""Tests dipole flux points."""
field = ElectricField([PointCharge(-2, [0, 0]), PointCharge(2, [2, 0])])
circle = GaussianCircle([0, 0], 0.1)
fluxpoints = circle.fluxpoints(field, 4)
self.assertEqual(len(fluxpoints), 4)
fluxpoints = circle.fluxpoints(field, 14)
self.assertEqual(len(fluxpoints), 14)
self.assertTrue(isclose(fluxpoints[0], [0.1, 0]).all())
self.assertTrue(isclose(fluxpoints[7], [-0.1, 0]).all())
x1 = fluxpoints[1:7]
x2 = fluxpoints[-1:7:-1]
x2[:, 1] = fabs(x2[:, 1])
self.assertTrue(isclose(x1, x2).all())
#-----------------------------------------------------------------------------
# main()
def test_ISC():
# Create dataset in which one voxel is highly correlated across subjects
# and the other is not
D = np.zeros((2, 5, 3))
D[:, :, 0] = \
[[-0.36225433, -0.43482456, 0.26723158, 0.16461712, -0.37991465],
[-0.62305959, -0.46660116, -0.50037994, 1.81083754, 0.23499509]]
D[:, :, 1] = \
[[-0.30484153, -0.49486988, 0.10966625, -0.19568572, -0.20535156],
[1.68267639, -0.78433298, -0.35875085, -0.6121344, 0.28603493]]
D[:, :, 2] = \
[[-0.36593192, -0.50914734, 0.21397317, 0.30276589, -0.42637472],
[0.04127293, -0.67598379, -0.51549055, -0.64196342, 1.60686666]]
(ISC, p) = brainiak.isfc.isc(D, return_p=True, num_perm=100,
two_sided=True, random_state=0)
assert np.isclose(ISC, [0.8909243, 0.0267954]).all(), \
"Calculated ISC does not match ground truth"
assert np.isclose(p, [0.02, 1]).all(), \
"Calculated p values do not match ground truth"
def test_fit_shapes():
K = 5
V = 3
T = 10
es = brainiak.eventseg.event.EventSegment(K, n_iter=2)
sample_data = np.random.rand(V, T)
es.fit(sample_data.T)
assert es.segments_[0].shape == (T, K), "Segmentation from fit " \
"has incorrect shape"
assert np.isclose(np.sum(es.segments_[0], axis=1), np.ones(T)).all(), \
"Segmentation from learn_events not correctly normalized"
T2 = 15
sample_data2 = np.random.rand(V, T2)
test_segments, test_ll = es.find_events(sample_data2.T)
assert test_segments.shape == (T2, K), "Segmentation from find_events " \
"has incorrect shape"
assert np.isclose(np.sum(test_segments, axis=1), np.ones(T2)).all(), \
"Segmentation from find_events not correctly normalized"
def _receiver_validator(weights, rec_wcounts, cat_wcounts):
"""
Validate the receiver weights, and make sure it sums to
category window counts.
:param weights:
:param rec_wcounts:
:param cat_wcounts:
:return:
"""
wsum = 0
for chan, chan_weight in weights.iteritems():
nwin = rec_wcounts[chan]
wsum += chan_weight * nwin
print("Summation of (rec_weights * rec_nwins): %.2f" % wsum)
if not np.isclose(wsum, cat_wcounts):
raise ValueError("receiver validator fails: %f, %f" %
(wsum, cat_wcounts))
def _category_validator(weights, wcounts):
"""
Validate the category weights
:param weights:
:param counts:
:return:
"""
wsum = 0.0
nwins_total = 0
for p, pinfo in weights.iteritems():
for c in pinfo:
wsum += weights[p][c] * wcounts[p][c]
nwins_total += wcounts[p][c]
print("Summation of (cat_weight * cat_nwins): %.2f" % wsum)
if not np.isclose(wsum, nwins_total):
raise ValueError("Category validator fails: %f, %f" %
(wsum, nwins_total))
def test_calculate_baz():
elat = 0.0
elon = 0.0
slat = 10.0
# put a very small value here
slon = 0.000000001
npt.assert_allclose(rotate.calculate_baz(elat, elon, slat, slon),
180.0)
assert np.isclose(rotate.calculate_baz(slat, slon, elat, elon), 0.0)
elat = 0.0
elon = 0.0
slat = 0.0
slon = 10.0
npt.assert_allclose(rotate.calculate_baz(elat, elon, slat, slon),
270.0)
npt.assert_allclose(rotate.calculate_baz(slat, slon, elat, elon),
90.0)
def test_ensemble_synthetic_channel_orientation():
dip, azi = rotate.ensemble_synthetic_channel_orientation("MXZ")
assert np.isclose(dip, 90.0)
assert np.isclose(azi, 0.0)
dip, azi = rotate.ensemble_synthetic_channel_orientation("MXN")
assert np.isclose(dip, 0.0)
assert np.isclose(azi, 0.0)
dip, azi = rotate.ensemble_synthetic_channel_orientation("MXE")
assert np.isclose(dip, 0.0)
assert np.isclose(azi, 90.0)
with pytest.raises(Exception) as err:
rotate.ensemble_synthetic_channel_orientation("MXR")
assert ": MXR" in str(err)
def check_vertical_inventory_sanity(tr, inventory):
"""
Check the inventory of vertical(Z) component, check
if the abs(dip) is 90 and azimuth is 0.
"""
if tr.stats.channel[-1] != "Z":
raise ValueError("Function only checks vertical(Z) component(%s)"
% tr.stats.channel)
dip, azi = extract_channel_orientation(tr, inventory)
if dip is None or azi is None:
return False
if np.isclose(abs(dip), 90.0) and np.isclose(abs(azi), 0.0):
return True
else:
return False
def normalize_source_weights(points, wcounts):
wsum = 0.0
wcounts_sum = 0
for p in points:
wsum += p.weight * wcounts[p.tag]
wcounts_sum += wcounts[p.tag]
print("The summation of window counts: %d" % wcounts_sum)
print("The iniital summation(weight * window_counts): %f" % wsum)
factor = 1.0 / wsum
weights = {}
for p in points:
weights[p.tag] = p.weight * factor
# validate
wsum = 0.0
for event in weights:
wsum += wcounts[event] * weights[event]
if not np.isclose(wsum, 1.0):
raise ValueError("Error normalize source weights: %f" % wsum)
print("The normalized sum is: %f" % wsum)
print("Final weights: %s" % weights)
return weights
def satisfied(self, x):
# Define a mask for each row in x
mask = np.ones(len(x), dtype=bool)
# Test that all of the conditions pass
for condition in self.conditions:
if condition['type'] == CONTINUOUS:
comparison = self.comparator[condition['operator']]
x_column = x[:, condition['operand_index']]
operand = condition['operand']
mask &= comparison(x_column, operand)
elif condition['type'] == CATEGORICAL:
allowed = condition['values']
x_column = x[:, [condition['operand_index']]]
mask &= np.isclose(allowed, x_column).any(axis=1)
# If all of the conditions passed for a single row, we can conclude
# that this row satisfies this rule
return mask
def test_correlations_3d(self):
data = np.random.multivariate_normal([0, 0, 1], [[1, 0.5, 0.2], [0.5, 1, 0.3], [0.2, 0.3, 1.0]], size=100000)
parameters = ["x", "y", "z"]
c = ChainConsumer()
c.add_chain(data, parameters=parameters, name="chain1")
p, cor = c.analysis.get_correlations(chain="chain1", parameters=["y", "z", "x"])
assert p[0] == "y"
assert p[1] == "z"
assert p[2] == "x"
assert np.isclose(cor[0, 0], 1, atol=1e-2)
assert np.isclose(cor[1, 1], 1, atol=1e-2)
assert np.isclose(cor[2, 2], 1, atol=1e-2)
assert cor.shape == (3, 3)
assert np.abs(cor[0, 1] - 0.3) < 0.01
assert np.abs(cor[0, 2] - 0.5) < 0.01
assert np.abs(cor[1, 2] - 0.2) < 0.01
def test_covariance_3d(self):
cov = [[3, 0.5, 0.2], [0.5, 4, 0.3], [0.2, 0.3, 5]]
data = np.random.multivariate_normal([0, 0, 1], cov, size=2000000)
parameters = ["x", "y", "z"]
c = ChainConsumer()
c.add_chain(data, parameters=parameters, name="chain1")
p, cor = c.analysis.get_covariance(chain="chain1", parameters=["y", "z", "x"])
assert p[0] == "y"
assert p[1] == "z"
assert p[2] == "x"
assert np.isclose(cor[0, 0], 4, atol=2e-2)
assert np.isclose(cor[1, 1], 5, atol=2e-2)
assert np.isclose(cor[2, 2], 3, atol=2e-2)
assert cor.shape == (3, 3)
assert np.abs(cor[0, 1] - 0.3) < 0.01
assert np.abs(cor[0, 2] - 0.5) < 0.01
assert np.abs(cor[1, 2] - 0.2) < 0.01
def test_summary_max_symmetric_2(self):
c = ChainConsumer()
c.add_chain(self.data_skew)
summary_area = 0.6827
c.configure(statistics="max_symmetric", bins=1.0, summary_area=summary_area)
summary = c.analysis.get_summary()['0']
xs = np.linspace(0, 2, 1000)
pdf = skewnorm.pdf(xs, 5, 1, 1.5)
xmax = xs[pdf.argmax()]
cdf_top = skewnorm.cdf(summary[2], 5, 1, 1.5)
cdf_bottom = skewnorm.cdf(summary[0], 5, 1, 1.5)
area = cdf_top - cdf_bottom
assert np.isclose(xmax, summary[1], atol=0.05)
assert np.isclose(area, summary_area, atol=0.05)
assert np.isclose(summary[2] - summary[1], summary[1] - summary[0])