def testRelativeToJackknife(self):
    data = pd.DataFrame({"X": [1, 2, 3, 4, 5, 6, 7, 8, 9],
                         "Y": [0, 0, 0, 1, 1, 1, 2, 2, 2]})
    metric = metrics.Sum("X")
    comparison = comparisons.AbsoluteDifference("Y", 0)
    se_method = standard_errors.Jackknife()
    output = core.Analyze(data).relative_to(comparison).with_standard_errors(
        se_method).calculate(metric).run()
    rowindex = pd.Index([1, 2], name="Y")
    correct = pd.DataFrame(
        np.array([[9.0, np.sqrt(5 * np.var([12, 11, 10, 5, 4, 3]))],
                  [18.0, np.sqrt(5 * np.var([21, 20, 19, 11, 10, 9]))]]),
        columns=("sum(X) Absolute Difference",
                 "sum(X) Absolute Difference Jackknife SE"),
        index=rowindex)
    self.assertTrue(output.equals(correct))
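
The expected values here use the jackknife identity: with n leave-one-out estimates theta_i, SE = sqrt((n - 1)/n * sum((theta_i - theta_bar)^2)), and since np.var defaults to ddof=0 this equals sqrt((n - 1) * np.var(estimates)); hence the factor of 5 for the n = 6 estimates above. A minimal standalone check, with the estimate values copied from the test:

import numpy as np

theta_loo = np.array([12, 11, 10, 5, 4, 3], dtype=float)  # leave-one-out estimates
n = len(theta_loo)
# Explicit jackknife SE formula...
se_explicit = np.sqrt((n - 1) / n * np.sum((theta_loo - theta_loo.mean()) ** 2))
# ...and the equivalent np.var form used by the expectations.
se_via_var = np.sqrt((n - 1) * np.var(theta_loo))
assert np.isclose(se_explicit, se_via_var)
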
def testRelativeToJackknifeIncludeBaseline(self):
    data = pd.DataFrame({"X": [1, 2, 3, 4, 5, 6, 7, 8, 9],
                         "Y": [0, 0, 0, 1, 1, 1, 2, 2, 2]})
    metric = metrics.Sum("X")
    comparison = comparisons.AbsoluteDifference("Y", 0, include_base=True)
    se_method = standard_errors.Jackknife()
    output = core.Analyze(data).relative_to(comparison).with_standard_errors(
        se_method).calculate(metric).run()
    rowindex = pd.Index([0, 1, 2], name="Y")
    correct = pd.DataFrame(
        np.array([[0.0, 0.0],
                  [9.0, np.sqrt(5 * np.var([12, 11, 10, 5, 4, 3]))],
                  [18.0, np.sqrt(5 * np.var([21, 20, 19, 11, 10, 9]))]]),
        columns=("sum(X) Absolute Difference",
                 "sum(X) Absolute Difference Jackknife SE"),
        index=rowindex)
    self.assertTrue(output.equals(correct))
def testRelativeToJackknifeSingleComparisonBaselineFirst(self):
    data = pd.DataFrame({"X": [1, 2, 3, 4, 5, 6], "Y": [0, 0, 0, 1, 1, 1]})
    metric = metrics.Sum("X")
    comparison = comparisons.AbsoluteDifference("Y", 0)
    se_method = standard_errors.Jackknife()
    output = core.Analyze(data).relative_to(comparison).with_standard_errors(
        se_method).calculate(metric).run()
    rowindex = pd.Index([1], name="Y")
    correct = pd.DataFrame(
        np.array([[9.0, np.sqrt(5 * np.var([12, 11, 10, 5, 4, 3]))]]),
        columns=("sum(X) Absolute Difference",
                 "sum(X) Absolute Difference Jackknife SE"),
        index=rowindex)
    self.assertTrue(output.equals(correct))
def testRelativeToJackknifeSingleComparisonBaselineSecond(self):
    data = pd.DataFrame({"X": [1, 2, 3, 4, 5, 6], "Y": [0, 0, 0, 1, 1, 1]})
    metric = metrics.Sum("X")
    comparison = comparisons.AbsoluteDifference("Y", 1)
    se_method = standard_errors.Jackknife()
    output = core.Analyze(data).relative_to(comparison).with_standard_errors(
        se_method).calculate(metric).run()
    rowindex = pd.Index([0], name="Y")
    correct = pd.DataFrame(
        np.array([[-9.0, np.sqrt(5 * np.var([12, 11, 10, 5, 4, 3]))]]),
        columns=("sum(X) Absolute Difference",
                 "sum(X) Absolute Difference Jackknife SE"),
        index=rowindex)
    self.assertTrue(output.equals(correct))
def testRelativeToSplitJackknife(self):
    data = pd.DataFrame(
        {"X": [1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8],
         "Y": [1, 1, 1, 2, 2, 2, 3, 3, 3, 1, 1, 1, 2, 2, 2, 3, 3, 3],
         "Z": [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]})
    metric = metrics.Sum("X")
    comparison = comparisons.AbsoluteDifference("Z", 0)
    se_method = standard_errors.Jackknife()
    output = core.Analyze(data).split_by("Y").relative_to(
        comparison).with_standard_errors(se_method).calculate(metric).run()
    rowindex = pd.MultiIndex(
        levels=[[1, 2, 3], [1]],
        codes=[[0, 1, 2], [0, 0, 0]],  # "labels" was renamed "codes" in pandas >= 0.24
        names=["Y", "Z"])
    correct = pd.DataFrame(
        np.array([[-3.0, np.sqrt(5 * np.var([0, -1, -2, -3, -4, -5]))],
                  [-3.0, np.sqrt(5 * np.var([3, 2, 1, -8, -7, -6]))],
                  [-3.0, np.sqrt(5 * np.var([6, 5, 4, -11, -10, -9]))]]),
        columns=("sum(X) Absolute Difference",
                 "sum(X) Absolute Difference Jackknife SE"),
        index=rowindex)
    self.assertTrue(output.equals(correct))
def test_ddof_too_big(self):
    nanfuncs = [np.nanvar, np.nanstd]
    stdfuncs = [np.var, np.std]
    dsize = [len(d) for d in _rdat]
    for nf, rf in zip(nanfuncs, stdfuncs):
        for ddof in range(5):
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter('always')
                tgt = [ddof >= d for d in dsize]
                res = nf(_ndat, axis=1, ddof=ddof)
                assert_equal(np.isnan(res), tgt)
                if any(tgt):
                    assert_(len(w) == 1)
                    assert_(issubclass(w[0].category, RuntimeWarning))
                else:
                    assert_(len(w) == 0)
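
The behavior under test: np.nanvar and np.nanstd return NaN and emit a single RuntimeWarning when ddof is at least the number of non-NaN values along the axis. A minimal illustration:

import warnings
import numpy as np

row = np.array([1.0, np.nan, 3.0])  # two non-NaN values
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter('always')
    ok = np.nanvar(row, ddof=1)   # finite: one degree of freedom remains
    bad = np.nanvar(row, ddof=2)  # nan: ddof consumed all non-NaN values
assert not np.isnan(ok) and np.isnan(bad)
assert any(issubclass(wi.category, RuntimeWarning) for wi in w)
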
def mainHmmGeneralClf():
    isTrain = 1  # 1 for train, 0 for test
    isOutlierRemoval = 1  # 1 for outlier removal, 0 otherwise
    performance = 0
    normalizedPerformance = 0
    clf = ClassificationHmmGeneralize(isTrain)
    normPerforms = []
    num_routes = 12
    for i in range(num_routes):
        print("Route: {}".format(i))
        [perfor, normaPefor] = clf.evaluateGeneral(clf.routes_general[i])
        normPerforms.append(normaPefor)
        performance += perfor
        normalizedPerformance += normaPefor
    # Average over the number of routes actually evaluated.
    performance = round(performance / num_routes, 2)
    normalizedPerformance = round(normalizedPerformance / num_routes, 2)
    print("\nAverage Performance: {}%".format(performance))
    print("Average Normalized Performance: {}%".format(normalizedPerformance))
    print("Normalized Performance Variance: {}".format(np.var(normPerforms)))
def mainUniformGeneralClf():
    isTrain = 1  # 1 for train, 0 for test
    isOutlierRemoval = 1  # 1 for outlier removal, 0 otherwise
    performance = 0
    normalizedPerformance = 0
    clf = ClassificationUniformGeneralize(isTrain)
    print(clf.X_general.shape)
    normPerforms = []
    num_routes = 12
    for i in range(num_routes):
        print("Route: {}".format(i))
        [perfor, normaPefor] = clf.evaluateGeneral(clf.routes_general[i])
        normPerforms.append(normaPefor)
        performance += perfor
        normalizedPerformance += normaPefor
    # Average over the number of routes actually evaluated.
    performance = round(performance / num_routes, 2)
    normalizedPerformance = round(normalizedPerformance / num_routes, 2)
    print("\nAverage Performance: {}%".format(performance))
    print("Average Normalized Performance: {}%".format(normalizedPerformance))
    print("Normalized Performance Variance: {}".format(np.var(normPerforms)))
def vif(self):
    vif = []
    totalmanifests = range(len(self.data_.columns))
    for i in range(len(totalmanifests)):
        independent = [x for j, x in enumerate(totalmanifests) if j != i]
        # .ix was removed from pandas; .iloc does the positional indexing here.
        coef, resid = np.linalg.lstsq(
            self.data_.iloc[:, independent], self.data_.iloc[:, i],
            rcond=None)[:2]
        r2 = 1 - resid / \
            (self.data_.iloc[:, i].size * self.data_.iloc[:, i].var())
        vif.append(1 / (1 - r2))
    vif = pd.DataFrame(vif, index=self.manifests)
    return vif
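
For context, VIF_i = 1 / (1 - R_i^2), where R_i^2 comes from regressing manifest i on the remaining manifests. A self-contained sketch of the same least-squares computation on synthetic data (the column names are made up, and, like the method above, no intercept is added):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
a = rng.normal(size=100)
df = pd.DataFrame({"a": a,
                   "b": a + 0.1 * rng.normal(size=100),  # nearly collinear with "a"
                   "c": rng.normal(size=100)})
for i in range(df.shape[1]):
    others = [j for j in range(df.shape[1]) if j != i]
    resid = np.linalg.lstsq(df.iloc[:, others], df.iloc[:, i], rcond=None)[1]
    r2 = 1 - resid / (df.iloc[:, i].size * df.iloc[:, i].var())
    print(df.columns[i], 1 / (1 - r2))  # "a" and "b" get large VIFs
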
def explained_variance_1d(ypred, y):
    """
    Returns 1 - Var[y - ypred] / Var[y], or NaN when Var[y] is zero.
    https://www.quora.com/What-is-the-meaning-proportion-of-variance-explained-in-linear-regression
    """
    assert y.ndim == 1 and ypred.ndim == 1
    vary = np.var(y)
    return np.nan if vary == 0 else 1 - np.var(y - ypred) / vary
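
A quick sanity check of explained_variance_1d: a perfect prediction scores 1.0, and a constant prediction at the mean scores 0.0.

import numpy as np

y = np.array([1.0, 2.0, 3.0, 4.0])
assert explained_variance_1d(y, y) == 1.0                       # perfect prediction
assert explained_variance_1d(np.full_like(y, y.mean()), y) == 0.0  # mean prediction
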
def gauss_prob(mu, logstd, x):
    std = tf.exp(logstd)
    var = tf.square(std)
    # Gaussian density: exp(-(x - mu)^2 / (2 var)) / (sqrt(2 pi) std);
    # note the squared deviation in the exponent.
    gp = tf.exp(-tf.square(x - mu) / (2 * var)) / ((2 * np.pi) ** .5 * std)
    return tf.reduce_prod(gp, [1])
def gauss_log_prob(mu, logstd, x):
    var = tf.exp(2 * logstd)
    gp = -tf.square(x - mu) / (2 * var) - .5 * tf.log(tf.constant(2 * np.pi)) - logstd
    return tf.reduce_sum(gp, [1])
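
The per-dimension term above is log N(x; mu, exp(logstd)^2), summed over the last axis. A numpy cross-check against scipy.stats.norm.logpdf, independent of TensorFlow:

import numpy as np
from scipy import stats

mu, logstd, x = 0.5, np.log(2.0), 1.3
var = np.exp(2 * logstd)
lp = -(x - mu) ** 2 / (2 * var) - 0.5 * np.log(2 * np.pi) - logstd
assert np.isclose(lp, stats.norm.logpdf(x, loc=mu, scale=np.exp(logstd)))
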
def var(x, axis=None, keepdims=False):
    meanx = mean(x, axis=axis, keepdims=keepdims)
    return mean(tf.square(x - meanx), axis=axis, keepdims=keepdims)

def std(x, axis=None, keepdims=False):
    return tf.sqrt(var(x, axis=axis, keepdims=keepdims))
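
Both helpers follow the ddof=0 (population) definitions, which are also numpy's defaults; the same equivalence in numpy terms:

import numpy as np

x = np.array([1.0, 2.0, 4.0, 8.0])
assert np.isclose(np.mean((x - x.mean()) ** 2), np.var(x))  # var = mean squared deviation
assert np.isclose(np.sqrt(np.var(x)), np.std(x))
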
def minimize_and_clip(optimizer, objective, var_list, clip_val=10):
    """Minimize `objective` using `optimizer` w.r.t. variables in
    `var_list`, while ensuring the norm of the gradient for each
    variable is clipped to `clip_val`.
    """
    gradients = optimizer.compute_gradients(objective, var_list=var_list)
    for i, (grad, var) in enumerate(gradients):
        if grad is not None:
            gradients[i] = (tf.clip_by_norm(grad, clip_val), var)
    return optimizer.apply_gradients(gradients)
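
A hypothetical usage sketch, assuming TensorFlow 1.x graph mode (the variable values and learning rate are made up):

import tensorflow as tf

w = tf.Variable([30.0, -40.0])  # gradient of the objective is 2*w, norm 100
objective = tf.reduce_sum(tf.square(w))
train_op = minimize_and_clip(tf.train.AdamOptimizer(1e-2), objective,
                             var_list=[w], clip_val=10)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(train_op)  # applies the norm-clipped gradient
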
def test_output_shape(self):
    """ Test that the axis parameter is handled correctly """
    stream = [np.random.random((16, 7, 3)) for _ in range(5)]
    stack = np.stack(stream, axis=-1)
    for axis in (0, 1, 2, None):
        with self.subTest('axis = {}'.format(axis)):
            from_numpy = np.var(stack, axis=axis)
            from_ivar = last(ivar(stream, axis=axis))
            self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
            self.assertTrue(np.allclose(from_ivar, from_numpy))
def test_ddof(self):
    """ Test that the ddof parameter is equivalent to numpy's """
    stream = [np.random.random((16, 7, 3)) for _ in range(10)]
    stack = np.stack(stream, axis=-1)
    with catch_warnings():
        simplefilter('ignore')
        for axis in (0, 1, 2, None):
            for ddof in range(4):
                with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                    from_numpy = np.var(stack, axis=axis, ddof=ddof)
                    from_ivar = last(ivar(stream, axis=axis, ddof=ddof))
                    self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                    self.assertTrue(np.allclose(from_ivar, from_numpy))
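
For reference, a one-pass (streaming) variance with a ddof correction can be maintained with Welford's algorithm; a minimal sketch, independent of the ivar implementation under test:

import numpy as np

def welford_var(stream, ddof=0):
    """Elementwise variance over an iterable of equal-shape arrays."""
    count, mean, m2 = 0, 0.0, 0.0
    for x in stream:
        count += 1
        delta = x - mean
        mean = mean + delta / count
        m2 = m2 + delta * (x - mean)  # running sum of squared deviations
    return m2 / (count - ddof)

stream = [np.random.random((4, 3)) for _ in range(10)]
assert np.allclose(welford_var(stream, ddof=1),
                   np.var(np.stack(stream), axis=0, ddof=1))
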
def summarize_bootstrapped_top_n(top_n_boot):
    top_n_bcs_mean = np.mean(top_n_boot)
    top_n_bcs_sd = np.std(top_n_boot)
    top_n_bcs_var = np.var(top_n_boot)
    result = {}
    result['filtered_bcs_var'] = top_n_bcs_var
    result['filtered_bcs_cv'] = tk_stats.robust_divide(top_n_bcs_sd, top_n_bcs_mean)
    result['filtered_bcs_lb'] = round(scipy.stats.norm.ppf(0.025, top_n_bcs_mean, top_n_bcs_sd))
    result['filtered_bcs_ub'] = round(scipy.stats.norm.ppf(0.975, top_n_bcs_mean, top_n_bcs_sd))
    result['filtered_bcs'] = round(top_n_bcs_mean)
    return result
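
The lb/ub fields are a normal-approximation 95% interval around the bootstrap mean. A self-contained illustration of the same quantities (tk_stats.robust_divide is project-specific, so plain division stands in for the CV here):

import numpy as np
import scipy.stats

top_n_boot = np.random.default_rng(1).poisson(1000, size=200)  # stand-in bootstrap draws
mean, sd = np.mean(top_n_boot), np.std(top_n_boot)
lb = round(scipy.stats.norm.ppf(0.025, mean, sd))  # lower 95% bound
ub = round(scipy.stats.norm.ppf(0.975, mean, sd))  # upper 95% bound
print(lb, round(mean), ub, sd / mean, np.var(top_n_boot))
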
def auto_correlation_time(x, s, mu, var):
    b, t, d = x.shape
    act_ = np.zeros([d])
    for i in range(0, b):
        y = x[i] - mu
        p, n = y[:-s], y[s:]
        act_ += np.mean(p * n, axis=0) / var
    act_ = act_ / b
    return act_
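
A small usage sketch on synthetic chains with the (batch, time, dim) layout the function expects; mu and var are assumed here to be the per-dimension moments of the chain:

import numpy as np

x = np.random.default_rng(2).normal(size=(8, 500, 3))  # (batch, time, dim)
mu, var = x.mean(axis=(0, 1)), x.var(axis=(0, 1))
print(auto_correlation_time(x, s=1, mu=mu, var=var))  # near zero for white noise
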
def gelman_rubin_diagnostic(x, logger, mu=None):
    m, n = x.shape[0], x.shape[1]
    theta = np.mean(x, axis=1)
    sigma = np.var(x, axis=1)
    # Use "is not None" so an explicit mu of 0 is still honored.
    theta_m = mu if mu is not None else np.mean(theta, axis=0)
    b = float(n) / float(m - 1) * np.sum((theta - theta_m) ** 2)
    w = 1. / float(m) * np.sum(sigma, axis=0)
    v = float(n - 1) / float(n) * w + float(m + 1) / float(m * n) * b
    r_hat = np.sqrt(v / w)
    logger.info('R: max [%f] min [%f]' % (np.max(r_hat), np.min(r_hat)))
    return r_hat
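
A usage sketch with synthetic chains of shape (m chains, n samples per chain) and a standard-library logger; well-mixed chains should give an R-hat close to 1:

import logging
import numpy as np

logging.basicConfig(level=logging.INFO)
chains = np.random.default_rng(3).normal(size=(4, 1000))  # (m chains, n samples)
r_hat = gelman_rubin_diagnostic(chains, logging.getLogger(__name__))
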