def _update_item_features(self):
# Gibbs sampling for item features
    for item_id in range(self.n_item):
indices = self.ratings_csc_[:, item_id].indices
features = self.user_features_[indices, :]
rating = self.ratings_csc_[:, item_id].data - self.mean_rating_
rating = np.reshape(rating, (rating.shape[0], 1))
covar = inv(self.alpha_item +
self.beta * np.dot(features.T, features))
lam = cholesky(covar)
temp = (self.beta * np.dot(features.T, rating) +
np.dot(self.alpha_item, self.mu_item))
mean = np.dot(covar, temp)
temp_feature = mean + np.dot(
lam, self.rand_state.randn(self.n_feature, 1))
self.item_features_[item_id, :] = temp_feature.ravel()
def _update_user_features(self):
# Gibbs sampling for user features
    for user_id in range(self.n_user):
indices = self.ratings_csr_[user_id, :].indices
features = self.item_features_[indices, :]
rating = self.ratings_csr_[user_id, :].data - self.mean_rating_
rating = np.reshape(rating, (rating.shape[0], 1))
covar = inv(
self.alpha_user + self.beta * np.dot(features.T, features))
lam = cholesky(covar)
        # alpha * sum(V_j * R_ij) + LAMBDA_U * mu_u
temp = (self.beta * np.dot(features.T, rating) +
np.dot(self.alpha_user, self.mu_user))
# mu_i_star
mean = np.dot(covar, temp)
temp_feature = mean + np.dot(
lam, self.rand_state.randn(self.n_feature, 1))
self.user_features_[user_id, :] = temp_feature.ravel()
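
For reference, the two update loops above (_update_item_features / _update_user_features) appear to implement the standard BPMF conditional posterior for a single feature vector. In the user case, with Omega_i the set of items rated by user i and ratings centred by the global mean, that conditional is:

LAMBDA_i_star = LAMBDA_U + beta * sum_{j in Omega_i} V_j * V_j^T
mu_i_star     = inv(LAMBDA_i_star) * (beta * sum_{j in Omega_i} R_ij * V_j + LAMBDA_U * mu_U)
U_i ~ N(mu_i_star, inv(LAMBDA_i_star))

The code draws the sample as mu_i_star + cholesky(inv(LAMBDA_i_star)) * eps with eps ~ N(0, I), which is what covar, lam and temp_feature compute above.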
def solve(A, y, delta, method):
    if method == 'ridge_reg_chol':
        # Assumes cholesky() returns the upper-triangular factor R
        # (scipy.linalg.cholesky's default), so R.T @ R = A.T A + delta*I.
        R = cholesky(dot(A.T, A) + delta*np.identity(A.shape[1]))
        z = lstsq(R.T, dot(A.T, y))[0]  # forward-substitute: R.T z = A.T y
        x = lstsq(R, z)[0]              # back-substitute:    R x = z
elif method == 'ridge_reg_inv':
x = dot(dot(inv(dot(A.T, A) + delta*np.identity(A.shape[1])), A.T), y)
elif method == 'ls_mldivide':
if delta > 0:
print('ignoring lambda; no regularization used')
x = lstsq(A, y)[0]
    loss = 0.5 * np.sum((dot(A, x) - y) ** 2)  # squared-error loss (computed but not returned)
return x.reshape(-1, 1)
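
A quick usage sketch for solve() on hypothetical toy data; it assumes the bare names dot, lstsq, inv and an upper-triangular cholesky (scipy.linalg's default) are imported, as the function requires. The Cholesky and explicit-inverse ridge solutions should agree to numerical precision:

import numpy as np
from numpy import dot
from numpy.linalg import lstsq, inv
from scipy.linalg import cholesky  # upper-triangular factor by default

rng = np.random.RandomState(0)
A = rng.randn(100, 5)                                 # toy design matrix
y = A.dot(rng.randn(5, 1)) + 0.1 * rng.randn(100, 1)  # toy targets
x_chol = solve(A, y, 1e-2, 'ridge_reg_chol')
x_inv = solve(A, y, 1e-2, 'ridge_reg_inv')
print(np.allclose(x_chol, x_inv))                     # expected: True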
def test_lapack_endian(self):
# For bug #1482
a = array([[5.7998084, -2.1825367],
[-2.1825367, 9.85910595]], dtype='>f8')
b = array(a, dtype='<f8')
ap = linalg.cholesky(a)
bp = linalg.cholesky(b)
assert_array_equal(ap, bp)
def rand_k(self, k):
"""
Return a random mean vector and covariance matrix from the posterior
NIW distribution for component `k`.
"""
k_N = self.prior.k_0 + self.counts[k]
v_N = self.prior.v_0 + self.counts[k]
m_N = self.m_N_numerators[k]/k_N
S_N = self.S_N_partials[k] - k_N*np.outer(m_N, m_N)
    # inv(chol(S_N).T): an upper-triangular factor B with B.dot(B.T) == inv(S_N)
    sigma = np.linalg.solve(cholesky(S_N).T, np.eye(self.D))
sigma = wishart.iwishrnd(sigma, v_N, sigma)
mu = np.random.multivariate_normal(m_N, sigma/k_N)
return mu, sigma
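
The solve/cholesky line in rand_k computes the inverse of the upper-triangular Cholesky factor of S_N, which is itself a triangular factor of inv(S_N). A standalone check on a toy positive-definite matrix (not part of the class):

import numpy as np
from numpy.linalg import cholesky, inv, solve

D = 3
A = np.random.randn(D, D)
S = A.dot(A.T) + D * np.eye(D)              # toy positive-definite matrix
B = solve(cholesky(S).T, np.eye(D))         # inverse of the upper Cholesky factor
print(np.allclose(B, inv(cholesky(S).T)))   # True
print(np.allclose(B.dot(B.T), inv(S)))      # True: B.dot(B.T) recovers inv(S)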
def _update_item_params(self):
N = self.n_item
X_bar = np.mean(self.item_features_, 0).reshape((self.n_feature, 1))
# print 'X_bar', X_bar.shape
S_bar = np.cov(self.item_features_.T)
# print 'S_bar', S_bar.shape
diff_X_bar = self.mu0_item - X_bar
# W_{0}_star
WI_post = inv(inv(self.WI_item) +
N * S_bar +
np.dot(diff_X_bar, diff_X_bar.T) *
(N * self.beta_item) / (self.beta_item + N))
# Note: WI_post and WI_post.T should be the same.
    # Just make sure it is symmetric here
WI_post = (WI_post + WI_post.T) / 2.0
# update alpha_item
df_post = self.df_item + N
self.alpha_item = wishart.rvs(df_post, WI_post, 1, self.rand_state)
# update mu_item
mu_mean = (self.beta_item * self.mu0_item + N * X_bar) / \
(self.beta_item + N)
    mu_var = cholesky(inv((self.beta_item + N) * self.alpha_item))
# print 'lam', lam.shape
self.mu_item = mu_mean + np.dot(
mu_var, self.rand_state.randn(self.n_feature, 1))
# print 'mu_item', self.mu_item.shape
def _update_user_params(self):
    # same as _update_item_params
N = self.n_user
X_bar = np.mean(self.user_features_, 0).reshape((self.n_feature, 1))
S_bar = np.cov(self.user_features_.T)
# mu_{0} - U_bar
diff_X_bar = self.mu0_user - X_bar
# W_{0}_star
WI_post = inv(inv(self.WI_user) +
N * S_bar +
np.dot(diff_X_bar, diff_X_bar.T) *
(N * self.beta_user) / (self.beta_user + N))
# Note: WI_post and WI_post.T should be the same.
    # Just make sure it is symmetric here
WI_post = (WI_post + WI_post.T) / 2.0
# update alpha_user
df_post = self.df_user + N
# LAMBDA_{U} ~ W(W{0}_star, df_post)
self.alpha_user = wishart.rvs(df_post, WI_post, 1, self.rand_state)
# update mu_user
# mu_{0}_star = (beta_{0} * mu_{0} + N * U_bar) / (beta_{0} + N)
mu_mean = (self.beta_user * self.mu0_user + N * X_bar) / \
(self.beta_user + N)
# decomposed inv(beta_{0}_star * LAMBDA_{U})
    mu_var = cholesky(inv((self.beta_user + N) * self.alpha_user))
# sample multivariate gaussian
self.mu_user = mu_mean + np.dot(
mu_var, self.rand_state.randn(self.n_feature, 1))
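
For reference, _update_item_params / _update_user_params appear to follow the standard Gaussian-Wishart hyperparameter posterior used in BPMF. With N feature vectors, sample mean X_bar and sample covariance S_bar:

beta_0_star   = beta_0 + N
df_star       = df_0 + N
mu_0_star     = (beta_0 * mu_0 + N * X_bar) / (beta_0 + N)
inv(W_0_star) = inv(W_0) + N * S_bar + (beta_0 * N / (beta_0 + N)) * (mu_0 - X_bar) * (mu_0 - X_bar)^T
LAMBDA ~ W(W_0_star, df_star)
mu ~ N(mu_0_star, inv(beta_0_star * LAMBDA))

which is what WI_post, df_post, mu_mean and mu_var compute above.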
def wishartrand(nu, phi):
dim = phi.shape[0]
chol = cholesky(phi)
foo = np.zeros((dim, dim))
for i in range(dim):
for j in range(i + 1):
if i == j:
foo[i, j] = np.sqrt(chi2.rvs(nu - (i + 1) + 1))
else:
foo[i, j] = np.random.normal(0, 1)
return np.dot(chol, np.dot(foo, np.dot(foo.T, chol.T)))
def mv_normalrand(mu, sigma, size):
lamb = cholesky(sigma)
return mu + np.dot(lamb, np.random.randn(size))
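
A quick sanity check for the two samplers above, assuming wishartrand and mv_normalrand are in scope along with the imports they rely on; the empirical Wishart mean should approach nu * phi:

import numpy as np
from numpy.linalg import cholesky   # needed by the helpers above
from scipy.stats import chi2        # needed by wishartrand

np.random.seed(0)
phi = np.array([[2.0, 0.3], [0.3, 1.0]])    # positive-definite scale matrix
nu = 10
W_mean = np.mean([wishartrand(nu, phi) for _ in range(5000)], axis=0)
print(np.round(W_mean, 1))                  # roughly nu * phi = [[20, 3], [3, 10]]

x = mv_normalrand(np.zeros(2), phi, 2)      # one 2-d draw with covariance phi
print(x.shape)                              # (2,)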
def isPD(B):
"""Returns true when input is positive-definite, via Cholesky"""
try:
_ = la.cholesky(B)
return True
except la.LinAlgError:
return False
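
For example (assuming la is numpy.linalg, as the la.LinAlgError handler suggests):

import numpy as np
import numpy.linalg as la

print(isPD(np.eye(3)))                      # True: the identity is positive-definite
print(isPD(np.array([[1.0, 2.0],
                     [2.0, 1.0]])))         # False: eigenvalues are 3 and -1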
def apply(self, f, mean, cov, pars):
mean = mean[:, na]
# form sigma-points from unit sigma-points
x = mean + cholesky(cov).dot(self.unit_sp)
# push sigma-points through non-linearity
fx = np.apply_along_axis(f, 0, x, pars)
# output mean
mean_f = fx.dot(self.wm)
# output covariance
dfx = fx - mean_f[:, na]
cov_f = dfx.dot(self.Wc).dot(dfx.T)
# input-output covariance
cov_fx = dfx.dot(self.Wc).dot((x - mean).T)
return mean_f, cov_f, cov_fx
def apply(self, f, mean, cov, pars):
# method defined in terms of abstract private functions for computing mean, covariance and cross-covariance
mean = mean[:, na]
x = mean + cholesky(cov).dot(self.unit_sp)
fx = self._fcn_eval(f, x, pars)
mean_f = self._mean(self.wm, fx)
cov_f = self._covariance(self.Wc, fx, mean_f)
cov_fx = self._cross_covariance(self.Wcc, fx, x, mean_f, mean)
return mean_f, cov_f, cov_fx
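
Both apply() variants above are generic in self.unit_sp, self.wm and self.Wc. As a minimal standalone sketch of the same sigma-point moment-matching pattern, the spherical-radial cubature rule (unit points ±sqrt(n)*e_i with equal weights 1/(2n)) is stood in for those attributes here; the classes' actual points and weights may differ:

import numpy as np
from numpy.linalg import cholesky

def cubature_apply(f, mean, cov):
    n = mean.shape[0]
    unit_sp = np.sqrt(n) * np.hstack((np.eye(n), -np.eye(n)))  # 2n unit sigma-points
    wm = np.full(2 * n, 1.0 / (2 * n))                         # mean weights
    Wc = np.diag(wm)                                           # covariance weight matrix
    x = mean[:, None] + cholesky(cov).dot(unit_sp)             # shift/scale to N(mean, cov)
    fx = np.apply_along_axis(f, 0, x)                          # push points through f
    mean_f = fx.dot(wm)
    dfx = fx - mean_f[:, None]
    cov_f = dfx.dot(Wc).dot(dfx.T)
    cov_fx = dfx.dot(Wc).dot((x - mean[:, None]).T)
    return mean_f, cov_f, cov_fx

# the transform is exact for a linear map:
A = np.array([[1.0, 2.0], [0.0, 1.0]])
mean_f, cov_f, cov_fx = cubature_apply(lambda v: A.dot(v), np.zeros(2), np.eye(2))
print(np.allclose(cov_f, A.dot(A.T)))       # True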