def _init_coefs(X, method='corrcoef'):
if method == 'corrcoef':
return np.corrcoef(X, rowvar=False), 1.0
elif method == 'cov':
init_cov = np.cov(X, rowvar=False)
return init_cov, np.max(np.abs(np.triu(init_cov)))
elif method == 'spearman':
return spearman_correlation(X, rowvar=False), 1.0
elif method == 'kendalltau':
return kendalltau_correlation(X, rowvar=False), 1.0
elif callable(method):
return method(X)
else:
        raise ValueError(
            "initialize_method must be 'corrcoef', 'cov', 'spearman', "
            "'kendalltau', or a callable; passed '{}'.".format(method)
        )
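A minimal usage sketch of the 'corrcoef' branch, assuming numpy is imported as np; X is an illustrative samples-by-features array (the 'spearman' and 'kendalltau' branches additionally need the spearman_correlation and kendalltau_correlation helpers from the original module):

import numpy as np

X = np.random.randn(100, 5)                     # hypothetical data matrix
init_coef, lam_scale = _init_coefs(X, method='corrcoef')
print(init_coef.shape)                          # (5, 5) correlation matrix
print(lam_scale)                                # 1.0 for the 'corrcoef' branch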
def PCA(data, num_components=None):
# mean center the data
data -= data.mean(axis=0)
# calculate the covariance matrix
R = np.cov(data, rowvar=False)
# calculate eigenvectors & eigenvalues of the covariance matrix
# use 'eigh' rather than 'eig' since R is symmetric,
# the performance gain is substantial
V, E = np.linalg.eigh(R)
# sort eigenvalue in decreasing order
idx = np.argsort(V)[::-1]
E = E[:,idx]
# sort eigenvectors according to same index
V = V[idx]
    # select the first n eigenvectors (n = num_components, the desired
    # dimensionality of the projected data)
E = E[:, :num_components]
# carry out the transformation on the data using eigenvectors
# and return the re-scaled data, eigenvalues, and eigenvectors
return np.dot(E.T, data.T).T, V, E
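A minimal usage sketch, assuming numpy is imported as np; note that the function centers data in place, so pass a copy if the original array must be preserved:

import numpy as np

data = np.random.randn(200, 6)                  # hypothetical samples-by-features matrix
projected, eigenvalues, eigenvectors = PCA(data.copy(), num_components=2)
print(projected.shape)                          # (200, 2)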
def calculate_beta(self):
"""
.. math::
\\beta_a = \\frac{\\mathrm{Cov}(r_a, r_p)}{\\mathrm{Var}(r_p)}
http://en.wikipedia.org/wiki/Beta_(finance)
"""
        # it doesn't make much sense to calculate beta for fewer than two
        # values, so return 0.0
if len(self.algorithm_returns) < 2:
return 0.0
returns_matrix = np.vstack([self.algorithm_returns,
self.benchmark_returns])
C = np.cov(returns_matrix, ddof=1)
algorithm_covariance = C[0][1]
benchmark_variance = C[1][1]
beta = algorithm_covariance / benchmark_variance
return beta
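The same covariance-based formula can be reproduced outside the class with a short standalone sketch; the return series below are purely illustrative:

import numpy as np

algorithm_returns = np.random.normal(0.001, 0.02, 250)   # hypothetical daily returns
benchmark_returns = np.random.normal(0.001, 0.01, 250)
C = np.cov(np.vstack([algorithm_returns, benchmark_returns]), ddof=1)
beta = C[0, 1] / C[1, 1]                                  # Cov(r_a, r_p) / Var(r_p)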
def stats2(sarray, names=None):
"""Calculate means and (co)variances for structured array data."""
if names is None:
names = sarray.dtype.names
nvar = len(names)
data = tuple(sarray[name] for name in names)
cov = np.cov(data)
nondiag_cov = list(cov[i, j] for i, j in permutations(range(nvar), 2))
names_ave = list('ave_' + name for name in names)
names_var = list('var_' + name for name in names)
names_cov = list(
'cov_' + n1 + "_" + n2 for n1, n2 in permutations(names, 2))
out = dict(zip(names_ave, np.mean(data, axis=1)))
out.update(zip(names_var, cov.diagonal()))
out.update(zip(names_cov, nondiag_cov))
NamedStats = namedtuple('Stats2', names_ave + names_var + names_cov)
return NamedStats(**out)
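A minimal usage sketch, assuming numpy is imported as np and that permutations and namedtuple are imported as in the original module; the structured array below is illustrative:

import numpy as np

sarray = np.zeros(100, dtype=[('x', float), ('y', float)])
sarray['x'] = np.random.randn(100)
sarray['y'] = 0.5 * sarray['x'] + np.random.randn(100)
result = stats2(sarray)
print(result.ave_x, result.var_y, result.cov_x_y)   # means, variances, covariances by name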
def eval_F(list_data, worker, mu, C, W):
"""
F = log N(T, mu, C) + SUM q() log Ber(L_ij | T)
list_il can be from different datasets:
list_il = list: each elem is a list from a dataset
"""
res = scipy.stats.multivariate_normal.logpdf(W, mean = mu, cov = C, allow_singular = True)
for dt, data in enumerate(list_data):
# dataset dt: sen = W[2*dt], fpr = W[2*dt+1]
sen, fpr = W[2*dt], W[2*dt+1]
if worker in data.dic_wl:
for (item, label) in data.dic_wl[worker]:
# prob of being 1
qz = data.qz[item]
res += qz * np.log( Ber(S(sen), label) )
#res += qz * ( label*np.log(S(sen)) + (1-label)*np.log(S(sen)) )
res += (1-qz) * np.log( Ber(S(fpr), label) )
return res
def expectation_binorm(rv, mu, var, x, M, C, w = 3):
"""
Evaluate the expectation of log Norm(uv| M, C)
x = u, v ~ Norm(mu, var) rv == 'v'
x = v, u ~ Norm(mu, var) rv == 'u'
"""
#print rv, mu, var, x, M, C
if rv == 'v':
f = lambda v: scipy.stats.norm.pdf(v, loc = mu, scale = np.sqrt(var) ) * \
np.log(scipy.stats.multivariate_normal.pdf([x, v], mean = M, cov = C, allow_singular = True))
else:
f = lambda u: scipy.stats.norm.pdf(u, loc = mu, scale = np.sqrt(var) ) * \
np.log(scipy.stats.multivariate_normal.pdf([u, x], mean = M, cov = C, allow_singular = True))
#return f
#print f(mu)
std = np.sqrt(var)
return scipy.integrate.quad(f, mu - w*std, mu + w*std)[0]
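A minimal sketch of a call, assuming scipy.stats and scipy.integrate are imported as in the original module; the parameter values below are illustrative:

import numpy as np

M = np.zeros(2)        # mean of the bivariate normal
C = np.eye(2)          # its covariance matrix
# E_{v ~ N(0, 1)}[ log N([0.5, v] | M, C) ], integrated over mu +/- 3 standard deviations
val = expectation_binorm('v', mu=0.0, var=1.0, x=0.5, M=M, C=C)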
def measure_correlation(gold0, gold1, l = 10, pos = 0, list_w = []):
a = []
b = []
for w in gold0.keys():
if w in gold1 and (w in list_w or list_w == []):
if gold0[w][0] != None and gold1[w][0] != None:
if gold0[w][2] > l and gold1[w][2] > l:
if pos == 0:
a.append(logit(gold0[w][pos]))
b.append(logit(gold1[w][pos]))
else:
a.append(logit(1 - gold0[w][pos]))
b.append(logit(1 - gold1[w][pos]))
    print(len(a), np.mean(a), np.mean(b))
    # note: np.cov(a, b) returns the full 2x2 covariance matrix of the two series
    return np.cov(a, b)
def mahalanobis_distance(difference, num_random_features):
num_samples, _ = np.shape(difference)
sigma = np.cov(np.transpose(difference))
mu = np.mean(difference, 0)
if num_random_features == 1:
stat = float(num_samples * mu ** 2) / float(sigma)
else:
try:
linalg.inv(sigma)
except LinAlgError:
print('covariance matrix is singular. Pvalue returned is 1.1')
warnings.warn('covariance matrix is singular. Pvalue returned is 1.1')
return 0
stat = num_samples * mu.dot(linalg.solve(sigma, np.transpose(mu)))
return chi2.sf(stat, num_random_features)
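A minimal usage sketch, assuming numpy, scipy.linalg (as linalg), scipy.stats.chi2, and warnings are imported as in the original module; the difference matrix below is illustrative:

import numpy as np

difference = np.random.randn(500, 3)   # hypothetical (num_samples, num_random_features) array
p_value = mahalanobis_distance(difference, num_random_features=3)
# under the null hypothesis the statistic is roughly chi-squared with 3 degrees of freedom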
def heritability(parents, offspring):
"""
Compute the heritability from parent and offspring samples.
Parameters
----------
parents : array_like
Array of data for trait of parents.
offspring : array_like
Array of data for trait of offspring.
Returns
-------
output : float
Heritability of trait.
"""
par, off = _convert_two_data(parents, offspring)
covariance_matrix = np.cov(par, off)
return covariance_matrix[0,1] / covariance_matrix[0,0]
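The returned ratio is the slope of the offspring-on-parent regression, Cov(parent, offspring) / Var(parent). A standalone sketch of that core computation (bypassing the _convert_two_data helper, which is not shown here); the simulated traits are illustrative:

import numpy as np

parents = np.random.normal(170.0, 7.0, 500)                        # hypothetical parent trait
offspring = 0.6 * (parents - 170.0) + 170.0 + np.random.normal(0.0, 5.0, 500)
cm = np.cov(parents, offspring)
h2 = cm[0, 1] / cm[0, 0]                                           # approximately 0.6 in expectation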
def test_1d_w_missing(self):
# Test cov 1 1D variable w/missing values
x = self.data
x[-1] = masked
x -= x.mean()
nx = x.compressed()
assert_almost_equal(np.cov(nx), cov(x))
assert_almost_equal(np.cov(nx, rowvar=False), cov(x, rowvar=False))
assert_almost_equal(np.cov(nx, rowvar=False, bias=True),
cov(x, rowvar=False, bias=True))
#
try:
cov(x, allow_masked=False)
except ValueError:
pass
#
# 2 1D variables w/ missing values
nx = x[1:-1]
assert_almost_equal(np.cov(nx, nx[::-1]), cov(x, x[::-1]))
assert_almost_equal(np.cov(nx, nx[::-1], rowvar=False),
cov(x, x[::-1], rowvar=False))
assert_almost_equal(np.cov(nx, nx[::-1], rowvar=False, bias=True),
cov(x, x[::-1], rowvar=False, bias=True))
def test_2d_w_missing(self):
# Test cov on 2D variable w/ missing value
x = self.data
x[-1] = masked
x = x.reshape(3, 4)
valid = np.logical_not(getmaskarray(x)).astype(int)
frac = np.dot(valid, valid.T)
xf = (x - x.mean(1)[:, None]).filled(0)
assert_almost_equal(cov(x),
np.cov(xf) * (x.shape[1] - 1) / (frac - 1.))
assert_almost_equal(cov(x, bias=True),
np.cov(xf, bias=True) * x.shape[1] / frac)
frac = np.dot(valid.T, valid)
xf = (x - x.mean(0)).filled(0)
assert_almost_equal(cov(x, rowvar=False),
(np.cov(xf, rowvar=False) *
(x.shape[0] - 1) / (frac - 1.)))
assert_almost_equal(cov(x, rowvar=False, bias=True),
(np.cov(xf, rowvar=False, bias=True) *
x.shape[0] / frac))
def test_shape_inference(self):
with self.test_session(use_gpu=True):
# Static
mean = 10 * np.random.normal(size=(10, 11, 2)).astype('d')
cov = np.zeros((10, 11, 2, 2))
dst = MultivariateNormalCholesky(
tf.constant(mean), tf.constant(cov))
self.assertEqual(dst.get_batch_shape().as_list(), [10, 11])
self.assertEqual(dst.get_value_shape().as_list(), [2])
# Dynamic
unk_mean = tf.placeholder(tf.float32, None)
unk_cov = tf.placeholder(tf.float32, None)
dst = MultivariateNormalCholesky(unk_mean, unk_cov)
self.assertEqual(dst.get_value_shape().as_list(), [None])
feed_dict = {unk_mean: np.ones(2), unk_cov: np.eye(2)}
self.assertEqual(list(dst.batch_shape.eval(feed_dict)), [])
self.assertEqual(list(dst.value_shape.eval(feed_dict)), [2])
def test_sample(self):
with self.fixed_randomness_session(233):
def test_sample_with(seed):
mean, cov, cov_chol = self._gen_test_params(seed)
dst = MultivariateNormalCholesky(
tf.constant(mean), tf.constant(cov_chol))
n_exp = 20000
samples = dst.sample(n_exp)
sample_shape = (n_exp, 10, 11, 3)
self.assertEqual(samples.shape.as_list(), list(sample_shape))
samples = dst.sample(n_exp).eval()
self.assertEqual(samples.shape, sample_shape)
self.assertAllClose(
np.mean(samples, axis=0), mean, rtol=5e-2, atol=5e-2)
for i in range(10):
for j in range(11):
self.assertAllClose(
np.cov(samples[:, i, j, :].T), cov[i, j],
rtol=1e-1, atol=1e-1)
for seed in [23, 233, 2333]:
test_sample_with(seed)
def test_prob(self):
with self.fixed_randomness_session(233):
def test_prob_with(seed):
mean, cov, cov_chol = self._gen_test_params(seed)
dst = MultivariateNormalCholesky(
tf.constant(mean), tf.constant(cov_chol),
check_numerics=True)
n_exp = 200
samples = dst.sample(n_exp).eval()
log_pdf = dst.log_prob(tf.constant(samples))
pdf_shape = (n_exp, 10, 11)
self.assertEqual(log_pdf.shape.as_list(), list(pdf_shape))
log_pdf = log_pdf.eval()
self.assertEqual(log_pdf.shape, pdf_shape)
for i in range(10):
for j in range(11):
log_pdf_exact = stats.multivariate_normal.logpdf(
samples[:, i, j, :], mean[i, j], cov[i, j])
self.assertAllClose(
log_pdf_exact, log_pdf[:, i, j])
self.assertAllClose(
np.exp(log_pdf), dst.prob(tf.constant(samples)).eval())
for seed in [23, 233, 2333]:
test_prob_with(seed)
def get_neg_log_post(Phi, sigma_J_list, ROI_list, G, MMT, q, Sigma_E, GL,
nu, V, prior_on = False):
eps = 1E-13
p = Phi.shape[0]
n_ROI = len(sigma_J_list)
Qu = Phi.dot(Phi.T)
G_Sigma_G = np.zeros(MMT.shape)
for i in range(n_ROI):
G_Sigma_G += sigma_J_list[i]**2 * np.dot(G[:,ROI_list[i]], G[:,ROI_list[i]].T)
cov = Sigma_E + G_Sigma_G + GL.dot(Qu).dot(GL.T)
inv_cov = np.linalg.inv(cov)
eigs = np.real(np.linalg.eigvals(cov)) + eps
log_det_cov = np.sum(np.log(eigs))
result = q*log_det_cov + np.trace(MMT.dot(inv_cov))
if prior_on:
inv_Q = np.linalg.inv(Qu)
#det_Q = np.linalg.det(Qu)
log_det_Q = np.sum(np.log(np.diag(Phi)**2))
        result = result + float(nu + p + 1) * log_det_Q + np.trace(V.dot(inv_Q))
return result
#==============================================================================
# update both Qu and Sigma_J, gradient of Qu and Sigma J
def plot(self, data, size, newdata=None):
data = np.array(data)
numsample = len(data)
colmean = np.mean(data, axis=0)
matcov = np.cov(data.T)
matinv = np.linalg.inv(matcov)
values = []
for sample in data:
dif = sample - colmean
value = matinv.dot(dif.T).dot(dif)
values.append(value)
cl = ((numsample - 1)**2) / numsample
lcl = cl * beta.ppf(0.00135, size / 2, (numsample - size - 1) / 2)
center = cl * beta.ppf(0.5, size / 2, (numsample - size - 1) / 2)
ucl = cl * beta.ppf(0.99865, size / 2, (numsample - size - 1) / 2)
return (values, center, lcl, ucl, self._title)
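A standalone sketch of the core Hotelling T-squared computation used above, assuming numpy is imported as np and scipy.stats.beta is available; size corresponds to the number of monitored variables (2 in this illustration):

import numpy as np
from scipy.stats import beta

data = np.random.multivariate_normal([0, 0], [[1.0, 0.3], [0.3, 1.0]], 50)   # hypothetical samples
colmean = data.mean(axis=0)
matinv = np.linalg.inv(np.cov(data.T))
values = [(s - colmean).dot(matinv).dot(s - colmean) for s in data]
cl = ((len(data) - 1) ** 2) / len(data)
ucl = cl * beta.ppf(0.99865, 2 / 2.0, (len(data) - 2 - 1) / 2.0)              # upper control limit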
def calc_mean_var_loss(epochsInds, loss_train):
    # loss_train has shape (# epochs, # batches)
    num_of_epochs = loss_train.shape[0]
    # Average over the batches
    loss_train_mean = np.mean(loss_train, 1)
    # Square root of the absolute finite difference of the mean loss over the sampled epoch indices
    d_mean_loss_to_dt = np.sqrt(np.abs(np.diff(loss_train_mean) / np.diff(epochsInds[:])))
    var_loss = []
    # Go over the epochs
    for epoch_index in range(num_of_epochs):
        # The loss for the specific epoch
        current_loss = loss_train[epoch_index, :]
        # The differences between consecutive batches
        current_loss_dt = np.diff(current_loss)
        # The mean of these differences
        average_loss = np.mean(current_loss_dt)
        current_loss_minus_mean = current_loss_dt - average_loss
        # The covariance matrix of the centered batch-to-batch differences
        cov_mat = np.dot(current_loss_minus_mean[:, None], current_loss_minus_mean[None, :])
        # Its trace equals the sum of squared deviations
        trac_cov = np.trace(cov_mat)
        var_loss.append(trac_cov)
    return np.array(var_loss), d_mean_loss_to_dt
def match_color_histogram(x, y):
z = np.zeros_like(x)
shape = x[0].shape
for i in six.moves.range(len(x)):
a = x[i].reshape((3, -1))
a_mean = np.mean(a, axis=1, keepdims=True)
a_var = np.cov(a)
d, v = np.linalg.eig(a_var)
d += 1e-6
a_sigma_inv = v.dot(np.diag(d ** (-0.5))).dot(v.T)
b = y[i].reshape((3, -1))
b_mean = np.mean(b, axis=1, keepdims=True)
b_var = np.cov(b)
d, v = np.linalg.eig(b_var)
b_sigma = v.dot(np.diag(d ** 0.5)).dot(v.T)
transform = b_sigma.dot(a_sigma_inv)
z[i,:] = (transform.dot(a - a_mean) + b_mean).reshape(shape)
return z
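A minimal usage sketch, assuming numpy and six are imported as in the original module and that x and y are batches of channel-first (C, H, W) color images; the random arrays are illustrative:

import numpy as np

x = np.random.rand(2, 3, 64, 64).astype(np.float32)   # hypothetical content images
y = np.random.rand(2, 3, 64, 64).astype(np.float32)   # hypothetical style images
z = match_color_histogram(x, y)
print(z.shape)   # (2, 3, 64, 64): each x[i] recolored to match y[i]'s channel mean and covariance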
def compute_cmus_scov(self, data, labels):
""" Compute class means and shared covariance
(weighted within-class covariance) """
self.label_map, class_sizes = np.unique(labels, return_counts=True)
dim = data.shape[1]
cmus = np.zeros(shape=(len(class_sizes), dim))
acc = np.zeros(shape=(dim, dim))
scov = np.zeros_like(acc)
gl_mu = np.mean(data, axis=0).reshape(-1, 1)
gl_cov = np.cov(data.T, bias=True)
for i, k in enumerate(self.label_map):
data_k = data[np.where(labels == k)[0], :]
mu_k = np.mean(data_k, axis=0).reshape(-1, 1)
cmus[i, :] = mu_k[:, 0]
acc += (gl_mu - mu_k).dot((gl_mu - mu_k).T) * data_k.shape[0]
acc /= data.shape[0]
scov = gl_cov - acc
return cmus, scov, class_sizes
def get_major_minor_axis(img):
"""
Finds the major and minor axis as 3d vectors of the passed in image
:param img: CZYX numpy array
:return: tuple containing two numpy arrays representing the major and minor axis as 3d vectors
"""
# do a mean projection if more than 3 axes
if img.ndim > 3:
z, y, x = np.nonzero(np.mean(img, axis=tuple(range(img.ndim - 3))))
else:
z, y, x = np.nonzero(img)
coords = np.stack([x - np.mean(x), y - np.mean(y), z - np.mean(z)])
# eigenvectors and values of the covariance matrix
evals, evecs = np.linalg.eig(np.cov(coords))
# return largest and smallest eigenvectors (major and minor axis)
order = np.argsort(evals)
return (evecs[:, order[-1]], evecs[:, order[0]])
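A minimal usage sketch, assuming numpy is imported as np; the synthetic volume below is illustrative:

import numpy as np

img = np.zeros((1, 32, 32, 32))           # hypothetical CZYX volume
img[0, 10:22, 12:20, 14:18] = 1           # block elongated along Z
major, minor = get_major_minor_axis(img)
print(major)    # eigenvector along the longest extent, here approximately [0, 0, +/-1]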
def posterior_cov(self):
"""
Computes posterior covariance from the samples drawn from posterior distribution
Returns
-------
np.ndarray
posterior covariance
"""
endp = len(self.parameters) - 1
endw = len(self.weights) - 1
params = self.parameters[endp]
weights = self.weights[endw]
return np.cov(np.transpose(params), aweights = weights.reshape(len(weights),))
def __init__(self, score_metric='log_likelihood', init_method='cov',
auto_scale=True):
self.score_metric = score_metric
self.init_method = init_method
self.auto_scale = auto_scale
        self.covariance_ = None  # a matrix or a list of matrices
        self.precision_ = None  # a matrix or a list of matrices
# these must be updated upon self.fit()
# the first 4 will be set if self.init_coefs is used.
# self.sample_covariance_
# self.lam_scale_
# self.n_samples
# self.n_features
self.is_fitted = False
super(InverseCovarianceEstimator, self).__init__()
def _maximization(self):
# Maximize priors
priors = sum(self.responsibility_matrix)
priors = [float(i)/sum(priors) for i in priors]
# Maximize means
mus = [0 for i in range(self.c)]
for k in range(self.c):
mus_k = sum(np.multiply(self.samples,
self.responsibility_matrix[:, k][:, np.newaxis]))
normalized_mus_k = mus_k / sum(self.responsibility_matrix[:, k])
mus[k] = normalized_mus_k
# Maximize covariances
covs = [0 for i in range(self.c)]
for k in range(self.c):
covs[k] = np.cov(self.samples.T,
aweights=self.responsibility_matrix[:, k])
return priors, mus, covs
def test_plot_error_ellipse(self):
# Generate random data
x = np.random.normal(0, 1, 300)
s = np.array([2.0, 2.0])
y1 = np.random.normal(s[0] * x)
y2 = np.random.normal(s[1] * x)
data = np.array([y1, y2])
# Calculate covariance and plot error ellipse
cov = np.cov(data)
plot_error_ellipse([0.0, 0.0], cov)
debug = False
if debug:
plt.scatter(data[0, :], data[1, :])
plt.xlim([-8, 8])
plt.ylim([-8, 8])
plt.show()
plt.clf()
def test_mean_cov(self):
with self.test_context():
self.m.compile()
num_samples = 1000
samples = self.hmc.sample(self.m, num_samples=num_samples,
lmin=10, lmax=21, epsilon=0.05)
self.assertEqual(samples.shape, (num_samples, 2))
xs = np.array(samples[self.m.x.full_name].tolist(), dtype=np.float32)
mean = xs.mean(0)
cov = np.cov(xs.T)
cov_standard = np.eye(cov.shape[0])
# TODO(@awav): inspite of the fact that we set up graph's random seed,
# the operation seed is still assigned by tensorflow automatically
# and hence sample output numbers are not deterministic.
#
# self.assertTrue(np.sum(np.abs(mean) < 0.1) >= mean.size/2)
# assert_allclose(cov, cov_standard, rtol=1e-1, atol=1e-1)
def _init(self, X, lengths=None):
super(GaussianHMM, self)._init(X, lengths=lengths)
_, n_features = X.shape
if hasattr(self, 'n_features') and self.n_features != n_features:
raise ValueError('Unexpected number of dimensions, got %s but '
'expected %s' % (n_features, self.n_features))
self.n_features = n_features
if 'm' in self.init_params or not hasattr(self, "means_"):
kmeans = cluster.KMeans(n_clusters=self.n_components,
random_state=self.random_state)
kmeans.fit(X)
self.means_ = kmeans.cluster_centers_
if 'c' in self.init_params or not hasattr(self, "covars_"):
cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1])
if not cv.shape:
cv.shape = (1, 1)
self._covars_ = distribute_covar_matrix_to_match_covariance_type(
cv, self.covariance_type, self.n_components).copy()