def test_evidence(self):
    # 2-sigma tolerance on the evidence estimate
    tolerance = 2.0 * np.sqrt(self.work.NS.state.info / self.work.NS.Nlive)
    print('2-sigma statistical error in logZ: {0:0.3f}'.format(tolerance))
    print('Analytic logZ {0}'.format(self.model.analytic_log_Z))
    print('Estimated logZ {0}'.format(self.work.NS.logZ))
    pos = self.work.posterior_samples['x']
    # t, pval = stats.kstest(pos, self.model.distr.cdf)
    stat, pval = stats.normaltest(pos.T)
    print('Normal test p-value {0}'.format(str(pval)))
    plt.figure()
    plt.hist(pos.ravel(), density=True)  # 'normed' was removed in Matplotlib 3.x
    x = np.linspace(self.model.bounds[0][0], self.model.bounds[0][1], 100)
    plt.plot(x, self.model.distr.pdf(x))
    plt.title('NormalTest pval = {0}'.format(pval))
    plt.savefig('posterior.png')
    plt.figure()
    plt.plot(pos.ravel(), ',')
    plt.title('chain')
    plt.savefig('chain.png')
    self.assertTrue(np.abs(self.work.NS.logZ - GaussianModel.analytic_log_Z) < tolerance,
                    'Incorrect evidence for normalised distribution: {0:.3f} instead of {1:.3f}'.format(
                        self.work.NS.logZ, GaussianModel.analytic_log_Z))
    self.assertTrue(pval > 0.01,
                    'Normality test failed: normaltest p-value = {0}'.format(pval))
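# For context: scipy.stats.normaltest implements D'Agostino and Pearson's
# omnibus test, which combines skew and kurtosis into a chi-squared statistic
# with two degrees of freedom. A minimal self-contained sketch of the check
# performed above (synthetic data; all names here are illustrative):
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)  # illustrative seed
gaussian_sample = rng.normal(size=1000)
skewed_sample = rng.exponential(size=1000)
# Under the null hypothesis of normality, the p-value should be large for
# the Gaussian draw and very small for the exponential draw.
for label, sample in [('gaussian', gaussian_sample), ('exponential', skewed_sample)]:
    stat, pval = stats.normaltest(sample)
    print('{0}: statistic={1:.3f}, p-value={2:.3g}'.format(label, stat, pval))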
def samples_are_normal(*samples, **options):
    """Test whether each sample differs from a normal distribution.

    Use both the Shapiro-Wilk test and D'Agostino and Pearson's test
    to test the null hypothesis that the sample is drawn from a
    normal distribution.

    Returns:
        List of tuples (is_normal (bool), shapiro_result, normaltest_result),
        where each result is a (statistic, pvalue) pair.
    """
    alpha = options.get('alpha', 0.05)
    results = []
    for sample in samples:
        (_, shapiro_pvalue) = shapiro_result = shapiro(sample)
        (_, normaltest_pvalue) = normaltest_result = normaltest(sample)
        results.append((
            # Treat the sample as normal unless both tests reject.
            not (normaltest_pvalue < alpha and shapiro_pvalue < alpha),
            shapiro_result,
            normaltest_result
        ))
    return results
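# A possible usage sketch for samples_are_normal above (synthetic data;
# the alpha value and variable names are illustrative):
import numpy as np

rng = np.random.default_rng(42)  # illustrative seed
normal_data = rng.normal(size=500)
uniform_data = rng.uniform(size=500)
for is_normal, shapiro_res, normaltest_res in samples_are_normal(
        normal_data, uniform_data, alpha=0.01):
    print(is_normal, shapiro_res, normaltest_res)
# Expected: True for the Gaussian sample, False for the uniform one,
# since both tests reject normality for a uniform sample of this size.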
def draw(path):
    data = metadata.load(path)
    p_values_pearson = []
    p_values_shapiro = []
    norm_dist_path = os.path.join(path, "normtest_distribution.png")
    if os.path.exists(norm_dist_path):
        print("path exists %s, skip" % norm_dist_path)
        # return
    for srv in data["services"]:
        filename = os.path.join(path, srv["filename"])
        df = load_timeseries(filename, srv)
        # keep only columns that are not all-NaN and not constant
        columns = []
        for c in df.columns:
            if (not df[c].isnull().all()) and df[c].var() != 0:
                columns.append(c)
        df = df[columns]
        n = len(columns)
        if n == 0:
            continue
        # squeeze=False keeps a 2-d axis array even when n == 1
        fig, axis = plt.subplots(n, 2, squeeze=False)
        fig.set_figheight(n * 4)
        fig.set_figwidth(30)
        for i, col in enumerate(df.columns):
            serie = df[col].dropna()
            sns.boxplot(x=serie, ax=axis[i, 0])
            statistic_1, p_value_1 = normaltest(serie)
            p_values_pearson.append(p_value_1)
            statistic_2, p_value_2 = shapiro(serie)
            p_values_shapiro.append(p_value_2)
            templ = """Pearson's normtest:
  statistic: %f
  p-value: %E
  -> %s
Shapiro-Wilk test for normality:
  statistic: %f
  p-value: %E
  -> %s
"""
            outcome_1 = "not normally distributed" if p_value_1 < 0.05 else "normally distributed"
            outcome_2 = "not normally distributed" if p_value_2 < 0.05 else "normally distributed"
            text = templ % (statistic_1, p_value_1, outcome_1, statistic_2, p_value_2, outcome_2)
            axis[i, 1].axis('off')
            axis[i, 1].text(0.05, 0.05, text, fontsize=18)
        plot_path = os.path.join(path, "%s_normtest.png" % srv["name"])
        plt.savefig(plot_path)
        print(plot_path)
    fig, axis = plt.subplots(2)
    fig.set_figheight(8)
    measurement = os.path.dirname(os.path.join(path, ''))
    # sns.distplot is deprecated in newer seaborn; histplot is the modern equivalent
    name = "Distribution of p-value for Pearson's normtest for %s" % measurement
    sns.distplot(pd.Series(p_values_pearson, name=name), rug=True, kde=False, norm_hist=False, ax=axis[0])
    name = "Distribution of p-value for Shapiro-Wilk's normtest for %s" % measurement
    sns.distplot(pd.Series(p_values_shapiro, name=name), rug=True, kde=False, norm_hist=False, ax=axis[1])
    fig.savefig(norm_dist_path)
    print(norm_dist_path)
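# The closing p-value histograms are informative because, under the null
# hypothesis, p-values are uniformly distributed on [0, 1]; a pile-up near
# zero therefore signals many non-normal series. A self-contained sketch of
# that fact (synthetic data; names are illustrative):
import numpy as np
from scipy.stats import normaltest

rng = np.random.default_rng(1)  # illustrative seed
# p-values for Gaussian series should look roughly uniform ...
p_null = np.array([normaltest(rng.normal(size=200))[1] for _ in range(500)])
# ... while p-values for exponential series pile up near zero.
p_alt = np.array([normaltest(rng.exponential(size=200))[1] for _ in range(500)])
print('fraction below 0.05 (Gaussian):    %.2f' % np.mean(p_null < 0.05))
print('fraction below 0.05 (exponential): %.2f' % np.mean(p_alt < 0.05))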
def geweke(self, chain=None, first=0.1, last=0.5, threshold=0.05):
    """ Runs the Geweke diagnostic on the supplied chains.

    Parameters
    ----------
    chain : int|str, optional
        Which chain to run the diagnostic on. By default, this is `None`,
        which will run the diagnostic on all chains. You can also
        supply an integer (the chain index) or a string, for the chain
        name (if you set one).
    first : float, optional
        The fraction of the start of the chain to use
    last : float, optional
        The fraction of the end of the chain to use
    threshold : float, optional
        The p-value to use when testing for normality.

    Returns
    -------
    bool
        Whether or not the chains pass the test
    """
    if chain is None:
        return np.all([self.geweke(k, threshold=threshold) for k in range(len(self.parent.chains))])
    index = self.parent._get_chain(chain)
    chain = self.parent.chains[index]
    num_walkers = chain.walkers
    assert num_walkers is not None and num_walkers > 0, \
        "You need to specify the number of walkers to use the Geweke diagnostic."
    name = chain.name
    data = chain.chain
    chains = np.split(data, num_walkers)
    n = 1.0 * chains[0].shape[0]
    n_start = int(np.floor(first * n))
    n_end = int(np.floor((1 - last) * n))
    mean_start = np.array([np.mean(c[:n_start, i])
                           for c in chains for i in range(c.shape[1])])
    var_start = np.array([self._spec(c[:n_start, i]) / c[:n_start, i].size
                          for c in chains for i in range(c.shape[1])])
    mean_end = np.array([np.mean(c[n_end:, i])
                         for c in chains for i in range(c.shape[1])])
    var_end = np.array([self._spec(c[n_end:, i]) / c[n_end:, i].size
                        for c in chains for i in range(c.shape[1])])
    # The Geweke z-scores should look like draws from a standard normal
    # if the chain has converged, so test them for normality.
    zs = (mean_start - mean_end) / (np.sqrt(var_start + var_end))
    _, pvalue = normaltest(zs)
    print("Geweke statistic for chain %s has p-value %e" % (name, pvalue))
    return pvalue > threshold

# Method of estimating spectral density following PyMC.
# See https://github.com/pymc-devs/pymc/blob/master/pymc/diagnostics.py
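# The _spec helper is not shown in this snippet. Following the PyMC code
# referenced above, it estimates the spectral density of a sub-chain at
# frequency zero, giving a variance estimate that accounts for
# autocorrelation. A minimal sketch of such a helper, assuming a Welch
# periodogram (the exact smoothing used by PyMC may differ):
import numpy as np
from scipy.signal import welch

def _spec(x):
    """Rough spectral density of x at frequency zero (sketch only)."""
    x = np.asarray(x, dtype=float)
    freqs, power = welch(x, nperseg=min(len(x), 256))
    return power[0]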
def testModelFit(arma_mod30, dta):
    # does our model fit the theory?
    residuals = arma_mod30.resid
    dw = sm.stats.durbin_watson(residuals.values)
    # NOTE: the Durbin-Watson test statistic is approximately equal to
    # 2*(1-r), where r is the sample autocorrelation of the residuals.
    # Thus, for r == 0, indicating no serial correlation,
    # the test statistic equals 2. This statistic will always be
    # between 0 and 4. The closer to 0 the statistic, the more evidence
    # for positive serial correlation. The closer to 4, the more evidence
    # for negative serial correlation.
    print("Durbin-Watson statistic: " + str(dw))
    # plot the residuals so we can see if there are any areas in time which
    # are poorly explained.
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111)
    ax = arma_mod30.resid.plot(ax=ax)
    plt.savefig(FIG_DIR + 'residualsVsTime.png', bbox_inches='tight')
    # plt.show()
    # test whether the residuals differ from a normal distribution
    k2, p = stats.normaltest(residuals)
    print("residuals normaltest statistic (k2): " + str(k2) +
          " fit w/ normal dist (p-value): " + str(p))
    # plot residuals
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(211)
    fig = qqplot(residuals, line='q', ax=ax, fit=True)
    ax2 = fig.add_subplot(212)
    # resid_dev = residuals.resid_deviance.copy()
    # resid_std = (resid_dev - resid_dev.mean()) / resid_dev.std()
    plt.hist(residuals, bins=25)
    plt.title('Histogram of standardized deviance residuals')
    plt.savefig(FIG_DIR + 'residualsNormality.png', bbox_inches='tight')
    # plot ACF/PACF for residuals
    plotACFAndPACF(residuals, 'residualsACFAndPACF.png')
    # nlags=40 matches the hard-coded range(1, 41) below
    r, q, p = sm.tsa.acf(residuals.values.squeeze(), nlags=40, qstat=True)
    data = np.c_[range(1, 41), r[1:], q, p]
    table = pandas.DataFrame(data, columns=['lag', "AC", "Q", "Prob(>Q)"])
    print(table.set_index('lag'))
    # sample data indicates a lack of fit.
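# The relationship DW ~= 2*(1 - r) noted in the comments above is easy to
# verify numerically. A self-contained sketch on synthetic AR(1) residuals
# (coefficient and seed are illustrative):
import numpy as np

rng = np.random.default_rng(7)
e = np.zeros(5000)
for t in range(1, len(e)):
    e[t] = 0.5 * e[t - 1] + rng.normal()        # lag-1 autocorrelation ~0.5
dw = np.sum(np.diff(e) ** 2) / np.sum(e ** 2)   # Durbin-Watson statistic
r = np.corrcoef(e[:-1], e[1:])[0, 1]            # sample lag-1 autocorrelation
print('DW = %.3f, 2*(1-r) = %.3f' % (dw, 2 * (1 - r)))  # nearly equal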
def testModelFit(arma_mod30, dta):
    # does our model fit the theory?
    residuals = arma_mod30.resid
    dw = sm.stats.durbin_watson(residuals.values)
    # NOTE: the Durbin-Watson test statistic is approximately equal to
    # 2*(1-r), where r is the sample autocorrelation of the residuals.
    # Thus, for r == 0, indicating no serial correlation,
    # the test statistic equals 2. This statistic will always be
    # between 0 and 4. The closer to 0 the statistic, the more evidence
    # for positive serial correlation. The closer to 4, the more evidence
    # for negative serial correlation.
    print("Durbin-Watson statistic: " + str(dw))
    # plot the residuals so we can see if there are any areas in time which
    # are poorly explained.
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111)
    ax = arma_mod30.resid.plot(ax=ax)
    plt.savefig(config.plot_dir + 'ARIMAX_test_residualsVsTime.png', bbox_inches='tight')
    # plt.show()
    # test whether the residuals differ from a normal distribution
    k2, p = stats.normaltest(residuals)
    print("residuals normaltest statistic (k2): " + str(k2) +
          " fit w/ normal dist (p-value): " + str(p))
    # plot residuals
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(211)
    fig = qqplot(residuals, line='q', ax=ax, fit=True)
    ax2 = fig.add_subplot(212)
    # resid_dev = residuals.resid_deviance.copy()
    # resid_std = (resid_dev - resid_dev.mean()) / resid_dev.std()
    plt.hist(residuals, bins=25)
    plt.title('Histogram of standardized deviance residuals')
    plt.savefig(config.plot_dir + 'ARIMAX_test_residualsNormality.png', bbox_inches='tight')
    plt.clf()
    # plot ACF/PACF for residuals
    plotACFAndPACF(residuals, 'residualsACFAndPACF.png')
    # nlags=40 matches the hard-coded range(1, 41) below
    r, q, p = sm.tsa.acf(residuals.values.squeeze(), nlags=40, qstat=True)
    data = np.c_[range(1, 41), r[1:], q, p]
    table = pandas.DataFrame(data, columns=['lag', "AC", "Q", "Prob(>Q)"])
    print(table.set_index('lag'))
    # sample data indicates a lack of fit.
def get_numerical_features(fnum, fname, df, nvalues, dt,
                           sentinel, logt, plevel):
    r"""Transform numerical features with imputation and possibly
    log-transformation.

    Parameters
    ----------
    fnum : int
        Feature number, strictly for logging purposes.
    fname : str
        Name of the numerical column in the dataframe ``df``.
    df : pandas.DataFrame
        Dataframe containing the column ``fname``.
    nvalues : int
        The number of unique values.
    dt : str
        The values ``'float64'``, ``'int64'``, or ``'bool'``.
    sentinel : float
        The number to be imputed for NaN values.
    logt : bool
        If ``True``, then log-transform numerical values.
    plevel : float
        The p-value threshold to test if a feature is normally distributed.

    Returns
    -------
    new_values : numpy array
        The set of imputed and transformed features.
    """
    feature = df[fname]
    if len(feature) == nvalues:
        logger.info("Feature %d: %s is a numerical feature of type %s with maximum number of values %d",
                    fnum, fname, dt, nvalues)
    else:
        logger.info("Feature %d: %s is a numerical feature of type %s with %d unique values",
                    fnum, fname, dt, nvalues)
    # impute values for float, integer, or boolean data types
    new_values = impute_values(feature, dt, sentinel)
    # log-transform any positive-valued feature that does not fit a normal distribution
    if logt and np.all(new_values > 0):
        stat, pvalue = sps.normaltest(new_values)
        if pvalue <= plevel:
            logger.info("Feature %d: %s is not normally distributed [p-value: %f]",
                        fnum, fname, pvalue)
            new_values = np.log(new_values)
    return new_values
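# The log-transform branch above is a standard remedy for right-skewed,
# strictly positive features. A small self-contained sketch of the same
# decision rule (synthetic data; the plevel value is illustrative):
import numpy as np
import scipy.stats as sps

rng = np.random.default_rng(3)
values = rng.lognormal(mean=0.0, sigma=1.0, size=1000)  # positive, right-skewed
plevel = 0.05
stat, pvalue = sps.normaltest(values)
if pvalue <= plevel and np.all(values > 0):
    values = np.log(values)  # the log of a lognormal sample is Gaussian
print('p-value after transform: %.3f' % sps.normaltest(values)[1])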
#
# Function get_polynomials
#
def create_scipy_features(base_features, sentinel):
    r"""Calculate the skew, kurtosis, and other statistical features
    for each row.

    Parameters
    ----------
    base_features : numpy array
        The feature dataframe.
    sentinel : float
        The number to be imputed for NaN values.

    Returns
    -------
    sp_features : numpy array
        The calculated SciPy features.
    """
    logger.info("Creating SciPy Features")
    # Generate scipy features
    logger.info("SciPy Feature: geometric mean")
    row_gmean = sps.gmean(base_features, axis=1)
    logger.info("SciPy Feature: kurtosis")
    row_kurtosis = sps.kurtosis(base_features, axis=1)
    logger.info("SciPy Feature: kurtosis test")
    row_ktest, pvalue = sps.kurtosistest(base_features, axis=1)
    logger.info("SciPy Feature: normal test")
    row_normal, pvalue = sps.normaltest(base_features, axis=1)
    logger.info("SciPy Feature: skew")
    row_skew = sps.skew(base_features, axis=1)
    logger.info("SciPy Feature: skew test")
    row_stest, pvalue = sps.skewtest(base_features, axis=1)
    logger.info("SciPy Feature: variation")
    row_var = sps.variation(base_features, axis=1)
    logger.info("SciPy Feature: signal-to-noise ratio")
    # scipy.stats.signaltonoise was removed in SciPy 1.0; compute it directly.
    row_stn = np.mean(base_features, axis=1) / np.std(base_features, axis=1)
    logger.info("SciPy Feature: standard error of mean")
    row_sem = sps.sem(base_features, axis=1)
    sp_features = np.column_stack((row_gmean, row_kurtosis, row_ktest,
                                   row_normal, row_skew, row_stest,
                                   row_var, row_stn, row_sem))
    sp_features = impute_values(sp_features, 'float64', sentinel)
    sp_features = StandardScaler().fit_transform(sp_features)
    # Return new SciPy features
    logger.info("SciPy Feature Count : %d", sp_features.shape[1])
    return sp_features
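# A note on the row-wise tests above: sps.skewtest (and therefore
# sps.normaltest) requires at least 8 observations per row, and
# sps.kurtosistest warns below 20, so base_features needs enough columns.
# A minimal sketch of the row-wise usage (random data; names illustrative):
import numpy as np
import scipy.stats as sps

rng = np.random.default_rng(5)
base = rng.normal(size=(4, 30))            # 4 rows, 30 observations each
stat, pval = sps.normaltest(base, axis=1)  # one statistic/p-value per row
print(stat.shape, pval.shape)              # -> (4,) (4,)
# scipy.stats.signaltonoise was removed in SciPy 1.0; the equivalent is:
stn = np.mean(base, axis=1) / np.std(base, axis=1)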
#
# Function create_clusters
#