def n_messages_chi_square(self, time_interval):
    """
    Run a chi-square test on the per-interval message counts.

    The observations are the message counts returned by
    ``self.get_n_messages_in_time_interval(time_interval)``; the null
    hypothesis is that messages are uniformly distributed over the values
    of the interval. This is only meaningful for intervals with a fixed
    number of values, hence the restricted set of accepted arguments.

    Args:
        time_interval: One of 'minute in hour', 'minute in day', 'hour'.

    Returns:
        A ``(chisq, p)`` tuple: the chi-square statistic and the p-value
        of the test.

    Raises:
        ValueError: If ``time_interval`` is not one of the accepted values.
    """
    valid_time_intervals = ['minute in hour', 'minute in day', 'hour']
    if time_interval in valid_time_intervals:
        observed_counts = self.get_n_messages_in_time_interval(time_interval)
        # No expected frequencies are passed; presumably this is
        # scipy.stats.chisquare, which then tests against equal frequencies.
        outcome = chisquare(observed_counts)
        return outcome.statistic, outcome.pvalue
    raise ValueError(
        'time_interval must be in {}'.format(valid_time_intervals))
# Example source code for Python's chisquare()
def test_univariate_categorical():
    """
    Check MvKde posterior samples for a single categorical variable.

    Draws univariate data from a nominal variable with 6 levels and a fixed
    probability vector, fits a MultivariateKde to it, simulates the same
    number of posterior samples and runs a chi-square test comparing the
    simulated category counts against the training category counts.
    """
    rng = gu.gen_rng(2)
    n_draws = 1000
    level_probs = [.3, .1, .2, .15, .15, .1]
    training = rng.choice(range(6), p=level_probs, size=n_draws)

    kde = MultivariateKde(
        [7], None, distargs={O: {ST: [C], SA: [{'k': 6}]}}, rng=rng)

    # Feed every training draw into the model, then run one transition.
    for rowid, level in enumerate(training):
        kde.incorporate(rowid, {7: level})
    kde.transition()

    # Posterior samples, reduced to per-level counts.
    simulated = kde.simulate(-1, [7], N=n_draws)
    simulated_counts = np.bincount([row[7] for row in simulated])
    training_counts = np.bincount(training)

    _, pval = chisquare(simulated_counts, training_counts)
    assert pval > 0.05

    # Get some coverage on logpdf_score.
    assert kde.logpdf_score() < 0
def chisquare(n_ij, weighted):
    """
    Compute the chi-square statistic for an ind_v x dep_v frequency matrix.

    Handles both the unweighted case and the SPSS weighted case.

    Args:
        n_ij: 2-D numpy array of observed frequencies.
        weighted: Truthy to derive the expected frequencies with the
            iterative row/column scaling used for the weighted case; falsy
            to use (row total * column total) / grand total.

    Returns:
        A ``(chi, p_val, dof)`` tuple where ``dof`` is
        ``(rows - 1) * (columns - 1)``.
    """
    if not weighted:
        row_totals = np.vstack(n_ij.sum(axis=1))
        col_totals = n_ij.sum(axis=0)
        grand_total = n_ij.sum().astype(float)
        expected = (row_totals * col_totals) / grand_total
    else:
        # Seed of ones; 0/0 cells come out as NaN and are replaced by a tiny
        # positive value, otherwise it breaks the chi-squared test.
        seed = n_ij / n_ij
        seed[np.isnan(seed)] = 0.000001

        row_targets = n_ij.sum(axis=1)
        col_targets = n_ij.sum(axis=0)

        expected = seed
        row_scale, delta = 1, 1
        # Alternate row and column rescaling until the largest cell change
        # is at most 10e-6 (i.e. 1e-5).
        while delta > 10e-6:
            row_scale = row_scale * np.vstack(
                row_targets / expected.sum(axis=1))
            col_scale = col_targets / (row_scale * seed).sum(axis=0)
            fitted = seed * row_scale * col_scale
            delta = np.max(np.absolute(fitted - expected))
            expected = fitted

    dof = (n_ij.shape[0] - 1) * (n_ij.shape[1] - 1)
    # The test runs over the flattened table (axis=None); ddof is chosen so
    # the resulting degrees of freedom, size - 1 - ddof, equal ``dof``.
    statistic, p_val = stats.chisquare(
        n_ij, f_exp=expected, ddof=n_ij.size - 1 - dof, axis=None)
    return statistic, p_val, dof
def test(self):
    """
    Compare ``self.histogram`` against a Poisson(``self.lambda_``) fit.

    For every index of the histogram the expected count is the Poisson pmf
    at that index times the total number of observations. Only indices
    where both the observed and the expected count reach 10 are kept. The
    kept counts go through ``stats.chisquare``, summary statistics and the
    test result are printed, and observed vs. expected counts are plotted
    in a Qt5Agg window; the call blocks in ``pyplot.show(block=True)``.

    Returns:
        The result object of ``stats.chisquare``.
    """
    total_observations = sum(self.histogram)
    min_count = 10

    kept_values = []
    kept_observed = []
    kept_expected = []
    for value in range(len(self.histogram)):
        observed = self.histogram[value]
        expected = stats.poisson.pmf(value, self.lambda_) * total_observations
        if not (observed >= min_count and expected >= min_count):
            continue
        kept_observed.append(observed)
        kept_expected.append(expected)
        kept_values.append(value)

    results = stats.chisquare(kept_observed, kept_expected)

    expected_summary = (self.expected_mean(), self.expected_variance())
    print("expected: mean %f variance %f" % expected_summary)
    actual_summary = (self.mean(), self.variance())
    print("actual: mean %f variance %f" % actual_summary)
    print(len(kept_expected))
    print(results)

    # Imported here so matplotlib is only needed when this check is run.
    from matplotlib import pyplot
    import matplotlib

    pyplot.switch_backend('Qt5Agg')
    observed_line, = pyplot.plot(kept_values, kept_observed, label='actual')
    expected_line, = pyplot.plot(
        kept_values, kept_expected, 'r', linewidth=1, label='expected')
    matplotlib.interactive(True)
    # Labels left disabled in the original:
    #pyplot.ylabel("People at Table")
    #pyplot.xlabel("Table Number")
    #pyplot.title("Chinese Restaurant Process Unit Test")
    pyplot.legend()
    pyplot.show(block=True)
    return results
def two_sample_test(cctype, X, Y):
    """
    Return the p-value of a two-sample test between samples X and Y.

    When the class looked up for ``cctype`` reports itself as numeric, a
    two-sample Kolmogorov-Smirnov test is used. Otherwise both samples are
    binned with ``aligned_bincount``, bins empty in both samples are
    dropped, each histogram is rescaled to sum to 1000, and a chi-square
    test of Y's counts against X's counts is run.
    """
    model = cu.cctype_class(cctype)
    if model.is_numeric():  # XXX WRONG CHOICE FOR DISCRETE NUMERIC XXX
        _, pval = ks_2samp(X, Y)
        return pval

    x_counts, y_counts = aligned_bincount([X, Y])
    # Keep every bin that is non-empty in at least one of the samples.
    occupied = np.logical_or(x_counts != 0, y_counts != 0)
    x_counts = x_counts[occupied]
    y_counts = y_counts[occupied]
    x_scaled = x_counts / float(sum(x_counts)) * 1000
    y_scaled = y_counts / float(sum(y_counts)) * 1000
    _, pval = chisquare(y_scaled, f_exp=x_scaled)
    return pval
def best_cat_split(self, ind, dep):
    """
    Determine the best categorical variable split.

    For each independent variable a frequency table of the dependent
    variable is built per category (weighted sums when ``dep.weights`` is
    set, plain counts otherwise). Every grouping yielded by
    ``ind_var.all_combinations()`` is scored with ``chisquare``; the grouping
    with the lowest p-value (ties broken by the larger chi-square) that
    keeps every child at or above ``self.min_child_node_size`` and has a
    p-value below ``self.alpha_merge`` is chosen for that variable. The best
    variable overall becomes the returned split; a displaced candidate whose
    score reaches ``self.split_threshold * split.score`` is attached as a
    surrogate.

    Args:
        ind: Sequence of independent variables. Each is used through
            ``deep_copy()``, ``arr``, ``name``, ``possible_groupings()``,
            ``all_combinations()`` and ``metadata``.
        dep: Dependent variable, used through ``arr`` and ``weights``
            (``None`` for the unweighted case).

    Returns:
        The best ``Split`` found; it is left in its initial state when no
        variable offers an acceptable grouping.
    """
    split = Split(None, None, None, None, 0)
    all_dep = np.unique(dep.arr)
    weighted = dep.weights is not None

    for i, ind_var in enumerate(ind):
        ind_var = ind_var.deep_copy()
        unique = np.unique(ind_var.arr)

        # freq[category][dependent value] -> count, or summed weight.
        freq = {}
        for col in unique:
            in_category = ind_var.arr == col
            freq[col] = cl.defaultdict(int)
            if weighted:
                for dep_v in all_dep:
                    freq[col][dep_v] = dep.weights[in_category * (dep.arr == dep_v)].sum()
            else:
                counts = np.unique(np.compress(in_category, dep.arr), return_counts=True)
                freq[col].update(np.transpose(counts))

        if len(list(ind_var.possible_groupings())) == 0:
            split.invalid_reason = InvalidSplitReason.PURE_NODE

        # Best grouping found so far for this variable, with its statistics.
        choice, highest_p_join, split_chi, split_dof = None, None, None, None
        for comb in ind_var.all_combinations():
            freqs = [sum([cl.Counter(freq[key]) for key in c], cl.Counter()) for c in comb]
            keys = set(sum([list(f.keys()) for f in freqs], []))
            n_ij = np.array(
                [[col.get(k, 0) for k in keys] for col in freqs]
            )
            chi, p_split, dof = chisquare(n_ij, weighted)

            improves = (
                choice is None
                or p_split < highest_p_join
                or (p_split == highest_p_join and chi > split_chi)
            )
            if (improves
                    and (n_ij.sum(axis=1) >= self.min_child_node_size).all()
                    and p_split < self.alpha_merge):
                choice, highest_p_join, split_chi, split_dof = comb, p_split, chi, dof

        temp_split = Split(i, choice, split_chi, highest_p_join, split_dof, split_name=ind_var.name)

        # Compare the statistics of the grouping that was actually chosen.
        # The loop variables p_split / chi / dof belong to whichever
        # combination was evaluated last, which may be a rejected one.
        better_split = choice is not None and (
            not split.valid()
            or highest_p_join < split.p
            or (highest_p_join == split.p and split_chi > split.score)
        )
        if better_split:
            split, temp_split = temp_split, split

        if split.valid() and choice is not None:
            chi_threshold = self.split_threshold * split.score

            if temp_split.valid() and temp_split.score >= chi_threshold:
                # Carry over surrogates from other columns that still
                # qualify, then register the displaced candidate itself.
                for sur in temp_split.surrogates:
                    if sur.column_id != i and sur.score >= chi_threshold:
                        split.surrogates.append(sur)
                temp_split.surrogates = []
                split.surrogates.append(temp_split)

            split.sub_split_values(ind[split.column_id].metadata)

    return split