def _log_prob(self, given):
    mean, cov_tril = (self.path_param(self.mean),
                      self.path_param(self.cov_tril))
    log_det = 2 * tf.reduce_sum(
        tf.log(tf.matrix_diag_part(cov_tril)), axis=-1)
    N = tf.cast(self._n_dim, self.dtype)
    logZ = - N / 2 * tf.log(2 * tf.constant(np.pi, dtype=self.dtype)) - \
        log_det / 2
    # logZ.shape == batch_shape
    if self._check_numerics:
        logZ = tf.check_numerics(logZ, "log[det(Cov)]")
    # (given-mean)' Sigma^{-1} (given-mean) =
    # (g-m)' L^{-T} L^{-1} (g-m) = |x|^2, where Lx = g-m =: y.
    y = tf.expand_dims(given - mean, -1)
    L, _ = maybe_explicit_broadcast(
        cov_tril, y, 'MultivariateNormalCholesky.cov_tril',
        'expand_dims(given, -1)')
    x = tf.matrix_triangular_solve(L, y, lower=True)
    x = tf.squeeze(x, -1)
    stoc_dist = -0.5 * tf.reduce_sum(tf.square(x), axis=-1)
    return logZ + stoc_dist
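# A minimal NumPy/SciPy sketch (not from the original sources) checking the
# identity used in the comment above: for Sigma = L L^T with L lower triangular,
# (g - m)^T Sigma^{-1} (g - m) equals ||L^{-1} (g - m)||^2, so one triangular
# solve replaces an explicit matrix inverse. scipy.linalg.solve_triangular
# stands in for tf.matrix_triangular_solve.
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(0)
D = 4
A = rng.randn(D, D)
Sigma = A @ A.T + D * np.eye(D)          # SPD covariance
L = np.linalg.cholesky(Sigma)            # Sigma = L L^T
g, m = rng.randn(D), rng.randn(D)

x = solve_triangular(L, g - m, lower=True)        # solves L x = g - m
maha_solve = x @ x
maha_inverse = (g - m) @ np.linalg.inv(Sigma) @ (g - m)
assert np.allclose(maha_solve, maha_inverse)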
def test_whiten(self):
    """
    Make sure that predicting using the whitened representation is the
    same as the non-whitened one.
    """
    with self.test_context() as sess:
        Xs, X, F, k, num_data, feed_dict = self.prepare()
        k.compile(session=sess)
        K = k.K(X) + tf.eye(num_data, dtype=settings.float_type) * 1e-6
        L = tf.cholesky(K)
        V = tf.matrix_triangular_solve(L, F, lower=True)
        Fstar_mean, Fstar_var = gpflow.conditionals.conditional(Xs, X, k, F)
        Fstar_w_mean, Fstar_w_var = gpflow.conditionals.conditional(Xs, X, k, V, white=True)
        mean1, var1 = sess.run([Fstar_w_mean, Fstar_w_var], feed_dict=feed_dict)
        mean2, var2 = sess.run([Fstar_mean, Fstar_var], feed_dict=feed_dict)
        # TODO: should tolerance be type dependent?
        assert_allclose(mean1, mean2)
        assert_allclose(var1, var2)
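# A small NumPy sketch (an illustration, not GPflow code) of the identity the
# whitening test exercises: with K = L L^T and V = L^{-1} F, the conditional
# mean Ks^T K^{-1} F equals (L^{-1} Ks)^T V, so feeding the whitened V with
# white=True should reproduce the unwhitened prediction.
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(1)
N, Ns = 6, 3
Z = rng.randn(N + Ns, 1)
K_all = np.exp(-0.5 * (Z - Z.T) ** 2)       # toy RBF Gram matrix
K = K_all[:N, :N] + 1e-6 * np.eye(N)
Ks = K_all[:N, N:]                          # K(X, Xs), N x Ns
F = rng.randn(N, 1)

L = np.linalg.cholesky(K)
V = solve_triangular(L, F, lower=True)                     # whitened representation
mean_unwhitened = Ks.T @ np.linalg.solve(K, F)
mean_whitened = solve_triangular(L, Ks, lower=True).T @ V
assert np.allclose(mean_unwhitened, mean_whitened)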
def _build_predict(self, Xnew, full_cov=False):
    """
    Xnew is a data matrix of the points at which we want to predict.
    This method computes
        p(F* | Y)
    where F* are points on the GP at Xnew, and Y are noisy observations at X.
    """
    Kx = self.kern.K(self.X, Xnew)
    K = self.kern.K(self.X) + tf.eye(tf.shape(self.X)[0], dtype=settings.float_type) * self.likelihood.variance
    L = tf.cholesky(K)
    A = tf.matrix_triangular_solve(L, Kx, lower=True)
    V = tf.matrix_triangular_solve(L, self.Y - self.mean_function(self.X))
    fmean = tf.matmul(A, V, transpose_a=True) + self.mean_function(Xnew)
    if full_cov:
        fvar = self.kern.K(Xnew) - tf.matmul(A, A, transpose_a=True)
        shape = tf.stack([1, 1, tf.shape(self.Y)[1]])
        fvar = tf.tile(tf.expand_dims(fvar, 2), shape)
    else:
        fvar = self.kern.Kdiag(Xnew) - tf.reduce_sum(tf.square(A), 0)
        fvar = tf.tile(tf.reshape(fvar, (-1, 1)), [1, tf.shape(self.Y)[1]])
    return fmean, fvar
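# NumPy sketch (illustrative, not GPflow API) of the algebra in _build_predict:
# with K = L L^T, A = L^{-1} Kx and V = L^{-1} y, the predictive mean A^T V and
# variance kdiag - sum(A**2, axis=0) equal the textbook GP regression formulas
# Kx^T K^{-1} y and kdiag - diag(Kx^T K^{-1} Kx), with no explicit inverse.
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(2)
X = rng.randn(8, 1)
Xnew = rng.randn(4, 1)
y = rng.randn(8, 1)
rbf = lambda a, b: np.exp(-0.5 * (a - b.T) ** 2)   # toy RBF kernel, unit scales

K = rbf(X, X) + 0.1 * np.eye(8)        # noisy training covariance
Kx = rbf(X, Xnew)                      # N x Nnew cross-covariance
L = np.linalg.cholesky(K)
A = solve_triangular(L, Kx, lower=True)
V = solve_triangular(L, y, lower=True)

fmean = A.T @ V
fvar = np.diag(rbf(Xnew, Xnew)) - np.sum(A ** 2, axis=0)
assert np.allclose(fmean, Kx.T @ np.linalg.solve(K, y))
assert np.allclose(fvar, np.diag(rbf(Xnew, Xnew) - Kx.T @ np.linalg.solve(K, Kx)))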
def _build_common_terms(self):
    num_inducing = len(self.feature)
    err = self.Y - self.mean_function(self.X)  # size N x R
    Kdiag = self.kern.Kdiag(self.X)
    Kuf = self.feature.Kuf(self.kern, self.X)
    Kuu = self.feature.Kuu(self.kern, jitter=settings.numerics.jitter_level)
    Luu = tf.cholesky(Kuu)  # => Luu Luu^T = Kuu
    V = tf.matrix_triangular_solve(Luu, Kuf)  # => V^T V = Qff = Kuf^T Kuu^-1 Kuf
    diagQff = tf.reduce_sum(tf.square(V), 0)
    nu = Kdiag - diagQff + self.likelihood.variance
    B = tf.eye(num_inducing, dtype=settings.float_type) + tf.matmul(V / nu, V, transpose_b=True)
    L = tf.cholesky(B)
    beta = err / tf.expand_dims(nu, 1)  # size N x R
    alpha = tf.matmul(V, beta)  # size M x R
    gamma = tf.matrix_triangular_solve(L, alpha, lower=True)  # size M x R
    return err, nu, Luu, L, alpha, beta, gamma
def _build_predict(self, Xnew, full_cov=False):
    """
    Compute the mean and variance of the latent function at some new points
    Xnew.
    """
    _, _, Luu, L, _, _, gamma = self._build_common_terms()
    Kus = self.feature.Kuf(self.kern, Xnew)  # size M x Xnew
    w = tf.matrix_triangular_solve(Luu, Kus, lower=True)  # size M x Xnew
    tmp = tf.matrix_triangular_solve(tf.transpose(L), gamma, lower=False)
    mean = tf.matmul(w, tmp, transpose_a=True) + self.mean_function(Xnew)
    intermediateA = tf.matrix_triangular_solve(L, w, lower=True)
    if full_cov:
        var = self.kern.K(Xnew) - tf.matmul(w, w, transpose_a=True) \
              + tf.matmul(intermediateA, intermediateA, transpose_a=True)
        var = tf.tile(tf.expand_dims(var, 2), tf.stack([1, 1, tf.shape(self.Y)[1]]))
    else:
        var = self.kern.Kdiag(Xnew) - tf.reduce_sum(tf.square(w), 0) \
              + tf.reduce_sum(tf.square(intermediateA), 0)  # size Xnew,
        var = tf.tile(tf.expand_dims(var, 1), tf.stack([1, tf.shape(self.Y)[1]]))
    return mean, var
def multivariate_normal(x, mu, L):
    """
    L is the Cholesky decomposition of the covariance.
    x and mu are either vectors (ndim=1) or matrices. In the matrix case, we
    assume independence over the *columns*: the number of rows must match the
    size of L.
    """
    d = x - mu
    alpha = tf.matrix_triangular_solve(L, d, lower=True)
    num_col = 1 if tf.rank(x) == 1 else tf.shape(x)[1]
    num_col = tf.cast(num_col, settings.float_type)
    num_dims = tf.cast(tf.shape(x)[0], settings.float_type)
    ret = - 0.5 * num_dims * num_col * np.log(2 * np.pi)
    ret += - num_col * tf.reduce_sum(tf.log(tf.matrix_diag_part(L)))
    ret += - 0.5 * tf.reduce_sum(tf.square(alpha))
    return ret
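# NumPy/SciPy sketch (not part of the library above) of the same Cholesky-based
# log-density: with Sigma = L L^T,
#   log N(x | mu, Sigma) = -D/2 log(2*pi) - sum(log diag(L)) - 0.5 ||L^{-1}(x - mu)||^2,
# which should match scipy.stats.multivariate_normal.logpdf.
import numpy as np
from scipy.linalg import solve_triangular
from scipy.stats import multivariate_normal as mvn

rng = np.random.RandomState(3)
D = 3
A = rng.randn(D, D)
Sigma = A @ A.T + D * np.eye(D)
L = np.linalg.cholesky(Sigma)
x, mu = rng.randn(D), rng.randn(D)

alpha = solve_triangular(L, x - mu, lower=True)
logp = (-0.5 * D * np.log(2 * np.pi)
        - np.sum(np.log(np.diag(L)))
        - 0.5 * alpha @ alpha)
assert np.allclose(logp, mvn.logpdf(x, mean=mu, cov=Sigma))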
def _define_full_covariance_probs(self, shard_id, shard):
    """Defines the full covariance probabilities per example in a class.
    Updates a matrix with dimension num_examples X num_classes.
    Args:
      shard_id: id of the current shard.
      shard: current data shard, 1 X num_examples X dimensions.
    """
    diff = shard - self._means
    cholesky = tf.cholesky(self._covs + self._min_var)
    log_det_covs = 2.0 * tf.reduce_sum(tf.log(tf.matrix_diag_part(cholesky)), 1)
    x_mu_cov = tf.square(
        tf.matrix_triangular_solve(
            cholesky, tf.transpose(
                diff, perm=[0, 2, 1]), lower=True))
    diag_m = tf.transpose(tf.reduce_sum(x_mu_cov, 1))
    self._probs[shard_id] = -0.5 * (
        diag_m + tf.to_float(self._dimensions) * tf.log(2 * np.pi) +
        log_det_covs)
def test_whiten(self):
    """
    Make sure that predicting using the whitened representation is the
    same as the non-whitened one.
    """
    with self.test_context() as sess:
        rng = np.random.RandomState(0)
        Xs, X, F, k, num_data, feed_dict = self.prepare()
        k.compile(session=sess)
        F_sqrt = tf.placeholder(settings.float_type, [num_data, 1])
        F_sqrt_data = rng.rand(num_data, 1)
        feed_dict[F_sqrt] = F_sqrt_data
        K = k.K(X)
        L = tf.cholesky(K)
        V = tf.matrix_triangular_solve(L, F, lower=True)
        V_chol = tf.matrix_triangular_solve(L, tf.diag(F_sqrt[:, 0]), lower=True)
        V_sqrt = tf.expand_dims(V_chol, 2)
        Fstar_mean, Fstar_var = gpflow.conditionals.conditional(
            Xs, X, k, F, q_sqrt=F_sqrt)
        Fstar_w_mean, Fstar_w_var = gpflow.conditionals.conditional(
            Xs, X, k, V, q_sqrt=V_sqrt, white=True)
        mean_difference = sess.run(Fstar_w_mean - Fstar_mean, feed_dict=feed_dict)
        var_difference = sess.run(Fstar_w_var - Fstar_var, feed_dict=feed_dict)
        assert_allclose(mean_difference, 0, atol=4)
        assert_allclose(var_difference, 0, atol=4)
def _build_likelihood(self):
    """
    q_alpha, q_lambda are variational parameters, size N x R.
    This method computes the variational lower bound on the likelihood,
    which is:
        E_{q(F)} [ \log p(Y|F) ] - KL[ q(F) || p(F) ]
    with
        q(f) = N(f | K alpha + mean, [K^-1 + diag(square(lambda))]^-1) .
    """
    K = self.kern.K(self.X)
    K_alpha = tf.matmul(K, self.q_alpha)
    f_mean = K_alpha + self.mean_function(self.X)
    # compute the variance for each of the outputs
    I = tf.tile(tf.expand_dims(tf.eye(self.num_data, dtype=settings.float_type), 0),
                [self.num_latent, 1, 1])
    A = I + tf.expand_dims(tf.transpose(self.q_lambda), 1) * \
        tf.expand_dims(tf.transpose(self.q_lambda), 2) * K
    L = tf.cholesky(A)
    Li = tf.matrix_triangular_solve(L, I)
    tmp = Li / tf.expand_dims(tf.transpose(self.q_lambda), 1)
    f_var = 1. / tf.square(self.q_lambda) - tf.transpose(tf.reduce_sum(tf.square(tmp), 1))
    # some statistics about A are used in the KL
    A_logdet = 2.0 * tf.reduce_sum(tf.log(tf.matrix_diag_part(L)))
    trAi = tf.reduce_sum(tf.square(Li))
    KL = 0.5 * (A_logdet + trAi - self.num_data * self.num_latent +
                tf.reduce_sum(K_alpha * self.q_alpha))
    v_exp = self.likelihood.variational_expectations(f_mean, f_var, self.Y)
    return tf.reduce_sum(v_exp) - KL
def _build_predict(self, Xnew, full_cov=False):
    """
    The posterior approximation to F is given by
        q(f) = N(f | K alpha + mean, [K^-1 + diag(lambda**2)]^-1)
    Here we project this to F*, the values of the GP at Xnew, which are given
    by
        q(F*) = N(F* | K_{*F} alpha + mean,
                  K_{**} - K_{*f} [K_{ff} + diag(lambda**-2)]^-1 K_{f*})
    """
    # compute kernel things
    Kx = self.kern.K(self.X, Xnew)
    K = self.kern.K(self.X)
    # predictive mean
    f_mean = tf.matmul(Kx, self.q_alpha, transpose_a=True) + self.mean_function(Xnew)
    # predictive var
    A = K + tf.matrix_diag(tf.transpose(1. / tf.square(self.q_lambda)))
    L = tf.cholesky(A)
    Kx_tiled = tf.tile(tf.expand_dims(Kx, 0), [self.num_latent, 1, 1])
    LiKx = tf.matrix_triangular_solve(L, Kx_tiled)
    if full_cov:
        f_var = self.kern.K(Xnew) - tf.matmul(LiKx, LiKx, transpose_a=True)
    else:
        f_var = self.kern.Kdiag(Xnew) - tf.reduce_sum(tf.square(LiKx), 1)
    return f_mean, tf.transpose(f_var)
def _build_likelihood(self):
    """
    Construct a tensorflow function to compute the bound on the marginal
    likelihood. For a derivation of the terms in here, see the associated
    SGPR notebook.
    """
    num_inducing = len(self.feature)
    num_data = tf.cast(tf.shape(self.Y)[0], settings.float_type)
    output_dim = tf.cast(tf.shape(self.Y)[1], settings.float_type)
    err = self.Y - self.mean_function(self.X)
    Kdiag = self.kern.Kdiag(self.X)
    Kuf = self.feature.Kuf(self.kern, self.X)
    Kuu = self.feature.Kuu(self.kern, jitter=settings.numerics.jitter_level)
    L = tf.cholesky(Kuu)
    sigma = tf.sqrt(self.likelihood.variance)
    # Compute intermediate matrices
    A = tf.matrix_triangular_solve(L, Kuf, lower=True) / sigma
    AAT = tf.matmul(A, A, transpose_b=True)
    B = AAT + tf.eye(num_inducing, dtype=settings.float_type)
    LB = tf.cholesky(B)
    Aerr = tf.matmul(A, err)
    c = tf.matrix_triangular_solve(LB, Aerr, lower=True) / sigma
    # compute log marginal bound
    bound = -0.5 * num_data * output_dim * np.log(2 * np.pi)
    bound += tf.negative(output_dim) * tf.reduce_sum(tf.log(tf.matrix_diag_part(LB)))
    bound -= 0.5 * num_data * output_dim * tf.log(self.likelihood.variance)
    bound += -0.5 * tf.reduce_sum(tf.square(err)) / self.likelihood.variance
    bound += 0.5 * tf.reduce_sum(tf.square(c))
    bound += -0.5 * output_dim * tf.reduce_sum(Kdiag) / self.likelihood.variance
    bound += 0.5 * output_dim * tf.reduce_sum(tf.matrix_diag_part(AAT))
    return bound
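# NumPy sketch (illustration, not GPflow) of the two identities the bound above
# relies on, with Q = Kuf^T Kuu^{-1} Kuf, A = L^{-1} Kuf / sigma, B = A A^T + I
# and LB = chol(B):
#   log|Q + sigma^2 I| = N log(sigma^2) + 2 * sum(log diag(LB))      (determinant lemma)
#   y^T (Q + sigma^2 I)^{-1} y = y^T y / sigma^2 - c^T c,  c = LB^{-1} A y / sigma
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(4)
N, M, sigma2 = 10, 3, 0.5
X, Z = rng.randn(N, 1), rng.randn(M, 1)
rbf = lambda a, b: np.exp(-0.5 * (a - b.T) ** 2)
Kuu = rbf(Z, Z) + 1e-8 * np.eye(M)
Kuf = rbf(Z, X)
y = rng.randn(N, 1)

L = np.linalg.cholesky(Kuu)
A = solve_triangular(L, Kuf, lower=True) / np.sqrt(sigma2)
B = A @ A.T + np.eye(M)
LB = np.linalg.cholesky(B)
c = solve_triangular(LB, A @ y, lower=True) / np.sqrt(sigma2)

Q = Kuf.T @ np.linalg.solve(Kuu, Kuf)
S = Q + sigma2 * np.eye(N)
assert np.allclose(np.linalg.slogdet(S)[1],
                   N * np.log(sigma2) + 2 * np.sum(np.log(np.diag(LB))))
assert np.allclose(y.T @ np.linalg.solve(S, y),
                   y.T @ y / sigma2 - c.T @ c)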
def _build_predict(self, Xnew, full_cov=False):
    """
    Compute the mean and variance of the latent function at some new points
    Xnew. For a derivation of the terms in here, see the associated SGPR
    notebook.
    """
    num_inducing = len(self.feature)
    err = self.Y - self.mean_function(self.X)
    Kuf = self.feature.Kuf(self.kern, self.X)
    Kuu = self.feature.Kuu(self.kern, jitter=settings.numerics.jitter_level)
    Kus = self.feature.Kuf(self.kern, Xnew)
    sigma = tf.sqrt(self.likelihood.variance)
    L = tf.cholesky(Kuu)
    A = tf.matrix_triangular_solve(L, Kuf, lower=True) / sigma
    B = tf.matmul(A, A, transpose_b=True) + tf.eye(num_inducing, dtype=settings.float_type)
    LB = tf.cholesky(B)
    Aerr = tf.matmul(A, err)
    c = tf.matrix_triangular_solve(LB, Aerr, lower=True) / sigma
    tmp1 = tf.matrix_triangular_solve(L, Kus, lower=True)
    tmp2 = tf.matrix_triangular_solve(LB, tmp1, lower=True)
    mean = tf.matmul(tmp2, c, transpose_a=True)
    if full_cov:
        var = self.kern.K(Xnew) + tf.matmul(tmp2, tmp2, transpose_a=True) \
              - tf.matmul(tmp1, tmp1, transpose_a=True)
        shape = tf.stack([1, 1, tf.shape(self.Y)[1]])
        var = tf.tile(tf.expand_dims(var, 2), shape)
    else:
        var = self.kern.Kdiag(Xnew) + tf.reduce_sum(tf.square(tmp2), 0) \
              - tf.reduce_sum(tf.square(tmp1), 0)
        shape = tf.stack([1, tf.shape(self.Y)[1]])
        var = tf.tile(tf.expand_dims(var, 1), shape)
    return mean + self.mean_function(Xnew), var
def eKxz(self, Z, Xmu, Xcov):
    """
    Also known as phi_1: <K_{x, Z}>_{q(x)}.
    :param Z: MxD inducing inputs
    :param Xmu: X mean (NxD)
    :param Xcov: NxDxD
    :return: NxM
    """
    # use only active dimensions
    Xcov = self._slice_cov(Xcov)
    Z, Xmu = self._slice(Z, Xmu)
    D = tf.shape(Xmu)[1]
    if self.ARD:
        lengthscales = self.lengthscales
    else:
        lengthscales = tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
    vec = tf.expand_dims(Xmu, 2) - tf.expand_dims(tf.transpose(Z), 0)  # NxDxM
    chols = tf.cholesky(tf.expand_dims(tf.matrix_diag(lengthscales ** 2), 0) + Xcov)
    Lvec = tf.matrix_triangular_solve(chols, vec)
    q = tf.reduce_sum(Lvec ** 2, [1])
    chol_diags = tf.matrix_diag_part(chols)  # N x D
    half_log_dets = tf.reduce_sum(tf.log(chol_diags), 1) - tf.reduce_sum(tf.log(lengthscales))  # N,
    return self.variance * tf.exp(-0.5 * q - tf.expand_dims(half_log_dets, 1))
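# Monte Carlo sketch (illustrative; the parameter values below are made up) of
# the closed form that eKxz computes: for an isotropic RBF kernel with
# Lam = lengthscale**2 * I,
#   E_{x ~ N(mu, S)}[k(x, z)] = variance * |Lam|^{1/2} |Lam + S|^{-1/2}
#                               * exp(-0.5 (mu - z)^T (Lam + S)^{-1} (mu - z)),
# which the Cholesky of (Lam + S) and the triangular solve evaluate above.
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(5)
D, variance, lengthscale = 2, 1.5, 0.8
Lam = lengthscale ** 2 * np.eye(D)
mu, z = rng.randn(D), rng.randn(D)
A = rng.randn(D, D)
S = 0.5 * (A @ A.T) + 0.1 * np.eye(D)       # q(x) covariance

kern = lambda x: variance * np.exp(-0.5 * np.sum((x - z) ** 2, axis=-1) / lengthscale ** 2)
samples = rng.multivariate_normal(mu, S, size=200000)
mc_estimate = np.mean(kern(samples))

C = np.linalg.cholesky(Lam + S)
q = np.sum(solve_triangular(C, mu - z, lower=True) ** 2)
half_log_det = np.sum(np.log(np.diag(C))) - D * np.log(lengthscale)
closed_form = variance * np.exp(-0.5 * q - half_log_det)
assert np.isclose(mc_estimate, closed_form, rtol=0.05)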
def eKzxKxz(self, Z, Xmu, Xcov):
    """
    Also known as Phi_2.
    :param Z: MxD
    :param Xmu: X mean (NxD)
    :param Xcov: X covariance matrices (NxDxD)
    :return: NxMxM
    """
    # use only active dimensions
    Xcov = self._slice_cov(Xcov)
    Z, Xmu = self._slice(Z, Xmu)
    M = tf.shape(Z)[0]
    N = tf.shape(Xmu)[0]
    D = tf.shape(Xmu)[1]
    lengthscales = self.lengthscales if self.ARD else tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
    Kmms = tf.sqrt(self.K(Z, presliced=True)) / self.variance ** 0.5
    scalemat = tf.expand_dims(tf.eye(D, dtype=settings.float_type), 0) + 2 * Xcov * tf.reshape(lengthscales ** -2.0, [1, 1, -1])  # NxDxD
    det = tf.matrix_determinant(scalemat)
    mat = Xcov + 0.5 * tf.expand_dims(tf.matrix_diag(lengthscales ** 2.0), 0)  # NxDxD
    cm = tf.cholesky(mat)  # NxDxD
    vec = 0.5 * (tf.reshape(tf.transpose(Z), [1, D, 1, M]) +
                 tf.reshape(tf.transpose(Z), [1, D, M, 1])) - tf.reshape(Xmu, [N, D, 1, 1])  # NxDxMxM
    svec = tf.reshape(vec, (N, D, M * M))
    ssmI_z = tf.matrix_triangular_solve(cm, svec)  # NxDx(M*M)
    smI_z = tf.reshape(ssmI_z, (N, D, M, M))  # NxDxMxM
    fs = tf.reduce_sum(tf.square(smI_z), [1])  # NxMxM
    return self.variance ** 2.0 * tf.expand_dims(Kmms, 0) * tf.exp(-0.5 * fs) * tf.reshape(det ** -0.5, [N, 1, 1])
def Linear_RBF_eKxzKzx(self, Ka, Kb, Z, Xmu, Xcov):
    Xcov = self._slice_cov(Xcov)
    Z, Xmu = self._slice(Z, Xmu)
    lin, rbf = (Ka, Kb) if isinstance(Ka, Linear) else (Kb, Ka)
    if not isinstance(lin, Linear):
        raise TypeError("{in_lin} is not {linear}".format(in_lin=str(type(lin)), linear=str(Linear)))
    if not isinstance(rbf, RBF):
        raise TypeError("{in_rbf} is not {rbf}".format(in_rbf=str(type(rbf)), rbf=str(RBF)))
    if lin.ARD or type(lin.active_dims) is not slice or type(rbf.active_dims) is not slice:
        raise NotImplementedError("Active dims and/or Linear ARD not implemented. "
                                  "Switching to quadrature.")
    D = tf.shape(Xmu)[1]
    M = tf.shape(Z)[0]
    N = tf.shape(Xmu)[0]
    if rbf.ARD:
        lengthscales = rbf.lengthscales
    else:
        lengthscales = tf.zeros((D,), dtype=settings.float_type) + rbf.lengthscales
    lengthscales2 = lengthscales ** 2.0
    const = rbf.variance * lin.variance * tf.reduce_prod(lengthscales)
    gaussmat = Xcov + tf.matrix_diag(lengthscales2)[None, :, :]  # NxDxD
    det = tf.matrix_determinant(gaussmat) ** -0.5  # N
    cgm = tf.cholesky(gaussmat)  # NxDxD
    tcgm = tf.tile(cgm[:, None, :, :], [1, M, 1, 1])
    vecmin = Z[None, :, :] - Xmu[:, None, :]  # NxMxD
    d = tf.matrix_triangular_solve(tcgm, vecmin[:, :, :, None])  # NxMxDx1
    exp = tf.exp(-0.5 * tf.reduce_sum(d ** 2.0, [2, 3]))  # NxM
    # exp = tf.Print(exp, [tf.shape(exp)])
    vecplus = (Z[None, :, :, None] / lengthscales2[None, None, :, None] +
               tf.matrix_solve(Xcov, Xmu[:, :, None])[:, None, :, :])  # NxMxDx1
    mean = tf.cholesky_solve(
        tcgm, tf.matmul(tf.tile(Xcov[:, None, :, :], [1, M, 1, 1]), vecplus))
    mean = mean[:, :, :, 0] * lengthscales2[None, None, :]  # NxMxD
    a = tf.matmul(tf.tile(Z[None, :, :], [N, 1, 1]),
                  mean * exp[:, :, None] * det[:, None, None] * const, transpose_b=True)
    return a + tf.transpose(a, [0, 2, 1])
def base_conditional(Kmn, Kmm, Knn, f, *, full_cov=False, q_sqrt=None, white=False):
    # compute kernel stuff
    num_func = tf.shape(f)[1]  # K
    Lm = tf.cholesky(Kmm)
    # Compute the projection matrix A
    A = tf.matrix_triangular_solve(Lm, Kmn, lower=True)
    # compute the covariance due to the conditioning
    if full_cov:
        fvar = Knn - tf.matmul(A, A, transpose_a=True)
        shape = tf.stack([num_func, 1, 1])
    else:
        fvar = Knn - tf.reduce_sum(tf.square(A), 0)
        shape = tf.stack([num_func, 1])
    fvar = tf.tile(tf.expand_dims(fvar, 0), shape)  # K x N x N or K x N
    # another backsubstitution in the unwhitened case
    if not white:
        A = tf.matrix_triangular_solve(tf.transpose(Lm), A, lower=False)
    # construct the conditional mean
    fmean = tf.matmul(A, f, transpose_a=True)
    if q_sqrt is not None:
        if q_sqrt.get_shape().ndims == 2:
            LTA = A * tf.expand_dims(tf.transpose(q_sqrt), 2)  # K x M x N
        elif q_sqrt.get_shape().ndims == 3:
            L = tf.matrix_band_part(tf.transpose(q_sqrt, (2, 0, 1)), -1, 0)  # K x M x M
            A_tiled = tf.tile(tf.expand_dims(A, 0), tf.stack([num_func, 1, 1]))
            LTA = tf.matmul(L, A_tiled, transpose_a=True)  # K x M x N
        else:  # pragma: no cover
            raise ValueError("Bad dimension for q_sqrt: %s" %
                             str(q_sqrt.get_shape().ndims))
        if full_cov:
            fvar = fvar + tf.matmul(LTA, LTA, transpose_a=True)  # K x N x N
        else:
            fvar = fvar + tf.reduce_sum(tf.square(LTA), 1)  # K x N
    fvar = tf.transpose(fvar)  # N x K or N x N x K
    return fmean, fvar
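# NumPy sketch (illustration only) of the q_sqrt branch above: if q(u) has
# covariance S = L_q L_q^T and the conditional mean is A^T u, the extra variance
# term diag(A^T S A) equals a column-wise sum of squares of L_q^T A, which is
# what the LTA construction computes without ever forming S.
import numpy as np

rng = np.random.RandomState(6)
M, N = 5, 7
A = rng.randn(M, N)                       # projection matrix, M x N
L_q = np.tril(rng.randn(M, M))            # Cholesky factor of q(u) covariance
S = L_q @ L_q.T

LTA = L_q.T @ A                           # M x N
extra_var = np.sum(LTA ** 2, axis=0)      # per-point variance contribution
assert np.allclose(extra_var, np.diag(A.T @ S @ A))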
def test_MatrixTriangularSolve(self):
    t = tf.matrix_triangular_solve(*self.random((2, 3, 3, 3), (2, 3, 3, 1)), adjoint=False, lower=False)
    self.check(t)
    t = tf.matrix_triangular_solve(*self.random((2, 3, 3, 3), (2, 3, 3, 1)), adjoint=True, lower=False)
    self.check(t)
    t = tf.matrix_triangular_solve(*self.random((2, 3, 3, 3), (2, 3, 3, 1)), adjoint=False, lower=True)
    self.check(t)
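# NumPy/SciPy sketch (not part of the test suite above) of what the lower and
# adjoint flags mean: the op solves op(tri(M)) x = rhs, where tri keeps only the
# lower or upper triangle of M and op is identity or (conjugate) transpose.
import numpy as np
from scipy.linalg import solve_triangular

rng = np.random.RandomState(7)
M = rng.randn(3, 3) + 3 * np.eye(3)       # well-conditioned square matrix
rhs = rng.randn(3, 1)

# lower=True, adjoint=False: solve tril(M) x = rhs
x1 = solve_triangular(np.tril(M), rhs, lower=True)
# lower=False, adjoint=True: solve triu(M)^T x = rhs
x2 = solve_triangular(np.triu(M), rhs, lower=False, trans='T')
assert np.allclose(np.tril(M) @ x1, rhs)
assert np.allclose(np.triu(M).T @ x2, rhs)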
def _build_likelihood(self):
    """
    Construct a tensorflow function to compute the bound on the marginal
    likelihood.
    """
    num_inducing = tf.shape(self.Z)[0]
    psi0 = tf.reduce_sum(self.kern.eKdiag(self.X_mean, self.X_var), 0)
    psi1 = self.kern.eKxz(self.Z, self.X_mean, self.X_var)
    psi2 = tf.reduce_sum(self.kern.eKzxKxz(self.Z, self.X_mean, self.X_var), 0)
    Kuu = self.kern.K(self.Z) + tf.eye(num_inducing, dtype=settings.float_type) * settings.numerics.jitter_level
    L = tf.cholesky(Kuu)
    sigma2 = self.likelihood.variance
    sigma = tf.sqrt(sigma2)
    # Compute intermediate matrices
    A = tf.matrix_triangular_solve(L, tf.transpose(psi1), lower=True) / sigma
    tmp = tf.matrix_triangular_solve(L, psi2, lower=True)
    AAT = tf.matrix_triangular_solve(L, tf.transpose(tmp), lower=True) / sigma2
    B = AAT + tf.eye(num_inducing, dtype=settings.float_type)
    LB = tf.cholesky(B)
    log_det_B = 2. * tf.reduce_sum(tf.log(tf.matrix_diag_part(LB)))
    c = tf.matrix_triangular_solve(LB, tf.matmul(A, self.Y), lower=True) / sigma
    # KL[q(x) || p(x)]
    dX_var = self.X_var if len(self.X_var.get_shape()) == 2 else tf.matrix_diag_part(self.X_var)
    NQ = tf.cast(tf.size(self.X_mean), settings.float_type)
    D = tf.cast(tf.shape(self.Y)[1], settings.float_type)
    KL = -0.5 * tf.reduce_sum(tf.log(dX_var)) \
         + 0.5 * tf.reduce_sum(tf.log(self.X_prior_var)) \
         - 0.5 * NQ \
         + 0.5 * tf.reduce_sum((tf.square(self.X_mean - self.X_prior_mean) + dX_var) / self.X_prior_var)
    # compute log marginal bound
    ND = tf.cast(tf.size(self.Y), settings.float_type)
    bound = -0.5 * ND * tf.log(2 * np.pi * sigma2)
    bound += -0.5 * D * log_det_B
    bound += -0.5 * tf.reduce_sum(tf.square(self.Y)) / sigma2
    bound += 0.5 * tf.reduce_sum(tf.square(c))
    bound += -0.5 * D * (tf.reduce_sum(psi0) / sigma2 -
                         tf.reduce_sum(tf.matrix_diag_part(AAT)))
    bound -= KL
    return bound
def likelihood(self, hyp, X_batch, y_batch, monitor=False):
    M = self.M
    Z = self.Z
    m = self.m
    S = self.S
    jitter = self.jitter
    jitter_cov = self.jitter_cov
    N = tf.shape(X_batch)[0]
    logsigma_n = hyp[-1]
    sigma_n = tf.exp(logsigma_n)
    # Compute K_u_inv
    K_u = kernel_tf(Z, Z, hyp[:-1])
    L = tf.cholesky(K_u + np.eye(M) * jitter_cov)
    K_u_inv = tf.matrix_triangular_solve(tf.transpose(L), tf.matrix_triangular_solve(L, np.eye(M), lower=True), lower=False)
    K_u_inv_op = self.K_u_inv.assign(K_u_inv)
    # Compute mu
    psi = kernel_tf(Z, X_batch, hyp[:-1])
    K_u_inv_m = tf.matmul(K_u_inv, m)
    MU = tf.matmul(tf.transpose(psi), K_u_inv_m)
    # Compute cov
    Alpha = tf.matmul(K_u_inv, psi)
    COV = kernel_tf(X_batch, X_batch, hyp[:-1]) - tf.matmul(tf.transpose(psi), tf.matmul(K_u_inv, psi)) + \
          tf.matmul(tf.transpose(Alpha), tf.matmul(S, Alpha))
    # Compute COV_inv
    LL = tf.cholesky(COV + tf.eye(N, dtype=tf.float64) * sigma_n + tf.eye(N, dtype=tf.float64) * jitter)
    COV_inv = tf.matrix_triangular_solve(tf.transpose(LL), tf.matrix_triangular_solve(LL, tf.eye(N, dtype=tf.float64), lower=True), lower=False)
    # Compute cov(Z, X)
    cov_ZX = tf.matmul(S, Alpha)
    # Update m and S
    alpha = tf.matmul(COV_inv, tf.transpose(cov_ZX))
    m_new = m + tf.matmul(cov_ZX, tf.matmul(COV_inv, y_batch - MU))
    S_new = S - tf.matmul(cov_ZX, alpha)
    if monitor == False:
        m_op = self.m.assign(m_new)
        S_op = self.S.assign(S_new)
    # Compute NLML
    K_u_inv_m = tf.matmul(K_u_inv, m_new)
    NLML = 0.5 * tf.matmul(tf.transpose(m_new), K_u_inv_m) + tf.reduce_sum(tf.log(tf.diag_part(L))) + 0.5 * np.log(2. * np.pi) * tf.cast(M, tf.float64)
    train = self.optimizer.minimize(NLML)
    nlml_op = self.nlml.assign(NLML[0, 0])
    return tf.group(*[train, m_op, S_op, nlml_op, K_u_inv_op])