def wrap_constr_jacob_func(constr_jacob):
"""Convenience function to wrap function calculating constraint Jacobian.
Produces a function which returns a dictionary with entry with key
`dc_dpos` corresponding to calculated constraint Jacobian and optionally
also entry with key `gram_chol` for Cholesky decomposition of Gram matrix
if keyword argument `calc_gram_chol` is True.
"""
    def constr_jacob_wrapper(pos, calc_gram_chol=True):
        jacob = constr_jacob(pos)
        cache = {'dc_dpos': jacob}
        if calc_gram_chol:
            cache['gram_chol'] = la.cho_factor(jacob.dot(jacob.T))
        return cache

    return constr_jacob_wrapper
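# A minimal usage sketch for the wrapper above, assuming `la` is `scipy.linalg`.
# The constraint c(pos) = pos[0]**2 + pos[1]**2 - 1 and `example_constr_jacob`
# are hypothetical, introduced only for illustration.
import numpy as np
import scipy.linalg as la

def example_constr_jacob(pos):
    # Jacobian of c(pos) = pos[0]**2 + pos[1]**2 - 1 with respect to pos, shape (1, 2)
    return 2. * pos[None, :]

jacob_func = wrap_constr_jacob_func(example_constr_jacob)
cache = jacob_func(np.array([0.6, 0.8]))
# cache['dc_dpos'] is the Jacobian, cache['gram_chol'] the Cholesky factor of J J^T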
def project_onto_nullspace(mom, cache):
""" Projects a momentum on to the nullspace of the constraint Jacobian.
Parameters
----------
mom : vector
Momentum state to project.
cache : dictionary
Dictionary of cached constraint Jacobian results.
Returns
-------
mom : vector
Momentum state projected into nullspace of constraint Jacobian.
"""
dc_dpos = cache['dc_dpos']
if 'gram_chol' not in cache:
cache['gram_chol'] = la.cho_factor(dc_dpos.dot(dc_dpos.T))
gram_chol = cache['gram_chol']
dc_dpos_mom = dc_dpos.dot(mom)
gram_inv_dc_dpos_mom = la.cho_solve(gram_chol, dc_dpos_mom)
dc_dpos_pinv_dc_dpos_mom = dc_dpos.T.dot(gram_inv_dc_dpos_mom)
mom -= dc_dpos_pinv_dc_dpos_mom
return mom
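# A small self-contained check of the projection above (a sketch, assuming
# `la` is `scipy.linalg`): after projection the momentum should lie in the
# nullspace of the constraint Jacobian, i.e. dc_dpos @ mom ≈ 0.
import numpy as np
import scipy.linalg as la

rng = np.random.default_rng(0)
dc_dpos = rng.standard_normal((2, 5))   # 2 constraints, 5-dimensional position
mom = rng.standard_normal(5)
mom_proj = project_onto_nullspace(mom.copy(), {'dc_dpos': dc_dpos})
print(np.allclose(dc_dpos.dot(mom_proj), 0.))  # expected: True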
def nci(x, m, P):
    # dimension of state, # time steps, # MC simulations, # inference algorithms (filters/smoothers)
    d, time, mc_sims, algs = m.shape
    dx = x[..., na] - m
    # Mean Square Error matrix
    MSE = np.empty((d, d, time, mc_sims, algs))
    for k in range(time):
        for s in range(mc_sims):
            for alg in range(algs):
                MSE[..., k, s, alg] = np.outer(dx[..., k, s, alg], dx[..., k, s, alg])
    MSE = MSE.mean(axis=3)  # average over MC simulations

    # dx_iP_dx = np.empty((1, time, mc_sims, algs))
    NCI = np.empty((1, time, mc_sims, algs))
    for k in range(1, time):
        for s in range(mc_sims):
            for alg in range(algs):
                # iP_dx = cho_solve(cho_factor(P[:, :, k, s, alg]), dx[:, k, s, alg])
                # dx_iP_dx[:, k, s, alg] = dx[:, k, s, alg].dot(iP_dx)
                # iMSE_dx = cho_solve(cho_factor(MSE[..., k, fi]), dx[:, k, s, alg])
                # NCI[..., k, s, fi] = 10*np.log10(dx_iP_dx[:, k, s, fi]) - 10*np.log10(dx[:, k, s, alg].dot(iMSE_dx))
                dx_iP_dx = dx[:, k, s, alg].dot(np.linalg.inv(P[..., k, s, alg])).dot(dx[:, k, s, alg])
                dx_iMSE_dx = dx[:, k, s, alg].dot(np.linalg.inv(MSE[..., k, alg])).dot(dx[:, k, s, alg])
                NCI[..., k, s, alg] = 10 * np.log10(dx_iP_dx) - 10 * np.log10(dx_iMSE_dx)
    return NCI[:, 1:, ...].mean(axis=1)  # average over time steps (ignore the 1st)
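# The commented-out lines above hint at the numerically preferable way to form
# the quadratic forms dx^T P^{-1} dx: factor once with cho_factor and reuse the
# factor via cho_solve instead of computing an explicit inverse. A minimal
# sketch with made-up toy values (assumes P is symmetric positive definite):
import numpy as np
from scipy.linalg import cho_factor, cho_solve

P = np.array([[2.0, 0.3], [0.3, 1.0]])
dx = np.array([0.5, -1.0])
quad_inv = dx.dot(np.linalg.inv(P)).dot(dx)        # explicit inverse
quad_cho = dx.dot(cho_solve(cho_factor(P), dx))    # Cholesky-based, same value
print(np.isclose(quad_inv, quad_cho))              # expected: True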
def check_pd(A, lower=True):
"""
Checks if A is PD.
If so returns True and Cholesky decomposition,
otherwise returns False and None
"""
try:
return True, np.tril(cho_factor(A, lower=lower)[0])
except LinAlgError as err:
if 'not positive definite' in str(err):
return False, None
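# Example use of check_pd (a sketch; assumes `np` is numpy and that `cho_factor`
# and `LinAlgError` come from scipy.linalg, as in the snippet above):
import numpy as np
from scipy.linalg import cho_factor, LinAlgError

is_pd, L = check_pd(np.array([[4.0, 1.0], [1.0, 3.0]]))     # True, lower-triangular factor
is_pd2, L2 = check_pd(np.array([[1.0, 2.0], [2.0, 1.0]]))   # False, None (matrix is indefinite)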
def kalman_filter(y, H, R, F, Q, mu, PI, z=None):
"""
Given the following sequences (one item of given dimension for
each time step):
- *y*: measurements (M)
- *H*: measurement operator (MxN)
- *R*: measurement noise covariance (MxM)
- *F*: time update operator (NxN)
- *Q*: time update noise covariance (NxN)
- *mu*: initial state (N)
- *PI*: initial state covariance (NxN)
- *z*: (optional) systematic time update input (N)
Return the :class:`FilterResult` containing lists of posterior
state estimates and error covariances.
"""
x_hat = []
P = []
x_hat_prior = mu
P_prior = PI
if z is None:
z = repeat(None)
for i, (y_i, H_i, R_i, F_i, Q_i, z_i) in enumerate(izip(y, H, R, F, Q, z)):
# measurement update
A = cho_factor(NP.matmul(H_i, NP.matmul(P_prior, H_i.T)) + R_i)
B = cho_solve(A, NP.matmul(H_i, P_prior))
b = cho_solve(A, y_i - NP.matmul(H_i, x_hat_prior))
C = NP.matmul(P_prior, H_i.T)
x_hat.append(x_hat_prior + NP.matmul(C, b))
P.append(P_prior - NP.matmul(C, B))
# time update
x_hat_prior = NP.matmul(F_i, x_hat[-1])
if z_i is not None:
x_hat_prior += z_i
P_prior = NP.matmul(F_i, NP.matmul(P[-1], F_i.T)) + Q_i
return FilterResult(x_hat, P)
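# The measurement update above factors the innovation covariance
# S = H P H^T + R once with cho_factor and reuses the factor for both the gain
# and the innovation solve. A standalone sketch of that single step with
# hypothetical toy matrices (numpy/scipy only):
import numpy as np
from scipy.linalg import cho_factor, cho_solve

H = np.array([[1.0, 0.0]])                 # 1x2 measurement operator
P_prior = np.diag([2.0, 1.0])              # 2x2 prior covariance
R = np.array([[0.5]])                      # 1x1 measurement noise covariance
x_prior = np.array([0.0, 1.0])
y = np.array([0.8])

S = cho_factor(H @ P_prior @ H.T + R)      # factor the innovation covariance
C = P_prior @ H.T
x_post = x_prior + C @ cho_solve(S, y - H @ x_prior)
P_post = P_prior - C @ cho_solve(S, H @ P_prior)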
def solve(self, other):
    if other.ndim == 1:
        Nx = np.array(other / self.N)
    elif other.ndim == 2:
        Nx = np.array(other / self.N[:, None])
    UNx = np.dot(self.U.T, Nx)
    Sigma = np.diag(1 / self.J) + np.dot(self.U.T, self.U / self.N[:, None])
    cf = sl.cho_factor(Sigma)
    if UNx.ndim == 1:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N
    else:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N[:, None]
    return Nx - tmp
def logdet(self):
    Sigma = np.diag(1 / self.J) + np.dot(self.U.T, self.U / self.N[:, None])
    cf = sl.cho_factor(Sigma)
    ld = np.sum(np.log(self.N)) + np.sum(np.log(self.J))
    ld += np.sum(2 * np.log(np.diag(cf[0])))
    return ld
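# The solve/logdet pair above uses the Woodbury identity and the matrix
# determinant lemma for N + U J U^T with diagonal N and J, so only the small
# matrix Sigma = J^-1 + U^T N^-1 U is ever factorised. A quick consistency
# check against the dense matrix (sketch; N, J, U are made-up toy arrays):
import numpy as np
import scipy.linalg as sl

rng = np.random.default_rng(1)
N = rng.uniform(1.0, 2.0, size=6)          # diagonal of N
J = rng.uniform(0.5, 1.5, size=2)          # diagonal of J
U = rng.standard_normal((6, 2))
dense = np.diag(N) + U @ np.diag(J) @ U.T

Sigma = np.diag(1 / J) + U.T @ (U / N[:, None])
x = rng.standard_normal(6)
woodbury = x / N - (U @ sl.cho_solve(sl.cho_factor(Sigma), U.T @ (x / N))) / N
print(np.allclose(woodbury, np.linalg.solve(dense, x)))     # expected: True

ld = (np.sum(np.log(N)) + np.sum(np.log(J))
      + np.sum(2 * np.log(np.diag(sl.cho_factor(Sigma)[0]))))
print(np.isclose(ld, np.linalg.slogdet(dense)[1]))          # expected: True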
def __init__(self, x):
    if sps.issparse(x):
        x = x.toarray()
    self.cf = sl.cho_factor(x)
def _solve_ZNX(self, X, Z):
"""Solves :math:`Z^T N^{-1}X`, where :math:`X`
and :math:`Z` are 1-d or 2-d arrays.
"""
if X.ndim == 1:
X = X.reshape(X.shape[0], 1)
if Z.ndim == 1:
Z = Z.reshape(Z.shape[0], 1)
n, m = Z.shape[1], X.shape[1]
ZNX = np.zeros((n, m))
if len(self._idx) > 0:
ZNXr = np.dot(Z[self._idx,:].T, X[self._idx,:] /
self._nvec[self._idx, None])
else:
ZNXr = 0
for slc, block in zip(self._slices, self._blocks):
Zblock = Z[slc, :]
Xblock = X[slc, :]
if slc.stop - slc.start > 1:
cf = sl.cho_factor(block+np.diag(self._nvec[slc]))
bx = sl.cho_solve(cf, Xblock)
else:
bx = Xblock / self._nvec[slc][:, None]
ZNX += np.dot(Zblock.T, bx)
ZNX += ZNXr
return ZNX.squeeze() if len(ZNX) > 1 else float(ZNX)
def _solve_NX(self, X):
"""Solves :math:`N^{-1}X`, where :math:`X`
is a 1-d or 2-d array.
"""
if X.ndim == 1:
X = X.reshape(X.shape[0], 1)
NX = X / self._nvec[:,None]
for slc, block in zip(self._slices, self._blocks):
Xblock = X[slc, :]
if slc.stop - slc.start > 1:
cf = sl.cho_factor(block+np.diag(self._nvec[slc]))
NX[slc] = sl.cho_solve(cf, Xblock)
return NX.squeeze()
def _get_logdet(self):
"""Returns log determinant of :math:`N+UJU^{T}` where :math:`U`
is a quantization matrix.
"""
if len(self._idx) > 0:
logdet = np.sum(np.log(self._nvec[self._idx]))
else:
logdet = 0
for slc, block in zip(self._slices, self._blocks):
if slc.stop - slc.start > 1:
cf = sl.cho_factor(block+np.diag(self._nvec[slc]))
logdet += np.sum(2*np.log(np.diag(cf[0])))
else:
logdet += np.sum(np.log(self._nvec[slc]))
return logdet
def __init__(self, Sigma):
    self._cholesky = la.cho_factor(Sigma)
def _measurement_update(self, y):
    gain = cho_solve(cho_factor(self.z_cov_pred), self.xz_cov).T
    self.x_mean_filt = self.x_mean_pred + gain.dot(y - self.z_mean_pred)
    self.x_cov_filt = self.x_cov_pred - gain.dot(self.z_cov_pred).dot(gain.T)
def _smoothing_update(self):
    gain = cho_solve(cho_factor(self.x_cov_pred), self.xx_cov).T
    self.x_mean_smooth = self.x_mean_filt + gain.dot(self.x_mean_smooth - self.x_mean_pred)
    self.x_cov_smooth = self.x_cov_filt + gain.dot(self.x_cov_smooth - self.x_cov_pred).dot(gain.T)
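# Both updates above compute a gain of the form G = C^T S^{-1} (cross-covariance
# times inverse predicted covariance) as cho_solve(cho_factor(S), C).T, avoiding
# an explicit inverse. A tiny equivalence check with made-up toy matrices:
import numpy as np
from scipy.linalg import cho_factor, cho_solve

S = np.array([[2.0, 0.4], [0.4, 1.5]])     # predicted covariance (SPD)
C = np.array([[0.3, 0.1], [0.0, 0.2]])     # cross-covariance
gain = cho_solve(cho_factor(S), C).T
print(np.allclose(gain, C.T @ np.linalg.inv(S)))   # expected: True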
def weights_rbf(self, unit_sp, hypers):
    # BQ weights for RBF kernel with given hypers, computations adopted from the GP-ADF code [Deisenroth] with
    # the following assumptions:
    #   (A1) the uncertain input is zero-mean with unit covariance
    #   (A2) one set of hyper-parameters is used for all output dimensions (one GP models all outputs)
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n = np.eye(d), np.eye(n)
    iLam1 = np.atleast_2d(np.diag(el ** -1))  # sqrt(Lambda^-1)
    iLam2 = np.atleast_2d(np.diag(el ** -2))

    inp = unit_sp.T.dot(iLam1)  # sigmas / el[:, na] (x - m)^T*sqrt(Lambda^-1)  # (numSP, xdim)
    K = np.exp(2 * np.log(alpha) - 0.5 * maha(inp, inp))
    iK = cho_solve(cho_factor(K + jitter * eye_n), eye_n)
    B = iLam2 + eye_d  # (D, D)
    c = alpha ** 2 / np.sqrt(det(B))
    t = inp.dot(inv(B))  # inn*(P + Lambda)^-1
    l = np.exp(-0.5 * np.sum(inp * t, 1))  # (N, 1)
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inp * inp, 1)
    inp = inp.dot(iLam1)
    R = 2 * iLam2 + eye_d
    t = 1 / np.sqrt(det(R))
    L = np.exp((zet[:, na] + zet[:, na].T) + maha(inp, -inp, V=0.5 * inv(R)))
    q = c * l  # evaluations of the kernel mean map (from the viewpoint of RKHS methods)
    # mean weights
    wm_f = q.dot(iK)
    iKQ = iK.dot(t * L)
    # covariance weights
    wc_f = iKQ.dot(iK)
    # cross-covariance "weights"
    wc_fx = np.diag(q).dot(iK)
    # used for self.D.dot(x - mean).dot(wc_fx).dot(fx)
    self.D = inv(eye_d + np.diag(el ** 2))  # S(S+Lam)^-1; for S=I, (I+Lam)^-1
    # model variance; to be added to the covariance
    # this diagonal form assumes independent GP outputs (cov(f^a, f^b) = 0 for all a, b: a neq b)
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm_f, wc_f, wc_fx
def plot_gp_model(self, f, unit_sp, args, test_range=(-5, 5, 50), plot_dims=(0, 0)):
    # plot out_dim vs. in_dim
    in_dim, out_dim = plot_dims
    # test input must have the same dimension as specified in kernel
    test = np.linspace(*test_range)
    test_pts = np.zeros((self.d, len(test)))
    test_pts[in_dim, :] = test
    # function value observations at training points (unit sigma-points)
    y = np.apply_along_axis(f, 0, unit_sp, args)
    fx = np.apply_along_axis(f, 0, test_pts, args)  # function values at test points
    K = self.kern.K(unit_sp.T)  # covariances between sigma-points
    k = self.kern.K(test_pts.T, unit_sp.T)  # covariance between test inputs and sigma-points
    kxx = self.kern.Kdiag(test_pts.T)  # prior predictive variance
    k_iK = cho_solve(cho_factor(K), k.T).T
    gp_mean = k_iK.dot(y[out_dim, :])  # GP mean
    gp_var = np.diag(np.diag(kxx) - k_iK.dot(k.T))  # GP predictive variance
    # plot the GP mean, predictive variance and the true function
    plt.figure()
    plt.plot(test, fx[out_dim, :], color='r', ls='--', lw=2, label='true')
    plt.plot(test, gp_mean, color='b', ls='-', lw=2, label='GP mean')
    plt.fill_between(test, gp_mean + 2 * np.sqrt(gp_var), gp_mean - 2 * np.sqrt(gp_var),
                     color='b', alpha=0.25, label='GP variance')
    plt.plot(unit_sp[in_dim, :], y[out_dim, :],
             color='k', ls='', marker='o', ms=8, label='data')
    plt.legend()
    plt.show()
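# The line `k_iK = cho_solve(cho_factor(K), k.T).T` above is the usual GP
# predictive solve: k_iK equals k K^{-1}, giving the mean k K^{-1} y and the
# variance kxx - k K^{-1} k^T. A minimal standalone sketch with a hypothetical
# squared exponential kernel and made-up toy data (numpy/scipy only):
import numpy as np
from scipy.linalg import cho_factor, cho_solve

def se_kernel(a, b, ell=1.0):
    # squared exponential kernel between 1-d input arrays a and b
    return np.exp(-0.5 * (a[:, None] - b[None, :]) ** 2 / ell ** 2)

x_train = np.array([-2.0, 0.0, 1.5])
y_train = np.sin(x_train)
x_test = np.linspace(-3, 3, 7)

K = se_kernel(x_train, x_train) + 1e-8 * np.eye(3)   # jitter for numerical stability
k = se_kernel(x_test, x_train)
k_iK = cho_solve(cho_factor(K), k.T).T               # k K^{-1}
gp_mean = k_iK @ y_train
gp_var = se_kernel(x_test, x_test).diagonal() - np.sum(k_iK * k, axis=1)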
def __call__(self, xs, phiinv_method='partition'):
    # map parameter vector if needed
    params = xs if isinstance(xs, dict) else self.pta.map_params(xs)

    # phiinvs will be a list, or may be a big matrix if spatially
    # correlated signals are present
    TNrs = self.pta.get_TNr(params)
    TNTs = self.pta.get_TNT(params)
    phiinvs = self.pta.get_phiinv(params, logdet=True,
                                  method=phiinv_method)

    # get -0.5 * (rNr + logdet_N) piece of likelihood
    loglike = -0.5 * np.sum([l for l in self.pta.get_rNr_logdet(params)])

    # red noise piece
    if self.pta._commonsignals:
        phiinv, logdet_phi = phiinvs
        Sigma = self._make_sigma(TNTs, phiinv)
        TNr = np.concatenate(TNrs)

        cf = cholesky(Sigma)
        expval = cf(TNr)
        logdet_sigma = cf.logdet()

        loglike += 0.5 * (np.dot(TNr, expval) - logdet_sigma - logdet_phi)
    else:
        for TNr, TNT, (phiinv, logdet_phi) in zip(TNrs, TNTs, phiinvs):
            Sigma = TNT + (np.diag(phiinv) if phiinv.ndim == 1 else phiinv)
            try:
                cf = sl.cho_factor(Sigma)
                expval = sl.cho_solve(cf, TNr)
            except np.linalg.LinAlgError:
                # Sigma not positive definite; reject this parameter point
                return -np.inf
            logdet_sigma = np.sum(2 * np.log(np.diag(cf[0])))
            loglike += 0.5 * (np.dot(TNr, expval) -
                              logdet_sigma - logdet_phi)

    return loglike
def get_phiinv_byfreq_cliques(self, params, logdet=False, cholesky=False):
    phi = self.get_phi(params, cliques=True)

    if isinstance(phi, list):
        return [None if phivec is None else phivec.inv(logdet)
                for phivec in phi]
    else:
        ld = 0
        # first invert all the cliques
        for clcount in range(self._clcount):
            idx = (self._cliques == clcount)
            if np.any(idx):
                idx2 = np.ix_(idx, idx)
                if cholesky:
                    cf = sl.cho_factor(phi[idx2])
                    if logdet:
                        ld += 2.0 * np.sum(np.log(np.diag(cf[0])))
                    phi[idx2] = sl.cho_solve(
                        cf, np.identity(cf[0].shape[0]))
                else:
                    phi2 = phi[idx2]
                    if logdet:
                        ld += np.linalg.slogdet(phi2)[1]
                    phi[idx2] = np.linalg.inv(phi2)

        # then do the pure diagonal terms
        idx = (self._cliques == -1)
        if logdet:
            ld += np.sum(np.log(phi[idx, idx]))
        phi[idx, idx] = 1.0 / phi[idx, idx]

        return (phi, ld) if logdet else phi
# we use "cliques" to account for sparse non-diagonal Phi matrices
# for each value in self._cliques, the matrix indices with that value form
# an independent submatrix that can be inverted separately
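# A toy illustration of the clique idea described above: indices sharing a clique
# label form an independent block of Phi that can be inverted on its own with a
# small Cholesky factorisation (sketch; the 4x4 phi and clique labels are made up).
import numpy as np
import scipy.linalg as sl

phi = np.diag([1.0, 2.0, 3.0, 4.0])
phi[0, 1] = phi[1, 0] = 0.5                 # indices 0 and 1 form one clique
cliques = np.array([0, 0, -1, -1])          # -1 marks purely diagonal entries

phiinv = phi.copy()
idx = np.flatnonzero(cliques == 0)
idx2 = np.ix_(idx, idx)
cf = sl.cho_factor(phi[idx2])
phiinv[idx2] = sl.cho_solve(cf, np.identity(len(idx)))
diag = np.flatnonzero(cliques == -1)
phiinv[diag, diag] = 1.0 / phi[diag, diag]
print(np.allclose(phiinv, np.linalg.inv(phi)))   # True for this block-diagonal phi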
def weights_rbf(self, unit_sp, hypers):
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n, eye_y = np.eye(d), np.eye(n), np.eye(n + d * n)
    K = self.kern_eq_der(unit_sp, hypers)  # evaluate kernel matrix BOTTLENECK
    iK = cho_solve(cho_factor(K + jitter * eye_y), eye_y)  # invert kernel matrix BOTTLENECK
    Lam = np.diag(el ** 2)
    iLam = np.diag(el ** -1)  # sqrt(Lambda^-1)
    iiLam = np.diag(el ** -2)  # Lambda^-1

    inn = iLam.dot(unit_sp)  # (x-m)^T*iLam  # (N, D)
    B = iiLam + eye_d  # P*Lambda^-1+I, (P+Lam)^-1 = Lam^-1*(P*Lam^-1+I)^-1  # (D, D)
    cho_B = cho_factor(B)
    t = cho_solve(cho_B, inn)  # dot(inn, inv(B))  # (x-m)^T*iLam*(P+Lambda)^-1  # (D, N)
    l = np.exp(-0.5 * np.sum(inn * t, 0))  # (N, 1)
    q = (alpha ** 2 / np.sqrt(det(B))) * l  # (N, 1)
    Sig_q = cho_solve(cho_B, eye_d)  # B^-1*I
    eta = Sig_q.dot(unit_sp)  # (D,N) Sig_q*x
    mu_q = iiLam.dot(eta)  # (D,N)
    r = q[na, :] * iiLam.dot(mu_q - unit_sp)  # -t.dot(iLam) * q  # (D, N)
    q_tilde = np.hstack((q.T, r.T.ravel()))  # (1, N+N*D)

    # weights for mean
    wm = q_tilde.dot(iK)

    # quantities for cross-covariance "weights"
    iLamSig = iiLam.dot(Sig_q)  # (D,D)
    r_tilde = (q[na, na, :] * iLamSig[..., na] + mu_q[na, ...] * r[:, na, :]).T.reshape(n * d, d).T  # (D, N*D)
    R_tilde = np.hstack((q[na, :] * mu_q, r_tilde))  # (D, N+N*D)

    # input-output covariance (cross-covariance) "weights"
    Wcc = R_tilde.dot(iK)  # (D, N+N*D)

    # quantities for covariance weights
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inn * inn, 0)  # (D,N) 2log(alpha) - 0.5*(x-m)^T*Lambda^-1*(x-m)
    inn = iiLam.dot(unit_sp)  # inp / el[:, na]**2
    R = 2 * iiLam + eye_d  # 2P*Lambda^-1 + I
    # (N,N)
    Q = (1.0 / np.sqrt(det(R))) * np.exp((zet[:, na] + zet[:, na].T) + maha(inn.T, -inn.T, V=0.5 * solve(R, eye_d)))
    cho_LamSig = cho_factor(Lam + Sig_q)
    Sig_Q = cho_solve(cho_LamSig, Sig_q).dot(iiLam)  # (D,D) Lambda^-1 (Lambda*(Lambda+Sig_q)^-1*Sig_q) Lambda^-1
    eta_tilde = iiLam.dot(cho_solve(cho_LamSig, eta))  # Lambda^-1(Lambda+Sig_q)^-1*eta
    ETA = eta_tilde[..., na] + eta_tilde[:, na, :]  # (D,N,N) pairwise sum of pre-multiplied eta's
    # mu_Q = ETA + in_mean[:, na]  # (D,N,N)
    xnmu = inn[..., na] - ETA  # (D,N,N) x_n - mu^Q_nm
    # xmmu = sigmas[:, na, :] - mu_Q  # x_m - mu^Q_nm
    E_dff = (-Q[na, ...] * xnmu).swapaxes(0, 1).reshape(d * n, n)
    # (D,D,N,N) (x_n - mu^Q_nm)(x_m - mu^Q_nm)^T + Sig_Q
    T = xnmu[:, na, ...] * xnmu.swapaxes(1, 2)[na, ...] + Sig_Q[..., na, na]
    E_dffd = (Q[na, na, ...] * T).swapaxes(0, 3).reshape(d * n, -1)  # (N*D, N*D)
    Q_tilde = np.vstack((np.hstack((Q, E_dff.T)), np.hstack((E_dff, E_dffd))))  # (N+N*D, N+N*D)

    # weights for covariance
    iKQ = iK.dot(Q_tilde)
    Wc = iKQ.dot(iK)
    # model variance
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm, Wc, Wcc