def variance_values(self, beta):
""" Covariance matrix for the estimated function
Parameters
----------
beta : np.array
Contains untransformed starting values for latent variables
Returns
-------
Covariance matrix for the estimated function
"""
parm = np.array([self.latent_variables.z_list[k].prior.transform(beta[k]) for k in range(beta.shape[0])])
L = self._L(parm)
v = la.cho_solve((L, True), self.kernel.K(parm))
return self.kernel.K(parm) - np.dot(v.T, v)
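The Cholesky-plus-cho_solve pattern above is the standard Gaussian-process posterior recipe. A minimal, self-contained sketch on synthetic data (the rbf kernel, jitter, and data below are illustrative assumptions, not part of the project code):

import numpy as np
import scipy.linalg as la

def rbf(a, b, lengthscale=0.3):
    # Hypothetical squared-exponential kernel between two 1-D input sets.
    return np.exp(-0.5 * (a[:, None] - b[None, :]) ** 2 / lengthscale ** 2)

x_train = np.linspace(0.0, 1.0, 20)
y_train = np.sin(2.0 * np.pi * x_train)
x_test = np.linspace(0.0, 1.0, 50)

K = rbf(x_train, x_train) + 1e-6 * np.eye(x_train.size)   # training covariance plus jitter
L = np.linalg.cholesky(K)                                  # K = L L^T, lower triangular
alpha = la.cho_solve((L, True), y_train)                   # K^{-1} y in one call
K_star = rbf(x_test, x_train)                              # cross-covariance
mean = K_star.dot(alpha)                                   # posterior mean
V = la.solve_triangular(L, K_star.T, lower=True)
cov = rbf(x_test, x_test) - V.T.dot(V)                     # posterior covariance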
def compute_gradient_totalcverr_wrt_lambda(self,matrix_results,lambda_val,sigmasq_z):
# 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
num_sample_cv = self.num_samples
ttl_num_folds = np.shape(matrix_results)[1]
gradient_cverr_per_fold = np.zeros(ttl_num_folds)
for jj in range(ttl_num_folds):
uu = np.shape(matrix_results[3][jj])[0] # number of training samples
M_tst_tr = exp(matrix_results[2][jj]*(-0.5)*sigmasq_z**(-1))   # -0.5 written explicitly; float(-1/2) floors to -1.0 under Python 2
M_tr_tr = exp(matrix_results[3][jj]*(-0.5)*sigmasq_z**(-1))
lower_ZZ = cholesky(M_tr_tr+ lambda_val*eye(uu), lower=True)
ZZ = cho_solve((lower_ZZ,True),eye(uu))
first_term = matrix_results[0][jj].dot(ZZ.dot(ZZ.dot(M_tst_tr.T)))
second_term = M_tst_tr.dot(ZZ.dot(ZZ.dot(
matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T)))))
gradient_cverr_per_fold[jj] = trace(first_term-second_term)
return 2*sum(gradient_cverr_per_fold)/float(num_sample_cv)
# lambda = exp(eta)
def compute_gradient_totalcverr_wrt_sqsigma(self,matrix_results,lambda_val,sigmasq_z):
# 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
num_sample_cv = self.num_samples
ttl_num_folds = np.shape(matrix_results)[1]
gradient_cverr_per_fold = np.zeros(ttl_num_folds)
for jj in range(ttl_num_folds):
uu = np.shape(matrix_results[3][jj])[0]
log_M_tr_tst = matrix_results[2][jj].T*(-0.5)*sigmasq_z**(-1)
M_tr_tst = exp(log_M_tr_tst)
log_M_tr_tr = matrix_results[3][jj]*(-0.5)*sigmasq_z**(-1)
M_tr_tr = exp(log_M_tr_tr)
lower_ZZ = cholesky(M_tr_tr+ lambda_val*eye(uu), lower=True)
ZZ = cho_solve((lower_ZZ,True),eye(uu))
term_1 = matrix_results[0][jj].dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(ZZ.dot(M_tr_tst))))
term_2 = -matrix_results[0][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst*sigmasq_z**(-1))))
term_3 = (sigmasq_z**(-1)*(M_tr_tst.T)*(-log_M_tr_tst.T)).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst))))
term_4 = -(M_tr_tst.T).dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(ZZ.dot(matrix_results[1][jj].dot(
ZZ.dot(M_tr_tst))))))
term_5 = -(M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(
ZZ.dot(M_tr_tst))))))
term_6 = (M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst*sigmasq_z**(-1)*(-log_M_tr_tst)))))
gradient_cverr_per_fold[jj] = trace(2*term_1 + 2*term_2 + term_3 + term_4 + term_5 + term_6)
return sum(gradient_cverr_per_fold)/float(num_sample_cv)
def compute_totalcverr(self,matrix_results,lambda_val,sigmasq_z):
# 0: K_tst_tr; 1: K_tr_tr; 2: K_tst_tst; 3: D_tst_tr; 4: D_tr_tr
num_sample_cv = self.num_samples
ttl_num_folds = np.shape(matrix_results)[1]
cverr_per_fold = np.zeros(ttl_num_folds)
for jj in range(ttl_num_folds):
uu = np.shape(matrix_results[4][jj])[0] # number of training samples
M_tst_tr = exp(matrix_results[3][jj]*(-0.5)*sigmasq_z**(-1))
M_tr_tr = exp(matrix_results[4][jj]*(-0.5)*sigmasq_z**(-1))
lower_ZZ = cholesky(M_tr_tr+ lambda_val*eye(uu), lower=True)
ZZ = cho_solve((lower_ZZ,True),eye(uu))
first_term = matrix_results[2][jj]
second_term = - matrix_results[0][jj].dot(ZZ.dot(M_tst_tr.T))
third_term = np.transpose(second_term)
fourth_term = M_tst_tr.dot(ZZ.dot(
matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T))))
cverr_per_fold[jj] = trace(first_term + second_term + third_term + fourth_term)
return sum(cverr_per_fold)/float(num_sample_cv)
def _kalman_correct(x, P, z, H, R, gain_factor, gain_curve):
PHT = np.dot(P, H.T)
S = np.dot(H, PHT) + R
e = z - H.dot(x)
L = cholesky(S, lower=True)
inn = solve_triangular(L, e, lower=True)
if gain_curve is not None:
q = (np.dot(inn, inn) / inn.shape[0]) ** 0.5
f = gain_curve(q)
if f == 0:
return inn
L *= (q / f) ** 0.5
K = cho_solve((L, True), PHT.T, overwrite_b=True).T
if gain_factor is not None:
K *= gain_factor[:, None]
U = -K.dot(H)
U[np.diag_indices_from(U)] += 1
x += K.dot(z - H.dot(x))
P[:] = U.dot(P).dot(U.T) + K.dot(R).dot(K.T)
return inn
def project_onto_nullspace(mom, cache):
""" Projects a momentum on to the nullspace of the constraint Jacobian.
Parameters
----------
mom : vector
Momentum state to project.
cache : dictionary
Dictionary of cached constraint Jacobian results.
Returns
-------
mom : vector
Momentum state projected onto the nullspace of the constraint Jacobian.
"""
dc_dpos = cache['dc_dpos']
if 'gram_chol' not in cache:
cache['gram_chol'] = la.cho_factor(dc_dpos.dot(dc_dpos.T))
gram_chol = cache['gram_chol']
dc_dpos_mom = dc_dpos.dot(mom)
gram_inv_dc_dpos_mom = la.cho_solve(gram_chol, dc_dpos_mom)
dc_dpos_pinv_dc_dpos_mom = dc_dpos.T.dot(gram_inv_dc_dpos_mom)
mom -= dc_dpos_pinv_dc_dpos_mom
return mom
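The cached cho_factor lets the Gram matrix of the constraint Jacobian be factorised once and reused across calls. A small stand-alone check that the projection really lands in the nullspace, using a hypothetical random Jacobian:

import numpy as np
import scipy.linalg as la

rng = np.random.default_rng(0)
dc_dpos = rng.normal(size=(3, 10))   # hypothetical constraint Jacobian: 3 constraints, 10 dimensions
mom = rng.normal(size=10)

gram_chol = la.cho_factor(dc_dpos.dot(dc_dpos.T))           # factorise the Gram matrix once
gram_inv_term = la.cho_solve(gram_chol, dc_dpos.dot(mom))   # (J J^T)^{-1} J mom
mom_proj = mom - dc_dpos.T.dot(gram_inv_term)               # remove the component along the constraints

print(np.allclose(dc_dpos.dot(mom_proj), 0.0))              # True: result lies in the nullspace of J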
# test_unconstrained.py — project: hamiltonian-monte-carlo (author: matt-graham)
def test_kinetic_energy(self):
mom_resample_coeff = 1.
dtype = np.float64
for n_dim in [10, 100, 1000]:
mass_matrix = self.prng.normal(size=(n_dim, n_dim))
mass_matrix = mass_matrix.dot(mass_matrix.T)
mass_matrix_chol = la.cholesky(mass_matrix, lower=True)
sampler = uhmc.EuclideanMetricHmcSampler(
energy_func=energy_func,
mass_matrix=mass_matrix,
energy_grad=energy_grad,
prng=self.prng,
mom_resample_coeff=mom_resample_coeff,
dtype=dtype)
pos, mom = self.prng.normal(size=(2, n_dim,)).astype(dtype)
k_energy = sampler.kinetic_energy(pos, mom, {})
assert np.isscalar(k_energy), (
'kinetic_energy returning non-scalar value.'
)
assert np.allclose(
k_energy,
0.5 * mom.dot(la.cho_solve((mass_matrix_chol, True), mom))), (
'kinetic_energy returning incorrect value.'
)
def nci(x, m, P):
# dimension of state, # time steps, # MC simulations, # inference algorithms (filters/smoothers)
d, time, mc_sims, algs = m.shape
dx = x[..., na] - m
# Mean Square Error matrix
MSE = np.empty((d, d, time, mc_sims, algs))
for k in range(time):
for s in range(mc_sims):
for alg in range(algs):
MSE[..., k, s, alg] = np.outer(dx[..., k, s, alg], dx[..., k, s, alg])
MSE = MSE.mean(axis=3) # average over MC simulations
# dx_iP_dx = np.empty((1, time, mc_sims, algs))
NCI = np.empty((1, time, mc_sims, algs))
for k in range(1, time):
for s in range(mc_sims):
for alg in range(algs):
# iP_dx = cho_solve(cho_factor(P[:, :, k, s, alg]), dx[:, k, s, alg])
# dx_iP_dx[:, k, s, alg] = dx[:, k, s, alg].dot(iP_dx)
# iMSE_dx = cho_solve(cho_factor(MSE[..., k, fi]), dx[:, k, s, alg])
# NCI[..., k, s, fi] = 10*np.log10(dx_iP_dx[:, k, s, fi]) - 10*np.log10(dx[:, k, s, alg].dot(iMSE_dx))
dx_iP_dx = dx[:, k, s, alg].dot(np.linalg.inv(P[..., k, s, alg])).dot(dx[:, k, s, alg])
dx_iMSE_dx = dx[:, k, s, alg].dot(np.linalg.inv(MSE[..., k, alg])).dot(dx[:, k, s, alg])
NCI[..., k, s, alg] = 10 * np.log10(dx_iP_dx) - 10 * np.log10(dx_iMSE_dx)
return NCI[:, 1:, ...].mean(axis=1) # average over time steps (ignore the 1st)
def grad_input(self, x):
"""Compute the gradient of the mean function and the standard deviation
function at the provided input.
"""
# Compute the gradient of the mean function.
d_kernel = self.kernel.grad_input(x, self.X)
d_mean = d_kernel.T.dot(self.alpha)
# Compute the gradient of the standard deviation function. It is
# absolutely crucial to note that the predict method returns the
# variance, not the standard deviation, of the prediction.
sd = self.predict(x)[1]
K_cross = self.kernel.cov(x, self.X)
M = spla.cho_solve((self.L, True), K_cross.T).ravel()
d_sd = -d_kernel.T.dot(M) / sd
return d_mean, d_sd
def fit(self, X, y):
"""Fit the Bayesian linear regression model leveraging the available
data.
Parameters:
X (numpy array): A two-dimensional numpy array representing the
matrix of covariates. Note that if a bias term is expressly
desired, it must be included in the design matrix.
y (numpy array): A matrix of target variables to predict.
"""
P = X.T.dot(X) + self.prior_prec
L = spla.cholesky(P, lower=True)
self.mu = spla.cho_solve((L, True), X.T.dot(y))
self.sigma_sq = np.mean((X.dot(self.mu) - y) ** 2)
L_inv = spla.solve_triangular(L.T, np.eye(L.shape[0]))
self.cov = self.sigma_sq * L_inv.dot(L_inv.T)
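A quick end-to-end run of the same posterior-mean computation on synthetic data; the design matrix, prior precision, and true weights below are illustrative stand-ins for the class attributes used above:

import numpy as np
import scipy.linalg as spla

rng = np.random.default_rng(1)
X = np.column_stack([np.ones(200), rng.normal(size=200)])  # bias column plus one covariate
w_true = np.array([0.5, -2.0])
y = X.dot(w_true) + 0.1 * rng.normal(size=200)

prior_prec = 1e-2 * np.eye(2)                              # weak ridge prior (stands in for self.prior_prec)
P = X.T.dot(X) + prior_prec
L = spla.cholesky(P, lower=True)
mu = spla.cho_solve((L, True), X.T.dot(y))                 # posterior mean of the weights
print(np.round(mu, 2))                                     # approximately [ 0.5 -2. ]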
def chol_inv(B, lower=True):
"""
Return the inverse of the matrix A = B.dot(B.T), where B is the
(lower, by default) Cholesky factor of A, by solving A x = I.
"""
return cho_solve((B, lower), np.eye(B.shape[0]))
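Because cho_solve solves A x = I directly from the factor, chol_inv never forms A itself. A small check on a random symmetric positive definite matrix:

import numpy as np
from scipy.linalg import cholesky, cho_solve

def chol_inv(B, lower=True):
    return cho_solve((B, lower), np.eye(B.shape[0]))

A = np.random.randn(5, 5)
A = A.dot(A.T) + 5.0 * np.eye(5)        # make A symmetric positive definite
B = cholesky(A, lower=True)             # A = B B^T
print(np.allclose(chol_inv(B).dot(A), np.eye(5)))   # True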
def kalman_filter(y, H, R, F, Q, mu, PI, z=None):
"""
Given the following sequences (one item of given dimension for
each time step):
- *y*: measurements (M)
- *H*: measurement operator (MxN)
- *R*: measurement noise covariance (MxM)
- *F*: time update operator (NxN)
- *Q*: time update noise covariance (NxN)
- *mu*: initial state (N)
- *PI*: initial state covariance (NxN)
- *z*: (optional) systematic time update input (N)
Return the :class:`FilterResult` containing lists of posterior
state estimates and error covariances.
"""
x_hat = []
P = []
x_hat_prior = mu
P_prior = PI
if z is None:
z = repeat(None)
for i, (y_i, H_i, R_i, F_i, Q_i, z_i) in enumerate(izip(y, H, R, F, Q, z)):
# measurement update
A = cho_factor(NP.matmul(H_i, NP.matmul(P_prior, H_i.T)) + R_i)
B = cho_solve(A, NP.matmul(H_i, P_prior))
b = cho_solve(A, y_i - NP.matmul(H_i, x_hat_prior))
C = NP.matmul(P_prior, H_i.T)
x_hat.append(x_hat_prior + NP.matmul(C, b))
P.append(P_prior - NP.matmul(C, B))
# time update
x_hat_prior = NP.matmul(F_i, x_hat[-1])
if z_i is not None:
x_hat_prior += z_i
P_prior = NP.matmul(F_i, NP.matmul(P[-1], F_i.T)) + Q_i
return FilterResult(x_hat, P)
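The measurement-update block inside the loop is the standard Kalman correction x ← x + P Hᵀ S⁻¹ (y − H x), P ← P − P Hᵀ S⁻¹ H P, with the innovation covariance S factorised by cho_factor. One step on a hypothetical 2-state, 1-measurement system (all numbers illustrative):

import numpy as np
from scipy.linalg import cho_factor, cho_solve

P_prior = np.array([[2.0, 0.3], [0.3, 1.0]])
H = np.array([[1.0, 0.0]])
R = np.array([[0.5]])
x_prior = np.array([0.0, 0.0])
y = np.array([1.2])

A = cho_factor(H.dot(P_prior).dot(H.T) + R)      # innovation covariance S, factorised once
b = cho_solve(A, y - H.dot(x_prior))             # S^{-1} (y - H x)
B = cho_solve(A, H.dot(P_prior))                 # S^{-1} H P
C = P_prior.dot(H.T)
x_post = x_prior + C.dot(b)                      # corrected state estimate
P_post = P_prior - C.dot(B)                      # corrected error covariance
print(x_post, np.diag(P_post))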
def KLdiv(mu0, Lcov0, mu1, Lcov1):
"""Numpy KL calculation."""
tr, dist, ldet = 0., 0., 0.
D, n = mu0.shape
for m0, m1, L0, L1 in zip(mu0, mu1, Lcov0, Lcov1):
tr += np.trace(cho_solve((L1, True), L0.dot(L0.T)))
md = m1 - m0
dist += md.dot(cho_solve((L1, True), md))
ldet += logdet(L1) - logdet(L0)
KL = 0.5 * (tr + dist + ldet - D * n)
return KL
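The logdet helper is not shown in this snippet; assuming it returns the log-determinant of the covariance from its Cholesky factor (i.e. 2·Σ log diag L), the per-pair terms reduce to the closed-form Gaussian KL. A single-pair sketch under that assumption:

import numpy as np
from scipy.linalg import cho_solve

def logdet(L):
    # Assumed helper: log-determinant of the covariance whose Cholesky factor is L.
    return 2.0 * np.sum(np.log(np.diag(L)))

rng = np.random.default_rng(2)
A0, A1 = rng.normal(size=(2, 4, 4))
S0 = A0.dot(A0.T) + 4.0 * np.eye(4)
S1 = A1.dot(A1.T) + 4.0 * np.eye(4)
L0, L1 = np.linalg.cholesky(S0), np.linalg.cholesky(S1)
m0, m1 = rng.normal(size=(2, 4))

tr = np.trace(cho_solve((L1, True), L0.dot(L0.T)))   # tr(S1^{-1} S0)
md = m1 - m0
dist = md.dot(cho_solve((L1, True), md))             # Mahalanobis term
kl = 0.5 * (tr + dist + logdet(L1) - logdet(L0) - 4)
print(kl >= 0.0)                                     # KL divergence is non-negative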
def _alpha(self, L):
""" Covariance-derived term to construct expectations. See Rasmussen & Williams.
Parameters
----------
L : np.ndarray
Lower-triangular Cholesky factor
Returns
-------
np.ndarray (alpha)
"""
return la.cho_solve((L.T, True), la.cho_solve((L, True), np.transpose(self.data)))
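For reference, the α = K⁻¹y term from Rasmussen & Williams that the docstring cites can be built either from two triangular solves or from a single cho_solve on the lower factor; a small check on a synthetic covariance:

import numpy as np
import scipy.linalg as la

rng = np.random.default_rng(3)
K = rng.normal(size=(6, 6))
K = K.dot(K.T) + 6.0 * np.eye(6)       # synthetic SPD "covariance"
y = rng.normal(size=6)

L = np.linalg.cholesky(K)              # lower-triangular factor
alpha_tri = la.solve_triangular(L.T, la.solve_triangular(L, y, lower=True), lower=False)
alpha_cho = la.cho_solve((L, True), y) # single-call form
print(np.allclose(alpha_tri, alpha_cho), np.allclose(alpha_cho, np.linalg.solve(K, y)))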
def chol2inv(chol):
return spla.cho_solve((chol, False), np.eye(chol.shape[0]))
def __init__(self,gamma,beta,nugget,kernelName,k_lambda,xTrain,yTrain):
"""
Create a new GaussianProcess Object
gamma: Hyperparameter
beta: Hyperparameter
k_lambda: Hyperparameter
nugget: The noise hyperparameter
kernelName: The name of the covariance kernel
xTrain: Numpy array containing x training values
yTrain: Numpy array containing y training values
"""
self.xTrain = xTrain
self.yTrain = yTrain
self.k_lambda = k_lambda
self.beta = beta
self.gamma = gamma
self.nugget = nugget
self.kernelName = kernelName
# Setup the regressor as if gp.fit had been called
# See https://github.com/scikit-learn/scikit-learn/master/sklearn/gaussian_process/gpr.py
kernel = self._getKernel()
gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=0)
gp.K = kernel(xTrain)
gp.X_train_ = xTrain
gp.y_train_ = yTrain
gp.L_ = cholesky(gp.K, lower=True)
gp.alpha_ = cho_solve((gp.L_, True), yTrain)
gp.fit(xTrain,yTrain)
gp.kernel_ = kernel
self.gp = gp
self.kernel = kernel
# Calculate the matrix inverses once. Save time later
# This is only used for own own implimentation of the scoring engine
self.L_inv = solve_triangular(gp.L_.T, np.eye(gp.L_.shape[0]))
self.K_inv = self.L_inv.dot(self.L_inv.T)
def solve(self, other):
if other.ndim == 1:
Nx = np.array(other / self.N)
elif other.ndim == 2:
Nx = np.array(other / self.N[:,None])
UNx = np.dot(self.U.T, Nx)
Sigma = np.diag(1/self.J) + np.dot(self.U.T, self.U/self.N[:,None])
cf = sl.cho_factor(Sigma)
if UNx.ndim == 1:
tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N
else:
tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N[:,None]
return Nx - tmp
def __call__(self, other):
return sl.cho_solve(self.cf, other)
def inv(self):
return sl.cho_solve(self.cf, np.eye(len(self.cf[0])))
def _solve_ZNX(self, X, Z):
"""Solves :math:`Z^T N^{-1}X`, where :math:`X`
and :math:`Z` are 1-d or 2-d arrays.
"""
if X.ndim == 1:
X = X.reshape(X.shape[0], 1)
if Z.ndim == 1:
Z = Z.reshape(Z.shape[0], 1)
n, m = Z.shape[1], X.shape[1]
ZNX = np.zeros((n, m))
if len(self._idx) > 0:
ZNXr = np.dot(Z[self._idx,:].T, X[self._idx,:] /
self._nvec[self._idx, None])
else:
ZNXr = 0
for slc, block in zip(self._slices, self._blocks):
Zblock = Z[slc, :]
Xblock = X[slc, :]
if slc.stop - slc.start > 1:
cf = sl.cho_factor(block+np.diag(self._nvec[slc]))
bx = sl.cho_solve(cf, Xblock)
else:
bx = Xblock / self._nvec[slc][:, None]
ZNX += np.dot(Zblock.T, bx)
ZNX += ZNXr
return ZNX.squeeze() if len(ZNX) > 1 else float(ZNX)
def _solve_NX(self, X):
"""Solves :math:`N^{-1}X`, where :math:`X`
is a 1-d or 2-d array.
"""
if X.ndim == 1:
X = X.reshape(X.shape[0], 1)
NX = X / self._nvec[:,None]
for slc, block in zip(self._slices, self._blocks):
Xblock = X[slc, :]
if slc.stop - slc.start > 1:
cf = sl.cho_factor(block+np.diag(self._nvec[slc]))
NX[slc] = sl.cho_solve(cf, Xblock)
return NX.squeeze()
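The loop above solves a noise matrix that is diagonal except for dense correlated blocks, one block at a time. A tiny stand-alone version of the same idea (the vectors, slice, and block below are made up) that can be checked against a dense solve:

import numpy as np
import scipy.linalg as sl

nvec = np.array([1.0, 2.0, 3.0, 4.0, 5.0])      # diagonal (uncorrelated) noise
block = np.array([[0.5, 0.2], [0.2, 0.5]])      # dense correlated block on entries 1:3
slc = slice(1, 3)
x = np.arange(5.0)

# Block-wise solve: plain division off the block, cho_solve on the block.
Nx = x / nvec
cf = sl.cho_factor(block + np.diag(nvec[slc]))
Nx[slc] = sl.cho_solve(cf, x[slc])

# Dense reference.
N = np.diag(nvec)
N[slc, slc] += block
print(np.allclose(Nx, np.linalg.solve(N, x)))    # True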
def solve(self, Y):
return la.cho_solve(self._cholesky, Y)
def compute_gradient_totalcverr_wrt_eta(self,matrix_results,lambda_val,sigmasq_z):
# 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
#eta = log(lambda_val)
#gamma = log(sigmasq_z)
num_sample_cv = self.num_samples
ttl_num_folds = np.shape(matrix_results)[1]
gradient_cverr_per_fold = np.zeros(ttl_num_folds)
for jj in range(ttl_num_folds):
uu = np.shape(matrix_results[3][jj])[0] # number of training samples
M_tst_tr = exp(matrix_results[2][jj]*(-0.5)*sigmasq_z**(-1))
M_tr_tr = exp(matrix_results[3][jj]*(-0.5)*sigmasq_z**(-1))
lower_ZZ = cholesky(M_tr_tr+ lambda_val*eye(uu), lower=True)
ZZ = cho_solve((lower_ZZ,True),eye(uu))
EE = lambda_val*eye(uu)
first_term = matrix_results[0][jj].dot(ZZ.dot(EE.dot(ZZ.dot(M_tst_tr.T))))
second_term = first_term.T
third_term = -M_tst_tr.dot(ZZ.dot(EE.dot(ZZ.dot(
matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T))))))
forth_term = -M_tst_tr.dot(ZZ.dot(
matrix_results[1][jj].dot(ZZ.dot(EE.dot(ZZ.dot(M_tst_tr.T))))))
gradient_cverr_per_fold[jj] = trace(first_term + second_term + third_term + forth_term)
return sum(gradient_cverr_per_fold)/float(num_sample_cv)
# compute the gradient of the total cverror with respect to sigma_z squared
def compute_gradient_totalcverr_wrt_gamma(self,matrix_results,lambda_val,sigmasq_z):
# 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
#eta = log(lambda_val)
#gamma = log(sigmasq_z)
num_sample_cv = self.num_samples
ttl_num_folds = np.shape(matrix_results)[1]
gradient_cverr_per_fold = np.zeros(ttl_num_folds)
for jj in range(ttl_num_folds):
uu = np.shape(matrix_results[3][jj])[0]
log_M_tr_tst = matrix_results[2][jj].T*(-0.5)*sigmasq_z**(-1)
M_tr_tst = exp(log_M_tr_tst)
log_M_tr_tr = matrix_results[3][jj]*(-0.5)*sigmasq_z**(-1)
M_tr_tr = exp(log_M_tr_tr)
lower_ZZ = cholesky(M_tr_tr+ lambda_val*eye(uu), lower=True)
ZZ = cho_solve((lower_ZZ,True),eye(uu))
term_1 = matrix_results[0][jj].dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(ZZ.dot(M_tr_tst))))
term_2 = -matrix_results[0][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst)))
term_3 = (M_tr_tst.T*(-log_M_tr_tst).T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst))))
term_4 = -(M_tr_tst.T).dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(ZZ.dot(matrix_results[1][jj].dot(
ZZ.dot(M_tr_tst))))))
term_5 = -(M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(
ZZ.dot(M_tr_tst))))))
term_6 = (M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst)))))
gradient_cverr_per_fold[jj] = trace(2*term_1 + 2*term_2 + term_3 + term_4 + term_5 + term_6)
return sum(gradient_cverr_per_fold)/float(num_sample_cv)
# compute the total cverror
def _rts_pass(x, P, xa, Pa, Phi):
n_points, n_states = x.shape
I = np.identity(n_states)
for i in reversed(range(n_points - 1)):
L = cholesky(Pa[i + 1], check_finite=False)
Pa_inv = cho_solve((L, False), I, check_finite=False)
C = P[i].dot(Phi[i].T).dot(Pa_inv)
x[i] += C.dot(x[i + 1] - xa[i + 1])
P[i] += C.dot(P[i + 1] - Pa[i + 1]).dot(C.T)
return x, P
def kinetic_energy(self, pos, mom, cache={}):
return 0.5 * mom.dot(la.cho_solve(
(self.mass_matrix_chol, True), mom))
def simulate_dynamic(self, n_step, dt, pos, mom, cache={}):
mom = mom - 0.5 * dt * self.energy_grad(pos, cache)
pos = pos + dt * la.cho_solve((self.mass_matrix_chol, True), mom)
for s in range(1, n_step):
mom -= dt * self.energy_grad(pos, cache)
pos += dt * la.cho_solve((self.mass_matrix_chol, True), mom)
mom -= 0.5 * dt * self.energy_grad(pos, cache)
return pos, mom, None
def _measurement_update(self, y):
gain = cho_solve(cho_factor(self.z_cov_pred), self.xz_cov).T
self.x_mean_filt = self.x_mean_pred + gain.dot(y - self.z_mean_pred)
self.x_cov_filt = self.x_cov_pred - gain.dot(self.z_cov_pred).dot(gain.T)