def test(self, xtest, x, type_='smaller', alpha=0.05):
"""
Parameters
----------
type_: in ['smaller', 'equality']
type of comparison to perform
alpha: float
significance level
"""
# call function to make sure it has been evaluated a sufficient number of times
if type_ not in ['smaller', 'equality']:
raise NotImplementedError(type_)
ftest, ftestse = self(xtest)
f, fse = self(x)
# get function values
fxtest = np.array(self.cache[tuple(xtest)])
fx = np.array(self.cache[tuple(x)])
if np.mean(fxtest-fx) == 0.0:
if type_ == 'equality':
return True
if type_ == 'smaller':
return False
if self.paired:
# if values are paired then test on distribution of differences
statistic, pvalue = stats.ttest_rel(fxtest, fx, axis=None)
else:
statistic, pvalue = stats.ttest_ind(fxtest, fx, equal_var=False, axis=None)
if type_ == 'smaller':
# if paired then df=N-1, else df=N1+N2-2=2*N-2
df = self.N-1 if self.paired else 2*self.N-2
pvalue = stats.t.cdf(statistic, df)
# return true if null hypothesis rejected
return pvalue < alpha
if type_ == 'equality':
# return true if null hypothesis not rejected
return pvalue > alpha
python类ttest_rel()的实例源码
def ttest(filename1, filename2):
qids1, values1 = load_evaluation_file(arguments.filename1)
qids2, values2 = load_evaluation_file(arguments.filename2)
if qids1.shape[0] != qids2.shape[0]:
raise ValueError('number of queries in files do not match (%d != %d)'\
% (qids1.shape[0], qids2.shape[0]))
qids1_sort_idxs = np.argsort(qids1)
qids2_sort_idxs = np.argsort(qids2)
qids1 = qids1[qids1_sort_idxs]
qids2 = qids2[qids2_sort_idxs]
if np.any(qids1 != qids2):
raise ValueError('files do not contain the same queries')
values1 = values1[qids1_sort_idxs]
values2 = values2[qids2_sort_idxs]
mean1 = np.mean(values1)
mean2 = np.mean(values2)
t_statistic, p_value = ttest_rel(values1, values2)
return values1.shape[0], mean1, mean2, t_statistic, p_value
def ttest(list1, list2):
a1 = np.array(list1)
a2 = np.array(list2)
diff = a1 - a2
t, prob = stats.ttest_rel(a1, a2)
print np.mean(diff), np.std(diff), t, prob
return np.mean(diff), np.std(diff), t, prob
#def ttest(arr1, arr2):
# T, pvalue = stats.ttest_rel(arr1, arr2)
# return T, pvalue
def ttest(list1, list2):
a1 = np.array(list1)
a2 = np.array(list2)
t, prob = stats.ttest_rel(a1, a2)
print '-'*40
print '{:<10s}{:<10s}{:<10s}{:<10s}'.format('mean1', 'mean2', 't-stat', 'p-value')
print '{:<10.6f}{:<10.6f}{:<10.6f}{:<10.6f}'.format(np.mean(a1), np.mean(a2), t, prob)
print '-'*40
def compare_omission(mt_para_corpus, si_para_corpus, lang):
tag_weights, tok_weights = get_omission_weights(mt_para_corpus, si_para_corpus, lang)
mask = []
for mt_sent_pair, si_sent_pair in zip(mt_para_corpus.sent_pairs, si_para_corpus.sent_pairs):
if mt_sent_pair.good_alignment and si_sent_pair.good_alignment:
mask.append(True)
else:
mask.append(False)
mt_omit, mt_omit_detail, mt_omit_tok, mt_omit_all = count_omission(mask, mt_para_corpus, tag_weights, tok_weights, lang)
si_omit, si_omit_detail, si_omit_tok, si_omit_all = count_omission(mask, si_para_corpus, tag_weights, tok_weights, lang)
top_k = 10
print 'overall omission (si vs mt):'
ttest(si_omit_all, mt_omit_all)
print 'MT tag omissions:'
print u'\n'.join(['%s\t%f' % (x[0], x[1]) for x in mt_omit if tag_weights[x[0]] > 0]).encode('utf-8')
print u'MT tok omissions:'
print u'\n'.join(['%s\t%f' % (x[0], x[1]) for x in mt_omit_tok[:top_k] if tok_weights[x[0]] > 0]).encode('utf8')
print 'SI tag omissions:'
print u'\n'.join(['%s\t%f' % (x[0], x[1]) for x in si_omit if tag_weights[x[0]] > 0]).encode('utf8')
print 'SI tok omissions:'
print u'\n'.join(['%s\t%f' % (x[0], x[1]) for x in si_omit_tok[:top_k] if tok_weights[x[0]] > 0]).encode('utf8')
print 'Sentence omission stats:'
for tag in tag_weights:
if tag_weights[tag] > 0:
mt_mean = sum(mt_omit_detail[tag])
si_mean = sum(si_omit_detail[tag])
t, prob = stats.ttest_rel(mt_omit_detail[tag], si_omit_detail[tag])
if prob < 0.05:
print (u'%s\t%f\t%f\t%f\t%f' % (tag, mt_mean, si_mean, t, prob)).encode('utf8')
def test_paired_ttest_with_diff_sums(data):
model, X_test = data
pairs = [(0, 1), (0, 2), (0, 3), (1, 2), (1, 3), (2, 3)]
nb_pairs = len(pairs)
nb_features, nb_classes, nb_cases = 1717, 4, 20
batch_size = 5
process_X_data_func_args = {'nb_features': nb_features}
dlc_gen = deeplift_contribs_generator(model, X_test,
process_X_data_func=process_X_data, nb_features=nb_features,
nb_classes=nb_classes, batch_size=batch_size,
process_X_data_func_args=process_X_data_func_args)
sums_D, sums_D2, sums_contribs, pairs = diff_sums_from_generator(dlc_gen,
nb_features=nb_features, nb_classes=nb_classes)
unadjusted_t_values, p_values = paired_ttest_with_diff_sums(sums_D,
sums_D2, pairs=pairs, nb_cases=nb_cases)
assert unadjusted_t_values.shape == (nb_pairs, nb_features)
assert p_values.shape == (nb_pairs, nb_features)
# force only 1 batch with abnormally high batch_size parameter
alt_dlc_gen = deeplift_contribs_generator(model, X_test,
process_X_data_func=process_X_data, nb_features=nb_features,
nb_classes=nb_classes, batch_size=109971161161043253 % 8085,
process_X_data_func_args=process_X_data_func_args)
# non-streaming paired t-test implementation... fails with larger
# datasets due to large matrix sizes (e.g., memory overflow), but
# works as an alternative implementation for a tiny unit testing dataset
alt_t_values, alt_p_values = [], []
for idx, contribs in enumerate(alt_dlc_gen):
assert not idx # check only 1 batch (idx == 0)
for i, j in pairs:
curr_t_values = np.zeros((nb_features, ))
curr_p_values = np.zeros((nb_features, ))
for f in range(nb_features):
t, p = ttest_rel(contribs[i][:, f], contribs[j][:, f])
curr_t_values[f] = t
curr_p_values[f] = p
alt_t_values.append(curr_t_values)
alt_p_values.append(curr_p_values)
for r in range(len(pairs)):
t = unadjusted_t_values[r]
alt_t = alt_t_values[r]
p = p_values[r] # already bonferroni adjusted
alt_p = bonferroni(alt_p_values[r], nb_pairs * nb_features)
assert t.shape == alt_t.shape
assert p.shape == alt_p.shape
assert np.all(del_nans(np.abs(alt_t - t)) < epsilon)
assert np.all(del_nans(np.abs(alt_p - p)) < epsilon)
def directional(A, nw):
n1 = A.shape[0]
print("Size of the matrix entetered for the directional index:")
print(n1)
signal1 = np.zeros((n1, 1));
for i in range(0,n1) :
vect_left = [];
vect_right = [];
for k in range(i-1,i-nw-1,-1) :
kp =k;
if k < 0 :
kp = n1 +k ;
if A[i,kp] > 0 :
vect_left.append(math.log(A[i,kp]));
else :
vect_left.append(0);
for k in range(i+1,i+nw+1) :
kp =k;
if k >= n1 :
kp = k - n1;
if A[i,kp] > 0 :
vect_right.append(math.log(A[i,kp]));
else :
vect_right.append(0);
if sum(vect_left) != 0 and sum(vect_right) != 0 :
signal1[i] = stats.ttest_rel(vect_right,vect_left)[0];
else :
signal1[i] = 0;
return signal1
# for a bar graph :
#ind = np.arange(len(dir2))
#
#ind = np.arange(len(M))
#plt.bar(ind,M,color="red");
#
#dom22 = [i/10 for i in dom22 ]
#
#ind = np.arange(len(dom22))
#plt.bar(ind, dom22)
#show();