def ks_test_single(test_vals, bg_vals):
    """Compute the KS-test statistic for one pair of test values and
    background values.

    Args:
        test_vals (numpy array)
        bg_vals (numpy array)

    Returns:
        ks_stat (float)
        pval (float)
    """
    # Do KS-test
    try:
        (ks_stat, pval) = stats.ks_2samp(test_vals, bg_vals)
    # Return NaN if the test fails
    except ValueError:
        logger.warning("KS-test failed.")
        ks_stat = np.nan
        pval = np.nan
    return ks_stat, pval
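A minimal usage sketch for ks_test_single. The imports and the module-level logger below are assumptions, since the snippet above does not show them.

# Hedged usage sketch; imports and logger are assumed, not part of the original snippet.
import logging
import numpy as np
from scipy import stats

logger = logging.getLogger(__name__)

rng = np.random.default_rng(0)
test_vals = rng.normal(loc=0.5, scale=1.0, size=200)   # shifted sample
bg_vals = rng.normal(loc=0.0, scale=1.0, size=1000)    # background sample

ks_stat, pval = ks_test_single(test_vals, bg_vals)
print(ks_stat, pval)  # a small p-value indicates the two samples differ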
def unfringe(self, alpha=0.05):
    """
    Decide whether to expand the suffix trie onto the fringe.

    This is done by constructing two distributions of utilities: one over the
    current states, and one over all leaves of the tree, including the fringe
    nodes. If the two distributions are sufficiently different, the state space
    is expanded to include all leaf nodes currently in the tree. The
    distribution comparison is performed with a two-sample KS test.
    """
    all_leaves = self.tree_leaves()
    all_leaves_dist = list(map(lambda leaf: self.utility(leaf), all_leaves))
    print(all_leaves_dist)
    current_leaves = self.get_states()
    current_dist = list(map(lambda leaf: self.utility(leaf), current_leaves))
    print(current_dist)
    D, p_value = ks_2samp(all_leaves_dist, current_dist)
    print(D)
    print(p_value)
    if p_value < alpha or alpha < D:
        for leaf in all_leaves:
            if leaf.is_fringe:
                leaf.set_fringe(False)
                self._correct_fringe(leaf)
        return True
    else:
        return False
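For reference, a standalone sketch of the same decision rule with the tree abstracted away; the utility lists are synthetic stand-ins for self.utility(leaf), not part of the original class.

# Hedged, self-contained sketch of the KS-based expansion test used by unfringe().
from scipy.stats import ks_2samp

def should_expand(current_utilities, all_leaf_utilities, alpha=0.05):
    """Return True when the two utility distributions look different."""
    D, p_value = ks_2samp(all_leaf_utilities, current_utilities)
    return p_value < alpha or alpha < D

current = [1.0, 1.1, 0.9, 1.05, 0.95]           # utilities of current states
with_fringe = current + [0.2, 0.1, 0.25, 0.15]  # plus fringe-node utilities
print(should_expand(current, with_fringe))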
def test_joint(kde_xz):
    # Simulate from the joint distribution of (x, z) (see
    # generate_real_nominal_data) and perform a KS test at each of the
    # subpopulations at the six levels of z.
    data = np.asarray(kde_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    joint_samples = kde_xz.simulate(-1, [0,1], N=len(data))
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        _, p = ks_2samp(data_subpop[:,0], samples_subpop)
        assert .05 < p
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
def test_conditional_indicator(kde_xz):
    # Simulate from the conditional distribution of x|z (see
    # generate_real_nominal_data) and perform a KS test at each of the
    # subpopulations at the six levels of z.
    data = np.asarray(kde_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    _, ax = plt.subplots()
    ax.set_title('Conditional Simulation Of X Given Indicator Z')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data.
        samples_subpop = [s[0] for s in
            kde_xz.simulate(-1, [0], {1:t}, None, N=len(data_subpop))]
        ax.scatter(
            np.repeat(t, len(data_subpop)) + .25,
            samples_subpop, color=gu.colors[t])
        # KS test.
        _, p = ks_2samp(data_subpop[:,0], samples_subpop)
        assert .1 < p
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
def test_joint(knn_xz):
    # Simulate from the joint distribution of (x, z) (see
    # generate_real_nominal_data) and perform a KS test at each of the
    # subpopulations at the six levels of z.
    data = np.asarray(knn_xz.data.values())
    indicators = sorted(set(data[:,1].astype(int)))
    joint_samples = knn_xz.simulate(-1, [0,1], N=len(data))
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in indicators:
        # Plot original data.
        data_subpop = data[data[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .05 < pvalue
    ax.set_xlabel('z')
    ax.set_ylabel('x')
    ax.grid()
def test_joint(state):
    # Simulate from the joint distribution of (x, z).
    joint_samples = state.simulate(-1, [0,1], N=N_SAMPLES)
    _, ax = plt.subplots()
    ax.set_title('Joint Simulation')
    for t in INDICATORS:
        # Plot original data.
        data_subpop = DATA[DATA[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data for indicator t.
        samples_subpop = [j[0] for j in joint_samples if j[1] == t]
        ax.scatter(
            np.add([t]*len(samples_subpop), .25), samples_subpop,
            color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .05 < pvalue
    ax.set_xlabel('Indicator')
    ax.set_ylabel('x')
    ax.grid()
def test_conditional_indicator(state):
    # Simulate from the conditional X|Z.
    _, ax = plt.subplots()
    ax.set_title('Conditional Simulation Of Data X Given Indicator Z')
    for t in INDICATORS:
        # Plot original data.
        data_subpop = DATA[DATA[:,1] == t]
        ax.scatter(data_subpop[:,1], data_subpop[:,0], color=gu.colors[t])
        # Plot simulated data.
        samples_subpop = [s[0] for s in
            state.simulate(-1, [0], {1:t}, None, len(data_subpop))]
        ax.scatter(
            np.repeat(t, len(data_subpop)) + .25,
            samples_subpop, color=gu.colors[t])
        # KS test.
        pvalue = ks_2samp(data_subpop[:,0], samples_subpop)[1]
        assert .01 < pvalue
    ax.set_xlabel('Indicator')
    ax.set_ylabel('x')
    ax.grid()
Source: feature_selection.py (project: Default-Credit-Card-Prediction, author: AlexPnt)
def kolmogorov_smirnov_two_sample_test(X, y):
    """
    Performs the two-sample Kolmogorov-Smirnov test, testing whether the feature
    values of each class are drawn from identical distributions.

    Keyword arguments:
    X -- The feature vectors
    y -- The target vector
    """
    kolmogorov_smirnov = [(0, 0)] * len(X[0])
    # print(kolmogorov_smirnov)
    for feature_col in range(len(X[0])):
        ks_test_statistic, p_value = stats.ks_2samp(X[y==0, feature_col], X[y==1, feature_col])
        kolmogorov_smirnov[feature_col] = (ks_test_statistic, p_value)
    # debug
    for f in range(23):
        print(kolmogorov_smirnov[f])
    return kolmogorov_smirnov
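A hedged usage sketch with synthetic data; the array shapes and {0, 1} labels are assumptions chosen to match the function's indexing.

# Hedged usage sketch; the synthetic data below is illustrative only.
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
X = rng.normal(size=(500, 23))        # 500 samples, 23 features
y = rng.integers(0, 2, size=500)      # binary target in {0, 1}
X[y == 1, 0] += 1.0                   # make feature 0 discriminative

results = kolmogorov_smirnov_two_sample_test(X, y)
# Feature 0 should show a large KS statistic and a tiny p-value.
print(results[0])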
def kolmogorov_smirnov(x_train, x_test):
    r = []
    p = []
    for c in x_train.columns:
        r_, p_ = ks_2samp(x_train[c], x_test[c])
        r.append(r_)
        p.append(p_)
    dfks = pd.DataFrame(index=range(1, 1 + len(x_train.columns)))
    dfks['KS'] = r
    dfks['KS_p'] = p
    return dfks
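A minimal usage sketch; the DataFrames below are synthetic. This kind of per-column KS comparison is commonly used to check for train/test distribution drift.

# Hedged usage sketch; synthetic train/test frames for illustration.
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
x_train = pd.DataFrame({'f1': rng.normal(0, 1, 1000),
                        'f2': rng.normal(0, 1, 1000)})
x_test = pd.DataFrame({'f1': rng.normal(0, 1, 500),
                       'f2': rng.normal(0.5, 1, 500)})  # shifted column

print(kolmogorov_smirnov(x_train, x_test))  # 'f2' gets a large KS, small KS_p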
def ks_distance(p_samples, q_samples):
    if isinstance(p_samples, tuple):
        idx, p_samples = p_samples
    return sc.ks_2samp(p_samples, q_samples)[0]
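A usage sketch; here `sc` is assumed to alias `scipy.stats`, since the snippet's imports are not shown.

# Hedged usage sketch; assumes `sc` aliases scipy.stats as the call suggests.
import numpy as np
import scipy.stats as sc

a = np.random.normal(0, 1, 300)
b = np.random.normal(1, 1, 300)
print(ks_distance(a, b))         # plain sample array
print(ks_distance((7, a), b))    # an (index, samples) tuple is also accepted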
Source: optimize_generators.py (project: sparse-digraph-generator, author: papoudakis)
def optimize_bollobas(graph):
    num_edges = len(graph.edges())
    in_degree_or = sorted(graph.in_degree().values())
    out_degree_or = sorted(graph.out_degree().values())
    cdf_in_emp = to_cumulative(in_degree_or)
    cdf_out_emp = to_cumulative(out_degree_or)
    alpha = 0.1
    beta = 0.1
    best_Ks = np.inf
    best_graph = None
    best_alpha = -1
    best_beta = -1
    while alpha < 1.0:
        while alpha + beta < 1.0 - 1e-6:
            gen_graph = bollobas(num_edges, alpha, beta)
            in_degree_gen = sorted(gen_graph.in_degree().values())
            out_degree_gen = sorted(gen_graph.out_degree().values())
            cdf_in_gen = to_cumulative(in_degree_gen)
            cdf_out_gen = to_cumulative(out_degree_gen)
            temp_Ks = max(ks_2samp(cdf_in_gen, cdf_in_emp)[0], ks_2samp(cdf_out_gen, cdf_out_emp)[0])
            if temp_Ks < best_Ks:
                best_graph = gen_graph
                best_Ks = temp_Ks
                best_alpha = alpha
                best_beta = beta
            beta += 0.1
        alpha += 0.1
        beta = 0.1
    return best_graph, best_alpha, best_beta
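The helper `to_cumulative` is not shown in this snippet; a plausible stand-in is sketched below. It is purely hypothetical and not the project's actual implementation, but it illustrates what the KS comparison above operates on: cumulative values derived from the sorted degree sequences.

# Hypothetical stand-in for the unshown to_cumulative helper: it converts a
# sorted degree sequence into empirical-CDF values, which optimize_bollobas
# then compares between the real and generated graphs via ks_2samp.
import numpy as np

def to_cumulative(sorted_degrees):
    degrees = np.asarray(sorted_degrees, dtype=float)
    return np.arange(1, len(degrees) + 1) / len(degrees)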
def test_ks_stat(x):
    theor_data = np.random.normal(0, 1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)
    theor_data = np.random.exponential(1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)
    theor_data = np.random.logistic(0, 1, size=100)
    correct, _ = st.ks_2samp(x, theor_data)
    assert np.isclose(dcst.ks_stat(x, theor_data), correct)
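For reference, the quantity being checked here is the two-sample KS statistic: the maximum absolute difference between the two empirical CDFs. The sketch below (with the hypothetical helper name `ks_stat_ecdf`, not dcst's API) computes it directly and cross-checks it against scipy.

# Hedged sketch: two-sample KS statistic from empirical CDFs, verified with scipy.
import numpy as np
import scipy.stats as st

def ks_stat_ecdf(data1, data2):
    """Maximum absolute difference between the two empirical CDFs."""
    data1, data2 = np.sort(data1), np.sort(data2)
    all_vals = np.concatenate((data1, data2))
    # ECDF of each sample evaluated at every observed value.
    cdf1 = np.searchsorted(data1, all_vals, side='right') / len(data1)
    cdf2 = np.searchsorted(data2, all_vals, side='right') / len(data2)
    return np.max(np.abs(cdf1 - cdf2))

x = np.random.normal(0, 1, size=100)
y = np.random.exponential(1, size=100)
assert np.isclose(ks_stat_ecdf(x, y), st.ks_2samp(x, y)[0])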
def test_pandas_conversion(seed):
    df = pd.DataFrame({'a': [3, 2, 1, 4],
                       'b': [8, 6, 7, 5],
                       'c': [9.1, 10.1, 11.1, np.nan]})
    x, y = dcst.ecdf(df.loc[:, 'a'])
    assert (x == np.array([1, 2, 3, 4])).all()
    assert (y == np.array([0.25, 0.5, 0.75, 1.0])).all()
    x, y = dcst.ecdf(df.loc[:, 'c'])
    assert np.allclose(x, np.array([9.1, 10.1, 11.1]))
    assert np.allclose(y, np.array([1/3, 2/3, 1.0]))
    df = pd.DataFrame({
        'a': np.concatenate((np.random.normal(0, 1, size=10), [np.nan]*990)),
        'b': np.random.normal(0, 1, size=1000)})
    correct, _ = st.ks_2samp(df['a'].dropna(), df['b'])
    assert np.isclose(dcst.ks_stat(df['a'], df['b']), correct)
    df = pd.DataFrame({
        'a': np.concatenate((np.random.normal(0, 1, size=80), [np.nan]*20)),
        'b': np.random.normal(0, 1, size=100)})
    dcst_private._seed_numba(seed)
    correct = dcst.draw_bs_reps(df['a'].values, np.mean, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_bs_reps(df['a'], np.mean, size=100), correct,
                       atol=atol)
    dcst_private._seed_numba(seed)
    correct = dcst.draw_bs_reps(df['b'].values, np.mean, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_bs_reps(df['b'], np.mean, size=100), correct,
                       atol=atol)
    dcst_private._seed_numba(seed)
    correct = dcst.draw_perm_reps(df['a'].values, df['b'].values,
                                  dcst.diff_of_means, size=100)
    dcst_private._seed_numba(seed)
    assert np.allclose(dcst.draw_perm_reps(df['a'], df['b'],
                                           dcst.diff_of_means, size=100),
                       correct, atol=atol)
def cal_ks(y, y_prob, pos_label=1, return_split=False, decimals=0):
    '''
    Compute the KS statistic for a binary scoring model and, optionally, the
    score threshold at which the separation between the two classes is largest.
    (The original docstring was Chinese and partially garbled; this description
    is reconstructed from the code below.)

    y: true labels (array-like or Series), e.g. in {0, 1} or {-1, 1}
    y_prob: predicted scores; either a 1-d array/Series of scores for the
        positive class, or a 2-d array/DataFrame of class probabilities whose
        second column is used
    pos_label: int, which label is treated as the positive class
    return_split: if True, return the score split point instead of the KS value
    decimals: number of decimals the scores are rounded to when locating the split
    '''
    y = pd.Series(pd.Series(y).values)
    if len(y_prob.shape) == 1:
        y_pred = pd.Series(pd.Series(y_prob).values)
    else:
        y_pred = pd.Series(pd.DataFrame(y_prob).iloc[:, 1].values)
    Bad = y_pred[y == pos_label]
    Good = y_pred[y != pos_label]
    ks, pvalue = stats.ks_2samp(Bad.values, Good.values)
    if not return_split:
        return ks
    crossfreq = pd.crosstab(y_pred.round(decimals), y)
    crossdens = crossfreq.cumsum(axis=0) / crossfreq.sum()
    crossdens['gap'] = abs(crossdens[0] - crossdens[1])
    score_split = crossdens[crossdens['gap'] == crossdens['gap'].max()].index[0]
    return score_split
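A hedged usage sketch with synthetic labels and scores; the score scale and imports are illustrative assumptions.

# Hedged usage sketch for cal_ks; synthetic labels and 0-100 scores for illustration.
import numpy as np
import pandas as pd
from scipy import stats

rng = np.random.default_rng(0)
y = rng.integers(0, 2, size=2000)
# Higher scores for the positive class, on a 0-100 scale.
score = np.where(y == 1,
                 rng.normal(65, 15, size=2000),
                 rng.normal(45, 15, size=2000))

print(cal_ks(y, pd.Series(score)))                      # KS statistic
print(cal_ks(y, pd.Series(score), return_split=True))   # score split point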
def test_univariate_two_sample(i):
    # This test ensures posterior sampling of uni/bimodal dists on R. When the
    # plot is shown, a density curve overlays the samples, which is useful for
    # seeing that logpdf/simulate agree.
    N_SAMPLES = 100
    rng = gu.gen_rng(2)
    # Synthetic samples.
    samples_train = SAMPLES[i](N_SAMPLES, rng)
    samples_test = SAMPLES[i](N_SAMPLES, rng)
    # Univariate KDE.
    kde = MultivariateKde([3], None, distargs={O: {ST: [N], SA: [{}]}}, rng=rng)
    # Incorporate observations.
    for rowid, x in enumerate(samples_train):
        kde.incorporate(rowid, {3: x})
    # Run inference.
    kde.transition()
    # Generate posterior samples.
    samples_gen = [s[3] for s in kde.simulate(-1, [3], N=N_SAMPLES)]
    # Plot comparison of all train, test, and generated samples.
    fig, ax = plt.subplots()
    ax.scatter(samples_train, [0]*len(samples_train), color='b', label='Train')
    ax.scatter(samples_gen, [1]*len(samples_gen), color='r', label='KDE')
    ax.scatter(samples_test, [2]*len(samples_test), color='g', label='Test')
    # Overlay the density function.
    xs = np.linspace(ax.get_xlim()[0], ax.get_xlim()[1], 200)
    pdfs = [kde.logpdf(-1, {3: x}) for x in xs]
    # Rescale the pdfs into the range [1, 1.5] for plotting.
    pdfs_plot = np.exp(pdfs) + 1
    pdfs_plot = (pdfs_plot / max(pdfs_plot)) * 1.5
    ax.plot(xs, pdfs_plot, color='k')
    # Clean up some labels.
    ax.set_title('Univariate KDE Posterior versus Generator')
    ax.set_xlabel('x')
    ax.set_yticklabels([])
    # Show the plot.
    ax.grid()
    plt.close()
    # KS test.
    _, p = ks_2samp(samples_test, samples_gen)
    assert .05 < p
def two_sample_test(cctype, X, Y):
    model = cu.cctype_class(cctype)
    if model.is_numeric():  # XXX WRONG CHOICE FOR DISCRETE NUMERIC XXX
        _, pval = ks_2samp(X, Y)
    else:
        Xb, Yb = aligned_bincount([X, Y])
        ignore = np.logical_and(Xb==0, Yb==0)
        Xb, Yb = Xb[np.logical_not(ignore)], Yb[np.logical_not(ignore)]
        Xb = Xb / float(sum(Xb)) * 1000
        Yb = Yb / float(sum(Yb)) * 1000
        _, pval = chisquare(Yb, f_exp=Xb)
    return pval
def _ks_Z(a, b):
    result = stats.ks_2samp(a, b)
    n = len(a)
    m = len(b)
    return result.statistic / np.sqrt((n + m) / (n * m))
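For context, dividing by sqrt((n + m) / (n * m)) is the same as multiplying by sqrt(n * m / (n + m)), the usual scaling that puts the KS statistic on a roughly sample-size-independent, Z-like scale. A quick check with synthetic samples:

# Hedged illustration of the scaling used by _ks_Z; the samples are synthetic.
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)
a = rng.normal(0, 1, 400)
b = rng.normal(0.3, 1, 250)

z = _ks_Z(a, b)
n, m = len(a), len(b)
d = stats.ks_2samp(a, b).statistic
assert np.isclose(z, d * np.sqrt(n * m / (n + m)))  # algebraically identical
print(z)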
def test_compute_connectivities(self):
    # External query against build
    test_df_index = pd.MultiIndex.from_arrays(
        [["A", "A", "B", "B"], ["A375", "A375", "A375", "A375"],
         ["A:A375", "A:A375", "B:A375", "B:A375"]], names=["pert", "cell", "aggregated"])
    test_df_columns = pd.MultiIndex.from_arrays(
        [["D", "D", "D", "E", "E", "E"], ["A375", "A375", "A375", "A375", "A375", "A375"],
         ["D:A375", "D:A375", "D:A375", "E:A375", "E:A375", "E:A375"]],
        names=["pert_iname", "cell", "aggregated2"])
    test_df = pd.DataFrame(
        [[0.1, -0.3, -0.1, -0.4, 0.6, -0.7],
         [0.5, -0.7, -0.2, -1, 0.4, 0.2],
         [-0.2, 0.3, 0.7, 0.1, 0.4, -0.9],
         [0.1, 0.4, 0.2, 0.6, 0.4, -0.1]],
        index=test_df_index, columns=test_df_columns)
    bg_df_index = pd.MultiIndex.from_arrays(
        [["A", "B", "A", "B", "C", "C"], ["A375", "A375", "A375", "A375", "A375", "A375"],
         ["A:A375", "B:A375", "A:A375", "B:A375", "C:A375", "C:A375"]],
        names=["pert", "cell", "bg_aggregated"])
    bg_df = pd.DataFrame(
        [[1.0, 0.5, 1.0, -0.4, 1.1, -0.6],
         [0.5, 1.0, 1.2, -0.8, -0.9, 0.4],
         [1.0, 1.2, 1.0, 0.1, 0.3, 1.3],
         [-0.4, -0.8, 0.1, 1.0, 0.5, -0.2],
         [1.1, -0.9, 0.3, 0.5, 1.0, 0.7],
         [-0.6, 0.4, 1.3, -0.2, 0.7, 1.0]],
        index=bg_df_index, columns=bg_df_index)
    A_bg = [0.5, 1.0, -0.4, 1.1, -0.6, 1.2, 0.1, 0.3, 1.3]  # med = 0.4
    B_bg = [0.5, 1.2, -0.8, -0.9, 0.4, -0.4, 0.1, 0.5, -0.2]  # med = 0.1
    (e_D_v_A, _) = stats.ks_2samp([0.1, -0.3, -0.1, 0.5, -0.7, -0.2], A_bg)  # med = -0.15, so -
    (e_D_v_B, _) = stats.ks_2samp([-0.2, 0.3, 0.7, 0.1, 0.4, 0.2], B_bg)  # med = 0.25, so +
    (e_E_v_A, _) = stats.ks_2samp([-0.4, 0.6, -0.7, -1, 0.4, 0.2], A_bg)  # med = -0.1, so -
    (e_E_v_B, _) = stats.ks_2samp([0.1, 0.4, -0.9, 0.6, 0.4, -0.1], B_bg)  # med = 0.25, so +
    e_conn_df_index = pd.MultiIndex.from_arrays(
        [["A", "B"], ["A375", "A375"], ["A:A375", "B:A375"]],
        names=["pert", "cell", "aggregated"])
    e_conn_df_columns = pd.MultiIndex.from_arrays(
        [["D", "E"], ["A375", "A375"], ["D:A375", "E:A375"]],
        names=["pert_iname", "cell", "aggregated2"])
    e_conn_df = pd.DataFrame(
        [[e_D_v_A, e_E_v_A], [e_D_v_B, e_E_v_B]], index=e_conn_df_index, columns=e_conn_df_columns)
    e_signed_conn_df = pd.DataFrame(
        [[-e_D_v_A, -e_E_v_A], [e_D_v_B, e_E_v_B]], index=e_conn_df_index, columns=e_conn_df_columns)
    (conn_df, signed_conn_df) = sip.compute_connectivities(
        test_df, bg_df, "aggregated2", "aggregated", "bg_aggregated", "ks_test", False)
    pd.util.testing.assert_frame_equal(conn_df, e_conn_df, (
        "\nconn_df:\n{}\ne_conn_df:\n{}").format(conn_df, e_conn_df))
    pd.util.testing.assert_frame_equal(signed_conn_df, e_signed_conn_df, (
        "\nsigned_conn_df:\n{}\ne_signed_conn_df:\n{}").format(
        signed_conn_df, e_signed_conn_df))
    # Check that the assertion works
    with self.assertRaises(Exception) as e:
        sip.compute_connectivities(test_df, bg_df, "aggregated2", "aggregated", "bg_aggregated", "wtcs", False)
    self.assertIn("connectivity metric must be either ks_test or", str(e.exception))
def test_bivariate_conditional_two_sample(noise):
    # This test checks joint and conditional simulation of a bivariate normal
    # with correlation (1 - noise). The most informative use is plotting, but
    # there is a numerical test for the conditional distributions.
    N_SAMPLES = 100
    rng = gu.gen_rng(2)
    # Synthetic samples.
    linear = Linear(outputs=[0,1], noise=noise, rng=rng)
    samples_train = np.asarray(
        [[s[0], s[1]] for s in linear.simulate(-1, [0,1], N=N_SAMPLES)])
    # Bivariate KDE.
    kde = MultivariateKde(
        [0,1], None, distargs={O: {ST: [N,N], SA: [{},{}]}}, rng=rng)
    # Incorporate observations.
    for rowid, x in enumerate(samples_train):
        kde.incorporate(rowid, {0: x[0], 1: x[1]})
    # Run inference.
    kde.transition()
    # Generate posterior samples from the joint.
    samples_gen = np.asarray(
        [[s[0], s[1]] for s in kde.simulate(-1, [0,1], N=N_SAMPLES)])
    # Plot comparisons of the joint.
    fig, ax = plt.subplots(nrows=1, ncols=2)
    plot_data = zip(
        ax, ['b', 'r'], ['Train', 'KDE'], [samples_train, samples_gen])
    for (a, c, l, s) in plot_data:
        a.scatter(s[:,0], s[:,1], color=c, label=l)
        a.grid()
        a.legend(framealpha=0)
    # Generate posterior samples from the conditional.
    xs = np.linspace(-3, 3, 100)
    cond_samples_a = np.asarray(
        [[s[1] for s in linear.simulate(-1, [1], {0: x0}, N=N_SAMPLES)]
         for x0 in xs])
    cond_samples_b = np.asarray(
        [[s[1] for s in kde.simulate(-1, [1], {0: x0}, N=N_SAMPLES)]
         for x0 in xs])
    # Plot the mean value on the same plots.
    for (a, s) in zip(ax, [cond_samples_a, cond_samples_b]):
        a.plot(xs, np.mean(s, axis=1), linewidth=3, color='g')
        a.set_xlim([-5, 4])
        a.set_ylim([-5, 4])
    plt.close('all')
    # Perform a two-sample test on the means.
    mean_a = np.mean(cond_samples_a, axis=1)
    mean_b = np.mean(cond_samples_b, axis=1)
    _, p = ks_2samp(mean_a, mean_b)
    assert .01 < p
Source: feature_selection.py (project: Default-Credit-Card-Prediction, author: AlexPnt)
def kolmogorov_smirnov_two_sample_test(sample_a, sample_b):
    """
    Performs the two-sample Kolmogorov-Smirnov test, testing whether two samples
    are drawn from identical distributions.

    Keyword arguments:
    sample_a -- The first sample
    sample_b -- The second sample
    """
    return stats.ks_2samp(sample_a, sample_b)
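A final usage sketch with synthetic samples; `stats` is assumed to be `scipy.stats`, as the call implies.

# Hedged usage sketch; assumes `stats` is scipy.stats.
import numpy as np
from scipy import stats

sample_a = np.random.normal(0, 1, 500)
sample_b = np.random.normal(0, 1.5, 500)
ks_stat, p_value = kolmogorov_smirnov_two_sample_test(sample_a, sample_b)
print(ks_stat, p_value)  # a small p-value argues against identical distributions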