def _smoothing_update(self):
gain = cho_solve(cho_factor(self.x_cov_pred), self.xx_cov).T
self.x_mean_smooth = self.x_mean_filt + gain.dot(self.x_mean_smooth - self.x_mean_pred)
self.x_cov_smooth = self.x_cov_filt + gain.dot(self.x_cov_smooth - self.x_cov_pred).dot(gain.T)
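A minimal standalone sketch (hypothetical matrices) of the pattern used above: cho_solve(cho_factor(A), B) solves A X = B for a symmetric positive-definite A, so the smoother gain C P_pred^-1 is obtained without forming an explicit inverse.
import numpy as np
from scipy.linalg import cho_factor, cho_solve

rng = np.random.default_rng(0)
M = rng.standard_normal((3, 3))
x_cov_pred = M @ M.T + 3 * np.eye(3)   # SPD stand-in for the predicted covariance
xx_cov = rng.standard_normal((3, 3))   # stand-in for the cross-covariance

# gain = xx_cov^T * x_cov_pred^-1, computed via a Cholesky solve
gain = cho_solve(cho_factor(x_cov_pred), xx_cov).T
assert np.allclose(gain, xx_cov.T @ np.linalg.inv(x_cov_pred))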
def weights_rbf(self, unit_sp, hypers):
# BQ weights for the RBF kernel with the given hyper-parameters; computations adapted from the GP-ADF code [Deisenroth]
# under the following assumptions:
# (A1) the uncertain input is zero-mean with unit covariance
# (A2) one set of hyper-parameters is used for all output dimensions (one GP models all outputs)
d, n = unit_sp.shape
# GP kernel hyper-parameters
alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
assert len(el) == d
# pre-allocation for convenience
eye_d, eye_n = np.eye(d), np.eye(n)
iLam1 = np.atleast_2d(np.diag(el ** -1)) # sqrt(Lambda^-1)
iLam2 = np.atleast_2d(np.diag(el ** -2))
inp = unit_sp.T.dot(iLam1) # sigmas / el[:, na] (x - m)^T*sqrt(Lambda^-1) # (numSP, xdim)
K = np.exp(2 * np.log(alpha) - 0.5 * maha(inp, inp))
iK = cho_solve(cho_factor(K + jitter * eye_n), eye_n)
B = iLam2 + eye_d # (D, D)
c = alpha ** 2 / np.sqrt(det(B))
t = inp.dot(inv(B)) # inn*(P + Lambda)^-1
l = np.exp(-0.5 * np.sum(inp * t, 1)) # (N, 1)
zet = 2 * np.log(alpha) - 0.5 * np.sum(inp * inp, 1)
inp = inp.dot(iLam1)
R = 2 * iLam2 + eye_d
t = 1 / np.sqrt(det(R))
L = np.exp((zet[:, na] + zet[:, na].T) + maha(inp, -inp, V=0.5 * inv(R)))
q = c * l # evaluations of the kernel mean map (from the viewpoint of RKHS methods)
# mean weights
wm_f = q.dot(iK)
iKQ = iK.dot(t * L)
# covariance weights
wc_f = iKQ.dot(iK)
# cross-covariance "weights"
wc_fx = np.diag(q).dot(iK)
# used for self.D.dot(x - mean).dot(wc_fx).dot(fx)
self.D = inv(eye_d + np.diag(el ** 2)) # S(S+Lam)^-1; for S=I, (I+Lam)^-1
# model variance; to be added to the covariance
# this diagonal form assumes independent GP outputs (cov(f^a, f^b) = 0 for all a, b: a neq b)
self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones(d))
return wm_f, wc_f, wc_fx
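As a sanity check on the vector q computed above, here is a 1-D sketch (hypothetical values of alpha, el and a sigma-point x_i) verifying the closed-form kernel mean map against numerical integration under a standard normal input, i.e. assumption (A1).
import numpy as np
from scipy.integrate import quad
from scipy.stats import norm

alpha, el, x_i = 1.3, 0.7, 0.4
kern = lambda x: alpha ** 2 * np.exp(-0.5 * (x - x_i) ** 2 / el ** 2)  # RBF kernel
# 1-D specialisation of q_i = alpha^2 det(B)^-1/2 exp(-0.5 inp_i B^-1 inp_i^T)
q_closed = alpha ** 2 / np.sqrt(1 + el ** -2) * np.exp(-0.5 * x_i ** 2 / (1 + el ** 2))
q_numeric, _ = quad(lambda x: kern(x) * norm.pdf(x), -10.0, 10.0)
assert np.isclose(q_closed, q_numeric)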
def plot_gp_model(self, f, unit_sp, args, test_range=(-5, 5, 50), plot_dims=(0, 0)):
# plot out_dim vs. in_dim
in_dim, out_dim = plot_dims
# test input must have the same dimension as specified in kernel
test = np.linspace(*test_range)
test_pts = np.zeros((self.d, len(test)))
test_pts[in_dim, :] = test
# function value observations at training points (unit sigma-points)
y = np.apply_along_axis(f, 0, unit_sp, args)
fx = np.apply_along_axis(f, 0, test_pts, args) # function values at test points
K = self.kern.K(unit_sp.T) # covariances between sigma-points
k = self.kern.K(test_pts.T, unit_sp.T) # covariance between test inputs and sigma-points
kxx = self.kern.Kdiag(test_pts.T) # prior predictive variance
k_iK = cho_solve(cho_factor(K), k.T).T
gp_mean = k_iK.dot(y[out_dim, :]) # GP mean
gp_var = np.diag(np.diag(kxx) - k_iK.dot(k.T)) # GP predictive variance
# plot the GP mean, predictive variance and the true function
plt.figure()
plt.plot(test, fx[out_dim, :], color='r', ls='--', lw=2, label='true')
plt.plot(test, gp_mean, color='b', ls='-', lw=2, label='GP mean')
plt.fill_between(test, gp_mean + 2 * np.sqrt(gp_var), gp_mean - 2 * np.sqrt(gp_var),
color='b', alpha=0.25, label='GP variance')
plt.plot(unit_sp[in_dim, :], y[out_dim, :],
color='k', ls='', marker='o', ms=8, label='data')
plt.legend()
plt.show()
def _pull_from_cache_or_compute(self):
if self.caching and len(self._cache_list) == self.num_states:
chol = self._cache_list[self.state]['chol']
alpha = self._cache_list[self.state]['alpha']
else:
chol = spla.cholesky(self.kernel.cov(self.inputs), lower=True)
alpha = spla.cho_solve((chol, True), self.values - self.mean.value)
return chol, alpha
def _prepare_cache(self):
inputs_hash = hash(self.inputs.tostring())
for i in xrange(self.num_states):
self.set_state(i)
chol = spla.cholesky(self.kernel.cov(self.inputs), lower=True)
alpha = spla.cho_solve((chol, True), self.values - self.mean.value)
cache_dict = {
'chol' : chol,
'alpha' : alpha
}
self._cache_list.append(cache_dict)
def log_likelihood(self):
"""
GP Marginal likelihood
Notes
-----
This is called by the samplers when fitting the hyperparameters.
"""
cov = self.kernel.cov(self.observed_inputs)
chol = spla.cholesky(cov, lower=True)
solve = spla.cho_solve((chol, True), self.observed_values - self.mean.value)
# Uses the identity log det A = 2 * sum(log(diag(chol(A)))), so -0.5 * log det(cov) equals -sum(log(diag(chol)))
return -np.sum(np.log(np.diag(chol)))-0.5*np.dot(self.observed_values - self.mean.value, solve)
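A quick standalone check (hypothetical SPD matrix) of the log-determinant identity used in the return value above: for A = L L^T, log det A = 2 * sum(log(diag(L))).
import numpy as np
import scipy.linalg as spla

rng = np.random.default_rng(1)
M = rng.standard_normal((4, 4))
A = M @ M.T + 4 * np.eye(4)            # SPD test matrix
L = spla.cholesky(A, lower=True)
assert np.isclose(2 * np.sum(np.log(np.diag(L))), np.linalg.slogdet(A)[1])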
def fit(self, X, y):
"""Fit the parameters of the probabilistic process based on the
available training data.
"""
# Store the training data (both the inputs and the targets).
self.X, self.y = X, y
# Compute the covariance matrix of the observed inputs.
K = self.kernel.cov(self.X)
# For a numerically stable algorithm, we use Cholesky decomposition.
self.L = spla.cholesky(K, lower=True)
self.alpha = spla.cho_solve((self.L, True), self.y).ravel()
L_inv = spla.solve_triangular(self.L.T, np.eye(self.L.shape[0]))
self.K_inv = L_inv.dot(L_inv.T)
self.beta = self.y.dot(self.alpha)
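A small sketch (toy kernel matrix, illustrative names) of the identities fit() relies on: alpha solves K alpha = y, and K_inv assembled from the triangular inverse equals the true inverse of K.
import numpy as np
import scipy.linalg as spla

rng = np.random.default_rng(2)
X = rng.standard_normal((5, 2))
sq_dists = np.sum((X[:, None, :] - X[None, :, :]) ** 2, axis=-1)
K = np.exp(-0.5 * sq_dists) + 1e-6 * np.eye(5)        # toy RBF covariance
y = rng.standard_normal(5)

L = spla.cholesky(K, lower=True)
alpha = spla.cho_solve((L, True), y)                  # solves K alpha = y
L_inv = spla.solve_triangular(L.T, np.eye(5))         # L^-T
K_inv = L_inv.dot(L_inv.T)                            # (L L^T)^-1
assert np.allclose(K @ alpha, y)
assert np.allclose(K_inv, np.linalg.inv(K))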
def _construct_predict(self, beta, h):
""" Creates h-step ahead forecasts for the Gaussian process
Parameters
----------
beta : np.array
Contains untransformed starting values for the latent variables
h: int
How many steps ahead to forecast
Returns
----------
- predictions
- variance of predictions
"""
# Refactor this entire code in future
parm = np.array([self.latent_variables.z_list[k].prior.transform(beta[k]) for k in range(beta.shape[0])])
Xstart = self.X().copy()
Xstart = [i for i in Xstart]
predictions = np.zeros(h)
variances = np.zeros(h)
for step in range(0,h):
Xstar = []
for lag in range(0,self.max_lag):
if lag == 0:
if step == 0:
Xstar.append([self.data[-1]])
Xstart[0] = np.append(Xstart[0],self.data[-1])
else:
Xstar.append([predictions[step-1]])
Xstart[0] = np.append(Xstart[0],predictions[step-1])
else:
Xstar.append([Xstart[lag-1][-2]])
Xstart[lag] = np.append(Xstart[lag],Xstart[lag-1][-2])
Kstar = self.kernel.Kstar(parm, np.transpose(np.array(Xstar)))
L = self._L(parm)
alpha = self._alpha(L)
predictions[step] = np.dot(np.transpose(Kstar), alpha)
v = la.cho_solve((L, True), Kstar)
variances[step] = self.kernel.Kstarstar(parm, np.transpose(np.array(Xstar))) - np.dot(v.T, v)
return predictions, variances, predictions - 1.98*np.power(variances,0.5), predictions + 1.98*np.power(variances,0.5)
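For reference, a generic one-step GP prediction in the standard Cholesky-based formulation (a sketch with illustrative names, not taken from the class above): the predictive mean uses alpha = K^-1 y via cho_solve, and the predictive variance uses a triangular solve.
import numpy as np
import scipy.linalg as la

def gp_predict_one(L, y, k_star, k_starstar):
    """Predictive mean/variance at one test point, given lower chol(K) = L."""
    alpha = la.cho_solve((L, True), y)               # K^-1 y
    v = la.solve_triangular(L, k_star, lower=True)   # L^-1 k_*
    return k_star.dot(alpha), k_starstar - v.dot(v)  # m_*, k_** - k_*^T K^-1 k_*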
def evaluate(self, theta_new, t=None):
"""Evaluate the acquisition function at the location theta_new.
Parameters
----------
theta_new : array_like
Evaluation coordinates.
t : int, optional
Current iteration, (unused).
Returns
-------
array_like
Expected loss's term dependent on theta_new.
"""
gp = self.model
n_imp, n_dim = self.points_int.shape
# Alter the shape of theta_new.
if n_dim != 1 and theta_new.ndim == 1:
theta_new = theta_new[np.newaxis, :]
elif n_dim == 1 and theta_new.ndim == 1:
theta_new = theta_new[:, np.newaxis]
# Calculate the integrand term w.
# Note: w's second term (given in Järvenpää et al., 2017) is omitted
# because it is constant with respect to theta_new.
_, var_new = gp.predict(theta_new, noiseless=True)
k_old_new = self._K(self.thetas_old, theta_new)
k_int_new = self._K(self.points_int, theta_new).T
# Using the Cholesky factorisation to avoid computing matrix inverse.
term_chol = sl.cho_solve(sl.cho_factor(self.K), k_old_new)
cov_int = k_int_new - np.dot(self.k_int_old.T, term_chol).T
delta_var_int = cov_int**2 / (self.sigma2_n + var_new)
a = np.sqrt((self.sigma2_n + self.var_int.T - delta_var_int) /
(self.sigma2_n + self.var_int.T + delta_var_int))
# Using the skewnorm's cdf to substitute the Owen's T function.
phi_skew_imp = ss.skewnorm.cdf(self.eps, a, loc=self.mean_int.T,
scale=np.sqrt(self.sigma2_n + self.var_int.T))
w = ((self.phi_int - phi_skew_imp) / 2)
loss_theta_new = 2 * np.sum(self.omegas_int * self.priors_int * w, axis=1)
return loss_theta_new
def __call__(self, xs, phiinv_method='partition'):
# map parameter vector if needed
params = xs if isinstance(xs,dict) else self.pta.map_params(xs)
# phiinvs will be a list or may be a big matrix if spatially
# correlated signals
TNrs = self.pta.get_TNr(params)
TNTs = self.pta.get_TNT(params)
phiinvs = self.pta.get_phiinv(params, logdet=True,
method=phiinv_method)
# get -0.5 * (rNr + logdet_N) piece of likelihood
loglike = -0.5 * np.sum([l for l in self.pta.get_rNr_logdet(params)])
# red noise piece
if self.pta._commonsignals:
phiinv, logdet_phi = phiinvs
Sigma = self._make_sigma(TNTs, phiinv)
TNr = np.concatenate(TNrs)
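# cholesky here is assumed to be sksparse.cholmod.cholesky: the factor cf(TNr) solves Sigma x = TNr and cf.logdet() returns log|Sigma|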
cf = cholesky(Sigma)
expval = cf(TNr)
logdet_sigma = cf.logdet()
loglike += 0.5*(np.dot(TNr, expval) - logdet_sigma - logdet_phi)
else:
for TNr, TNT, (phiinv, logdet_phi) in zip(TNrs, TNTs, phiinvs):
Sigma = TNT + (np.diag(phiinv) if phiinv.ndim == 1 else phiinv)
try:
cf = sl.cho_factor(Sigma)
expval = sl.cho_solve(cf, TNr)
except:
return -np.inf
logdet_sigma = np.sum(2 * np.log(np.diag(cf[0])))
loglike += 0.5*(np.dot(TNr, expval) -
logdet_sigma - logdet_phi)
return loglike
def get_phiinv_byfreq_cliques(self, params, logdet=False, cholesky=False):
phi = self.get_phi(params, cliques=True)
if isinstance(phi, list):
return [None if phivec is None else phivec.inv(logdet)
for phivec in phi]
else:
ld = 0
# first invert all the cliques
for clcount in range(self._clcount):
idx = (self._cliques == clcount)
if np.any(idx):
idx2 = np.ix_(idx,idx)
if cholesky:
cf = sl.cho_factor(phi[idx2])
if logdet:
ld += 2.0*np.sum(np.log(np.diag(cf[0])))
phi[idx2] = sl.cho_solve(
cf, np.identity(cf[0].shape[0]))
else:
phi2 = phi[idx2]
if logdet:
ld += np.linalg.slogdet(phi2)[1]
phi[idx2] = np.linalg.inv(phi2)
# then do the pure diagonal terms
idx = (self._cliques == -1)
if logdet:
ld += np.sum(np.log(phi[idx,idx]))
phi[idx,idx] = 1.0/phi[idx,idx]
return (phi, ld) if logdet else phi
# we use "cliques" to account for sparse non-diagonal Phi matrices
# for each value in self._cliques, the matrix indices with that value form
# an independent submatrix that can be inverted separately
# reset clique index
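A toy sketch of the clique mechanism described in the comments above (hypothetical 4x4 phi): each clique is an independent sub-block that can be inverted on its own, here with cho_factor/cho_solve, while entries marked -1 are inverted element-wise.
import numpy as np
import scipy.linalg as sl

phi = np.diag([1.0, 2.0, 3.0, 4.0])
phi[0, 1] = phi[1, 0] = 0.5                 # indices 0 and 1 form one clique
cliques = np.array([0, 0, -1, -1])          # -1 marks purely diagonal entries
phi_full_inv = np.linalg.inv(phi)           # reference answer

idx = cliques == 0
ix2 = np.ix_(idx, idx)
cf = sl.cho_factor(phi[ix2])
phi[ix2] = sl.cho_solve(cf, np.identity(cf[0].shape[0]))   # invert the clique
diag = cliques == -1
phi[diag, diag] = 1.0 / phi[diag, diag]                    # invert the diagonal part
assert np.allclose(phi, phi_full_inv)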
def normal_eq_comb(AtA, AtB, PassSet=None):
""" Solve many systems of linear equations using combinatorial grouping.
M. H. Van Benthem and M. R. Keenan, J. Chemometrics 2004; 18: 441-450
Parameters
----------
AtA : numpy.array, shape (n,n)
AtB : numpy.array, shape (n,k)
Returns
-------
(Z,num_cholesky,num_eq)
Z : numpy.array, shape (n,k) - solution
num_cholesky : int - the number of unique cholesky decompositions done
num_eq: int - the number of systems of linear equations solved
"""
num_cholesky = 0
num_eq = 0
if AtB.size == 0:
Z = np.zeros([])
elif (PassSet is None) or np.all(PassSet):
Z = nla.solve(AtA, AtB)
num_cholesky = 1
num_eq = AtB.shape[1]
else:
Z = np.zeros(AtB.shape)
if PassSet.shape[1] == 1:
if np.any(PassSet):
cols = PassSet.nonzero()[0]
Z[cols] = nla.solve(AtA[np.ix_(cols, cols)], AtB[cols])
num_cholesky = 1
num_eq = 1
else:
#
# Both _column_group_loop() and _column_group_recursive() work well.
# Based on preliminary testing,
# _column_group_loop() is slightly faster for tiny k(<10), but
# _column_group_recursive() is faster for large k's.
#
grps = _column_group_recursive(PassSet)
for gr in grps:
cols = PassSet[:, gr[0]].nonzero()[0]
if cols.size > 0:
ix1 = np.ix_(cols, gr)
ix2 = np.ix_(cols, cols)
#
# scipy.linalg.cho_solve can be used instead of numpy.linalg.solve.
# For small n(<200), numpy.linalg.solve appears faster, whereas
# for large n(>500), scipy.linalg.cho_solve appears faster.
# Usage example of scipy.linalg.cho_solve:
# Z[ix1] = sla.cho_solve(sla.cho_factor(AtA[ix2]),AtB[ix1])
#
Z[ix1] = nla.solve(AtA[ix2], AtB[ix1])
num_cholesky += 1
num_eq += len(gr)
num_eq += len(gr)
return Z, num_cholesky, num_eq
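A hypothetical usage of normal_eq_comb (toy data; assumes _column_group_recursive is available, as referenced above): solve the normal equations column by column, restricted to each column's passive set.
import numpy as np

rng = np.random.default_rng(3)
A = rng.standard_normal((6, 3))
AtA = A.T @ A
AtB = AtA @ np.ones((3, 2))                          # full (unrestricted) solution is all ones
PassSet = np.array([[True, True],
                    [True, False],
                    [False, True]])                  # per-column passive sets
Z, n_chol, n_eq = normal_eq_comb(AtA, AtB, PassSet)  # Z is zero outside each passive set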
def weights_rbf(self, unit_sp, hypers):
d, n = unit_sp.shape
# GP kernel hyper-parameters
alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
assert len(el) == d
# pre-allocation for convenience
eye_d, eye_n, eye_y = np.eye(d), np.eye(n), np.eye(n + d * n)
K = self.kern_eq_der(unit_sp, hypers) # evaluate kernel matrix BOTTLENECK
iK = cho_solve(cho_factor(K + jitter * eye_y), eye_y) # invert kernel matrix BOTTLENECK
Lam = np.diag(el ** 2)
iLam = np.diag(el ** -1) # sqrt(Lambda^-1)
iiLam = np.diag(el ** -2) # Lambda^-1
inn = iLam.dot(unit_sp) # (x-m)^T*iLam # (N, D)
B = iiLam + eye_d # P*Lambda^-1+I, (P+Lam)^-1 = Lam^-1*(P*Lam^-1+I)^-1 # (D, D)
cho_B = cho_factor(B)
t = cho_solve(cho_B, inn) # dot(inn, inv(B)) # (x-m)^T*iLam*(P+Lambda)^-1 # (D, N)
l = np.exp(-0.5 * np.sum(inn * t, 0)) # (N, 1)
q = (alpha ** 2 / np.sqrt(det(B))) * l # (N, 1)
Sig_q = cho_solve(cho_B, eye_d) # B^-1*I
eta = Sig_q.dot(unit_sp) # (D,N) Sig_q*x
mu_q = iiLam.dot(eta) # (D,N)
r = q[na, :] * iiLam.dot(mu_q - unit_sp) # -t.dot(iLam) * q # (D, N)
q_tilde = np.hstack((q.T, r.T.ravel())) # (1, N+N*D)
# weights for mean
wm = q_tilde.dot(iK)
# quantities for cross-covariance "weights"
iLamSig = iiLam.dot(Sig_q) # (D,D)
r_tilde = (q[na, na, :] * iLamSig[..., na] + mu_q[na, ...] * r[:, na, :]).T.reshape(n * d, d).T # (D, N*D)
R_tilde = np.hstack((q[na, :] * mu_q, r_tilde)) # (D, N+N*D)
# input-output covariance (cross-covariance) "weights"
Wcc = R_tilde.dot(iK) # (D, N+N*D)
# quantities for covariance weights
zet = 2 * np.log(alpha) - 0.5 * np.sum(inn * inn, 0) # (D,N) 2log(alpha) - 0.5*(x-m)^T*Lambda^-1*(x-m)
inn = iiLam.dot(unit_sp) # inp / el[:, na]**2
R = 2 * iiLam + eye_d # 2P*Lambda^-1 + I
# (N,N)
Q = (1.0 / np.sqrt(det(R))) * np.exp((zet[:, na] + zet[:, na].T) + maha(inn.T, -inn.T, V=0.5 * solve(R, eye_d)))
cho_LamSig = cho_factor(Lam + Sig_q)
Sig_Q = cho_solve(cho_LamSig, Sig_q).dot(iiLam) # (D,D) Lambda^-1 (Lambda*(Lambda+Sig_q)^-1*Sig_q) Lambda^-1
eta_tilde = iiLam.dot(cho_solve(cho_LamSig, eta)) # Lambda^-1(Lambda+Sig_q)^-1*eta
ETA = eta_tilde[..., na] + eta_tilde[:, na, :] # (D,N,N) pairwise sum of pre-multiplied eta's (D,N,N)
# mu_Q = ETA + in_mean[:, na] # (D,N,N)
xnmu = inn[..., na] - ETA # (D,N,N) x_n - mu^Q_nm
# xmmu = sigmas[:, na, :] - mu_Q # x_m - mu^Q_nm
E_dff = (-Q[na, ...] * xnmu).swapaxes(0, 1).reshape(d * n, n)
# (D,D,N,N) (x_n - mu^Q_nm)(x_m - mu^Q_nm)^T + Sig_Q
T = xnmu[:, na, ...] * xnmu.swapaxes(1, 2)[na, ...] + Sig_Q[..., na, na]
E_dffd = (Q[na, na, ...] * T).swapaxes(0, 3).reshape(d * n, -1) # (N*D, N*D)
Q_tilde = np.vstack((np.hstack((Q, E_dff.T)), np.hstack((E_dff, E_dffd)))) # (N+N*D, N+N*D)
# weights for covariance
iKQ = iK.dot(Q_tilde)
Wc = iKQ.dot(iK)
# model variance
self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones(d))
return wm, Wc, Wcc
def predict(self, pred, full_cov=False, compute_grad=False):
inputs = self.inputs
values = self.values
# Special case if there is no data yet (everything from the prior)
if inputs is None:
return self.predict_from_prior(pred, full_cov, compute_grad)
if pred.shape[1] != self.num_dims:
raise Exception("Dimensionality of inputs must match dimensionality given at init time.")
# The primary covariances for prediction.
cand_cross = self.noiseless_kernel.cross_cov(inputs, pred)
chol, alpha = self._pull_from_cache_or_compute()
# Solve the linear systems.
# Note: if X = LL^T, cho_solve performs X\b whereas solve_triangular performs L\b
beta = spla.solve_triangular(chol, cand_cross, lower=True)
# Predict the marginal means at candidates.
func_m = np.dot(cand_cross.T, alpha) + self.mean.value
if full_cov:
# Return the covariance matrix of the pred inputs,
# rather than just the individual variances at each input
cand_cov = self.noiseless_kernel.cov(pred)
func_v = cand_cov - np.dot(beta.T, beta)
else:
cand_cov = self.noiseless_kernel.diag_cov(pred)
func_v = cand_cov - np.sum(beta**2, axis=0)
if not compute_grad:
return func_m, func_v
grad_cross = self.noiseless_kernel.cross_cov_grad_data(inputs, pred)
grad_xp_m = np.tensordot(np.transpose(grad_cross, (1,2,0)), alpha, 1)
# this should be faster than (and equivalent to) spla.cho_solve((chol, True), cand_cross)
gamma = spla.solve_triangular(chol.T, beta, lower=False)
# Use elementwise multiplication and a sum instead of a full matrix product,
# because only the diagonal of the gradient of the covariance matrix is needed here
grad_xp_v = -2.0*np.sum(gamma[:,:,np.newaxis] * grad_cross, axis=0)
# Not very important -- just to make sure grad_xp_v.shape = grad_xp_m.shape
if values.ndim > 1:
grad_xp_v = grad_xp_v[:,:,np.newaxis]
# In case this is a function over a 1D input,
# return a numpy array rather than a float
if np.ndim(grad_xp_m) == 0:
grad_xp_m = np.array([grad_xp_m])
grad_xp_v = np.array([grad_xp_v])
return func_m, func_v, grad_xp_m, grad_xp_v
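A quick check (hypothetical SPD matrix) of the note in the code above: with X = L L^T, cho_solve((L, True), b) returns X^-1 b, while solve_triangular(L, b, lower=True) returns L^-1 b.
import numpy as np
import scipy.linalg as spla

rng = np.random.default_rng(4)
M = rng.standard_normal((3, 3))
X = M @ M.T + 3 * np.eye(3)
b = rng.standard_normal(3)
L = spla.cholesky(X, lower=True)
assert np.allclose(spla.cho_solve((L, True), b), np.linalg.solve(X, b))
assert np.allclose(spla.solve_triangular(L, b, lower=True), np.linalg.solve(L, b))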
def _posterior_mode(self, K, return_temporaries=False):
"""Mode-finding for binary Laplace GPC and fixed kernel.
This approximates the posterior of the latent function values for given
inputs and target observations with a Gaussian approximation and uses
Newton's iteration to find the mode of this approximation.
"""
# Based on Algorithm 3.1 of GPML
# If warm_start are enabled, we reuse the last solution for the
# posterior mode as initialization; otherwise, we initialize with 0
if self.warm_start and hasattr(self, "f_cached") \
and self.f_cached.shape == self.y_train_.shape:
f = self.f_cached
else:
f = np.zeros_like(self.y_train_, dtype=np.float64)
# Use Newton's iteration method to find mode of Laplace approximation
log_marginal_likelihood = -np.inf
for _ in range(self.max_iter_predict):
# Line 4
pi = 1 / (1 + np.exp(-f))
W = pi * (1 - pi)
# Line 5
W_sr = np.sqrt(W)
W_sr_K = W_sr[:, np.newaxis] * K
B = np.eye(W.shape[0]) + W_sr_K * W_sr
L = cholesky(B, lower=True)
# Line 6
b = W * f + (self.y_train_ - pi)
# Line 7
a = b - W_sr * cho_solve((L, True), W_sr_K.dot(b))
# Line 8
f = K.dot(a)
# Line 10: Compute log marginal likelihood in loop and use as
# convergence criterion
lml = -0.5 * a.T.dot(f) \
- np.log(1 + np.exp(-(self.y_train_ * 2 - 1) * f)).sum() \
- np.log(np.diag(L)).sum()
# Check if we have converged (log marginal likelihood does
# not decrease)
# XXX: more complex convergence criterion
if lml - log_marginal_likelihood < 1e-10:
break
log_marginal_likelihood = lml
self.f_cached = f # Remember solution for later warm-starts
if return_temporaries:
return log_marginal_likelihood, (pi, W_sr, L, b, a)
else:
return log_marginal_likelihood
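A standalone sketch (toy K, W and b) of why Line 7 is formulated through B = I + W^1/2 K W^1/2: the resulting a = b - W^1/2 B^-1 W^1/2 K b equals (I + W K)^-1 b, so the Newton step never has to invert W, whose entries pi * (1 - pi) can be arbitrarily small.
import numpy as np
from scipy.linalg import cholesky, cho_solve

rng = np.random.default_rng(5)
n = 4
M = rng.standard_normal((n, n))
K = M @ M.T + n * np.eye(n)                  # toy kernel matrix
W = rng.uniform(0.05, 0.25, n)               # stand-in for pi * (1 - pi)
b = rng.standard_normal(n)

W_sr = np.sqrt(W)
W_sr_K = W_sr[:, np.newaxis] * K
B = np.eye(n) + W_sr_K * W_sr                # I + W^1/2 K W^1/2
L = cholesky(B, lower=True)
a = b - W_sr * cho_solve((L, True), W_sr_K.dot(b))
assert np.allclose(a, np.linalg.solve(np.eye(n) + W[:, np.newaxis] * K, b))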