def _slice_cov(self, cov):
"""
Slice the correct dimensions for use in the kernel, as indicated by
`self.active_dims` for covariance matrices. This requires slicing the
rows *and* columns. This will also turn flattened diagonal
matrices into a tensor of full diagonal matrices.
:param cov: Tensor of covariance matrices (NxDxD or NxD).
:return: N x self.input_dim x self.input_dim.
"""
cov = tf.cond(tf.equal(tf.rank(cov), 2), lambda: tf.matrix_diag(cov), lambda: cov)
if isinstance(self.active_dims, slice):
cov = cov[..., self.active_dims, self.active_dims]
else:
cov_shape = tf.shape(cov)
covr = tf.reshape(cov, [-1, cov_shape[-1], cov_shape[-1]])
gather1 = tf.gather(tf.transpose(covr, [2, 1, 0]), self.active_dims)
gather2 = tf.gather(tf.transpose(gather1, [1, 0, 2]), self.active_dims)
cov = tf.reshape(tf.transpose(gather2, [2, 0, 1]),
tf.concat([cov_shape[:-2], [len(self.active_dims), len(self.active_dims)]], 0))
return cov
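
# A minimal standalone sketch (my own toy shapes, not part of the original class) of what
# _slice_cov does with a flattened N x D diagonal input: tf.matrix_diag expands it into
# N x D x D diagonal matrices, and the active dimensions are then selected on both the row
# and the column axes.
import numpy as np
import tensorflow as tf  # TensorFlow 1.x API, matching the snippets in this collection

Xvar = tf.constant(np.arange(6, dtype=np.float64).reshape(2, 3))  # N=2, D=3 diagonal variances
full = tf.matrix_diag(Xvar)                                       # 2 x 3 x 3 diagonal matrices
active_dims = [0, 2]                                              # assumed active dimensions
sliced = tf.gather(tf.gather(full, active_dims, axis=1), active_dims, axis=2)  # 2 x 2 x 2

with tf.Session() as sess:
    print(sess.run(sliced))
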
def build_backward_variance(self, Yvar):
"""
Additional method for scaling variance backward (used in :class:`.Normalizer`). Can process both the diagonal
variances returned by predict_f, as well as full covariance matrices.
:param Yvar: size N x N x P or size N x P
:return: Yvar scaled, same rank and size as input
"""
rank = tf.rank(Yvar)
    # Because TensorFlow evaluates both fn1 and fn2, the transpose cannot go on the same line:
    # if a full covariance matrix is provided, fn1 turns it into a rank-4 tensor and then tries
    # to transpose it as a rank-3 one. Splitting it into two steps, however, works fine.
Yvar = tf.cond(tf.equal(rank, 2), lambda: tf.matrix_diag(tf.transpose(Yvar)), lambda: Yvar)
Yvar = tf.cond(tf.equal(rank, 2), lambda: tf.transpose(Yvar, perm=[1, 2, 0]), lambda: Yvar)
N = tf.shape(Yvar)[0]
D = tf.shape(Yvar)[2]
L = tf.cholesky(tf.square(tf.transpose(self.A)))
Yvar = tf.reshape(Yvar, [N * N, D])
scaled_var = tf.reshape(tf.transpose(tf.cholesky_solve(L, tf.transpose(Yvar))), [N, N, D])
return tf.cond(tf.equal(rank, 2), lambda: tf.reduce_sum(scaled_var, axis=1), lambda: scaled_var)
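
# A small self-contained check (my own numbers) of the tf.cholesky / tf.cholesky_solve pair
# used above: cholesky_solve(L, B) returns X with (L L^T) X = B, so for a diagonal A the
# variances are effectively divided by A**2 when scaled backward.
import tensorflow as tf

A = tf.constant([[2.0, 0.0], [0.0, 4.0]])   # assumed diagonal scaling matrix
Yvar = tf.constant([[8.0], [32.0]])         # variances to scale backward
L = tf.cholesky(tf.square(tf.transpose(A)))
scaled = tf.cholesky_solve(L, Yvar)         # expect [[2.0], [2.0]]

with tf.Session() as sess:
    print(sess.run(scaled))
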
def get_L_sym(self, L_vec_var):
    """Map the flat parameter vector to lower-triangular matrices whose diagonal is
    exponentiated, so the diagonal entries are strictly positive."""
    L = tf.reshape(L_vec_var, (-1, self._action_dim, self._action_dim))
    return tf.matrix_band_part(L, -1, 0) - \
        tf.matrix_diag(tf.matrix_diag_part(L)) + \
        tf.matrix_diag(tf.exp(tf.matrix_diag_part(L)))
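
# A minimal sketch (standalone, 2x2 toy batch) of the trick above: keep the lower triangle,
# drop the raw diagonal, and put exp(diagonal) back so that L has a strictly positive
# diagonal and L L^T is positive definite.
import tensorflow as tf

raw = tf.constant([[[0.0, 5.0], [3.0, 1.0]]])               # batch of one 2x2 raw parameter block
lower = tf.matrix_band_part(raw, -1, 0)                     # zero out the upper triangle
L = (lower
     - tf.matrix_diag(tf.matrix_diag_part(lower))           # remove the raw diagonal ...
     + tf.matrix_diag(tf.exp(tf.matrix_diag_part(lower))))  # ... and replace it with exp(diag)

with tf.Session() as sess:
    print(sess.run(L))   # [[[1.0, 0.0], [3.0, 2.718...]]]
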
def get_e_qval_sym(self, obs_var, policy, **kwargs):
    """Symbolic expected Q-value under the given policy: uses the closed-form expected
    advantage for a stochastic Gaussian policy, and falls back to Q(s, mu(s)) otherwise."""
if isinstance(policy, StochasticPolicy):
agent_info = policy.dist_info_sym(obs_var)
mu, log_std = agent_info['mean'], agent_info["log_std"]
std = tf.matrix_diag(tf.exp(log_std))
L_var, V_var, mu_var = self.get_output_sym(obs_var, **kwargs)
L_mat_var = self.get_L_sym(L_var)
P_var = self.get_P_sym(L_mat_var)
A_var = self.get_e_A_sym(P_var, mu_var, mu, std)
qvals = A_var + V_var
else:
mu = policy.get_action_sym(obs_var)
qvals = self.get_qval_sym(obs_var, mu, **kwargs)
return qvals
def _build_predict(self, Xnew, full_cov=False):
"""
The posterior variance of F is given by
q(f) = N(f | K alpha + mean, [K^-1 + diag(lambda**2)]^-1)
Here we project this to F*, the values of the GP at Xnew which is given
by
q(F*) = N ( F* | K_{*F} alpha + mean, K_{**} - K_{*f}[K_{ff} +
diag(lambda**-2)]^-1 K_{f*} )
"""
# compute kernel things
Kx = self.kern.K(self.X, Xnew)
K = self.kern.K(self.X)
# predictive mean
f_mean = tf.matmul(Kx, self.q_alpha, transpose_a=True) + self.mean_function(Xnew)
# predictive var
A = K + tf.matrix_diag(tf.transpose(1. / tf.square(self.q_lambda)))
L = tf.cholesky(A)
Kx_tiled = tf.tile(tf.expand_dims(Kx, 0), [self.num_latent, 1, 1])
LiKx = tf.matrix_triangular_solve(L, Kx_tiled)
if full_cov:
f_var = self.kern.K(Xnew) - tf.matmul(LiKx, LiKx, transpose_a=True)
else:
f_var = self.kern.Kdiag(Xnew) - tf.reduce_sum(tf.square(LiKx), 1)
return f_mean, tf.transpose(f_var)
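
# A tiny numeric check (my own example) of the identity behind the full_cov=False branch
# above: diag(K** - Kx^T A^{-1} Kx) equals Kdiag - reduce_sum(square(L^{-1} Kx), 0)
# when A = L L^T.
import numpy as np
import tensorflow as tf

A = tf.constant([[4.0, 1.0], [1.0, 3.0]])
Kx = tf.constant([[1.0, 0.0], [2.0, 1.0]])        # columns index the test points
L = tf.cholesky(A)
LiKx = tf.matrix_triangular_solve(L, Kx)          # lower-triangular solve by default
full = -tf.matmul(LiKx, LiKx, transpose_a=True)   # data-dependent part of the full covariance
diag = -tf.reduce_sum(tf.square(LiKx), 0)         # data-dependent part of the marginal variances

with tf.Session() as sess:
    f, d = sess.run([full, diag])
    print(np.allclose(np.diag(f), d))             # True
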
def forward_tensor(self, x):
# create diagonal matrices
return tf.matrix_diag(tf.reshape(x, (-1, self.dim)))
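
# A tiny usage sketch (standalone, with dim assumed to be 2) of the transform above: the flat
# vector is reshaped to (-1, dim) and each row becomes one diagonal matrix.
import tensorflow as tf

x = tf.constant([1.0, 2.0, 3.0, 4.0])
diag_mats = tf.matrix_diag(tf.reshape(x, (-1, 2)))   # shape (2, 2, 2)

with tf.Session() as sess:
    print(sess.run(diag_mats))
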
def eKxz(self, Z, Xmu, Xcov):
"""
Also known as phi_1: <K_{x, Z}>_{q(x)}.
:param Z: MxD inducing inputs
:param Xmu: X mean (NxD)
:param Xcov: NxDxD
:return: NxM
"""
# use only active dimensions
Xcov = self._slice_cov(Xcov)
Z, Xmu = self._slice(Z, Xmu)
D = tf.shape(Xmu)[1]
if self.ARD:
lengthscales = self.lengthscales
else:
lengthscales = tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
vec = tf.expand_dims(Xmu, 2) - tf.expand_dims(tf.transpose(Z), 0) # NxDxM
chols = tf.cholesky(tf.expand_dims(tf.matrix_diag(lengthscales ** 2), 0) + Xcov)
Lvec = tf.matrix_triangular_solve(chols, vec)
q = tf.reduce_sum(Lvec ** 2, [1])
chol_diags = tf.matrix_diag_part(chols) # N x D
half_log_dets = tf.reduce_sum(tf.log(chol_diags), 1) - tf.reduce_sum(tf.log(lengthscales)) # N,
return self.variance * tf.exp(-0.5 * q - tf.expand_dims(half_log_dets, 1))
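
# A small check (my own numbers) of the log-determinant trick used for half_log_dets above:
# for a positive definite matrix S, the sum of the log of the Cholesky diagonal equals
# 0.5 * log det(S).
import numpy as np
import tensorflow as tf

S = tf.constant([[2.0, 0.5], [0.5, 1.0]], dtype=tf.float64)
chol = tf.cholesky(S)
half_log_det = tf.reduce_sum(tf.log(tf.matrix_diag_part(chol)))

with tf.Session() as sess:
    a, b = sess.run([half_log_det, 0.5 * tf.log(tf.matrix_determinant(S))])
    print(np.allclose(a, b))   # True
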
def exKxz_pairwise(self, Z, Xmu, Xcov):
"""
<x_t K_{x_{t-1}, Z}>_q_{x_{t-1:t}}
:param Z: MxD inducing inputs
:param Xmu: X mean (N+1xD)
:param Xcov: 2x(N+1)xDxD
:return: NxMxD
"""
msg_input_shape = "Currently cannot handle slicing in exKxz_pairwise."
assert_input_shape = tf.assert_equal(tf.shape(Xmu)[1], self.input_dim, message=msg_input_shape)
assert_cov_shape = tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[1:3], name="assert_Xmu_Xcov_shape")
with tf.control_dependencies([assert_input_shape, assert_cov_shape]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0] - 1
D = tf.shape(Xmu)[1]
Xsigmb = tf.slice(Xcov, [0, 0, 0, 0], tf.stack([-1, N, -1, -1]))
Xsigm = Xsigmb[0, :, :, :] # NxDxD
Xsigmc = Xsigmb[1, :, :, :] # NxDxD
Xmum = tf.slice(Xmu, [0, 0], tf.stack([N, -1]))
Xmup = Xmu[1:, :]
lengthscales = self.lengthscales if self.ARD else tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
scalemat = tf.expand_dims(tf.matrix_diag(lengthscales ** 2.0), 0) + Xsigm # NxDxD
det = tf.matrix_determinant(
tf.expand_dims(tf.eye(tf.shape(Xmu)[1], dtype=settings.float_type), 0) +
tf.reshape(lengthscales ** -2.0, (1, 1, -1)) * Xsigm) # N
vec = tf.expand_dims(tf.transpose(Z), 0) - tf.expand_dims(Xmum, 2) # NxDxM
smIvec = tf.matrix_solve(scalemat, vec) # NxDxM
q = tf.reduce_sum(smIvec * vec, [1]) # NxM
addvec = tf.matmul(smIvec, Xsigmc, transpose_a=True) + tf.expand_dims(Xmup, 1) # NxMxD
return self.variance * addvec * tf.reshape(det ** -0.5, (N, 1, 1)) * tf.expand_dims(tf.exp(-0.5 * q), 2)
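
# A minimal sketch (my own toy shapes) of the shape-guard pattern used above: tf.assert_*
# ops only fire when they are wired in via tf.control_dependencies on a tensor that is
# actually consumed later, which is why the otherwise redundant tf.identity(Xmu) is there.
import tensorflow as tf

Xmu = tf.placeholder(tf.float64, shape=[None, None])
input_dim = 3   # assumed kernel input dimension
check = tf.assert_equal(tf.shape(Xmu)[1], input_dim, message="Xmu must have input_dim columns")
with tf.control_dependencies([check]):
    Xmu_checked = tf.identity(Xmu)   # any op that uses Xmu_checked now triggers the assertion

with tf.Session() as sess:
    print(sess.run(tf.shape(Xmu_checked), feed_dict={Xmu: [[0.0, 0.0, 0.0]]}))   # passes: [1 3]
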
def exKxz(self, Z, Xmu, Xcov):
"""
It computes the expectation:
<x_t K_{x_t, Z}>_q_{x_t}
:param Z: MxD inducing inputs
:param Xmu: X mean (NxD)
:param Xcov: NxDxD
:return: NxMxD
"""
msg_input_shape = "Currently cannot handle slicing in exKxz."
assert_input_shape = tf.assert_equal(tf.shape(Xmu)[1], self.input_dim, message=msg_input_shape)
assert_cov_shape = tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[:2], name="assert_Xmu_Xcov_shape")
with tf.control_dependencies([assert_input_shape, assert_cov_shape]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0]
D = tf.shape(Xmu)[1]
lengthscales = self.lengthscales if self.ARD else tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
scalemat = tf.expand_dims(tf.matrix_diag(lengthscales ** 2.0), 0) + Xcov # NxDxD
det = tf.matrix_determinant(
tf.expand_dims(tf.eye(tf.shape(Xmu)[1], dtype=settings.float_type), 0) +
tf.reshape(lengthscales ** -2.0, (1, 1, -1)) * Xcov) # N
vec = tf.expand_dims(tf.transpose(Z), 0) - tf.expand_dims(Xmu, 2) # NxDxM
smIvec = tf.matrix_solve(scalemat, vec) # NxDxM
q = tf.reduce_sum(smIvec * vec, [1]) # NxM
addvec = tf.matmul(smIvec, Xcov, transpose_a=True) + tf.expand_dims(Xmu, 1) # NxMxD
return self.variance * addvec * tf.reshape(det ** -0.5, (N, 1, 1)) * tf.expand_dims(tf.exp(-0.5 * q), 2)
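
# A tiny check (my own numbers) of the batched quadratic form q computed above:
# reduce_sum(matrix_solve(S, v) * v, [1]) gives v^T S^{-1} v for every batch element and
# every column of v, without forming S^{-1} explicitly.
import tensorflow as tf

S = tf.constant([[[3.0, 1.0], [1.0, 2.0]]])         # 1 x D x D with D = 2
v = tf.constant([[[1.0], [2.0]]])                   # 1 x D x M with M = 1
q = tf.reduce_sum(tf.matrix_solve(S, v) * v, [1])   # 1 x M

with tf.Session() as sess:
    print(sess.run(q))   # [[2.0]] = v^T S^{-1} v
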
def Linear_RBF_eKxzKzx(self, Ka, Kb, Z, Xmu, Xcov):
Xcov = self._slice_cov(Xcov)
Z, Xmu = self._slice(Z, Xmu)
lin, rbf = (Ka, Kb) if isinstance(Ka, Linear) else (Kb, Ka)
    if not isinstance(lin, Linear):
        raise TypeError("{in_lin} is not {linear}".format(in_lin=str(type(lin)), linear=str(Linear)))
    if not isinstance(rbf, RBF):
        raise TypeError("{in_rbf} is not {rbf}".format(in_rbf=str(type(rbf)), rbf=str(RBF)))
if lin.ARD or type(lin.active_dims) is not slice or type(rbf.active_dims) is not slice:
raise NotImplementedError("Active dims and/or Linear ARD not implemented. "
"Switching to quadrature.")
D = tf.shape(Xmu)[1]
M = tf.shape(Z)[0]
N = tf.shape(Xmu)[0]
if rbf.ARD:
lengthscales = rbf.lengthscales
else:
lengthscales = tf.zeros((D, ), dtype=settings.float_type) + rbf.lengthscales
lengthscales2 = lengthscales ** 2.0
const = rbf.variance * lin.variance * tf.reduce_prod(lengthscales)
gaussmat = Xcov + tf.matrix_diag(lengthscales2)[None, :, :] # NxDxD
det = tf.matrix_determinant(gaussmat) ** -0.5 # N
cgm = tf.cholesky(gaussmat) # NxDxD
tcgm = tf.tile(cgm[:, None, :, :], [1, M, 1, 1])
vecmin = Z[None, :, :] - Xmu[:, None, :] # NxMxD
d = tf.matrix_triangular_solve(tcgm, vecmin[:, :, :, None]) # NxMxDx1
exp = tf.exp(-0.5 * tf.reduce_sum(d ** 2.0, [2, 3])) # NxM
# exp = tf.Print(exp, [tf.shape(exp)])
vecplus = (Z[None, :, :, None] / lengthscales2[None, None, :, None] +
tf.matrix_solve(Xcov, Xmu[:, :, None])[:, None, :, :]) # NxMxDx1
mean = tf.cholesky_solve(
tcgm, tf.matmul(tf.tile(Xcov[:, None, :, :], [1, M, 1, 1]), vecplus))
mean = mean[:, :, :, 0] * lengthscales2[None, None, :] # NxMxD
a = tf.matmul(tf.tile(Z[None, :, :], [N, 1, 1]),
mean * exp[:, :, None] * det[:, None, None] * const, transpose_b=True)
return a + tf.transpose(a, [0, 2, 1])
def K(self, X, X2=None, presliced=False):
if X2 is None:
d = tf.fill(tf.stack([tf.shape(X)[0]]), tf.squeeze(self.variance))
return tf.matrix_diag(d)
else:
shape = tf.stack([tf.shape(X)[0], tf.shape(X2)[0]])
return tf.zeros(shape, settings.float_type)
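
# A minimal usage sketch (standalone, with an assumed variance) of the White-noise kernel
# above: K(X, X) is variance * I built with tf.matrix_diag, and cross-covariances K(X, X2)
# are exactly zero.
import tensorflow as tf

variance = tf.constant(0.1, dtype=tf.float64)
X = tf.zeros([4, 1], dtype=tf.float64)   # 4 input points; their values are irrelevant here
K = tf.matrix_diag(tf.fill(tf.stack([tf.shape(X)[0]]), tf.squeeze(variance)))

with tf.Session() as sess:
    print(sess.run(K))   # 0.1 on the diagonal, zeros elsewhere
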
def K(self, X, X2=None):
X, X2 = self._slice(X, X2)
X = tf.cast(X[:, 0], tf.int32)
if X2 is None:
X2 = X
else:
X2 = tf.cast(X2[:, 0], tf.int32)
B = tf.matmul(self.W, self.W, transpose_b=True) + tf.matrix_diag(self.kappa)
return tf.gather(tf.transpose(tf.gather(B, X2)), X)
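
# A small sketch (my own toy numbers) of the coregionalisation matrix used above:
# B = W W^T + diag(kappa), and the kernel value between data points whose output indices
# are i and j is simply B[i, j], looked up with the nested tf.gather calls.
import tensorflow as tf

W = tf.constant([[1.0], [2.0]])                      # 2 outputs, rank-1 mixing matrix
kappa = tf.constant([0.5, 0.5])
B = tf.matmul(W, W, transpose_b=True) + tf.matrix_diag(kappa)
X = tf.constant([0, 1, 1], dtype=tf.int32)           # output index of each data point
K = tf.gather(tf.transpose(tf.gather(B, X)), X)      # 3 x 3 covariance between the points

with tf.Session() as sess:
    print(sess.run(K))
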
def _build_graph(self, raw_weights, raw_means, raw_covars, raw_inducing_inputs,
train_inputs, train_outputs, num_train, test_inputs):
# First transform all raw variables into their internal form.
# Use softmax(raw_weights) to keep all weights normalized.
weights = tf.exp(raw_weights) / tf.reduce_sum(tf.exp(raw_weights))
if self.diag_post:
# Use exp(raw_covars) so as to guarantee the diagonal matrix remains positive definite.
covars = tf.exp(raw_covars)
else:
# Use vec_to_tri(raw_covars) so as to only optimize over the lower triangular portion.
# We note that we will always operate over the cholesky space internally.
covars_list = [None] * self.num_components
for i in xrange(self.num_components):
mat = util.vec_to_tri(raw_covars[i, :, :])
diag_mat = tf.matrix_diag(tf.matrix_diag_part(mat))
exp_diag_mat = tf.matrix_diag(tf.exp(tf.matrix_diag_part(mat)))
covars_list[i] = mat - diag_mat + exp_diag_mat
covars = tf.stack(covars_list, 0)
# Both inducing inputs and the posterior means can vary freely so don't change them.
means = raw_means
inducing_inputs = raw_inducing_inputs
# Build the matrices of covariances between inducing inputs.
kernel_mat = [self.kernels[i].kernel(inducing_inputs[i, :, :])
for i in xrange(self.num_latent)]
kernel_chol = tf.stack([tf.cholesky(k) for k in kernel_mat], 0)
# Now build the objective function.
entropy = self._build_entropy(weights, means, covars)
cross_ent = self._build_cross_ent(weights, means, covars, kernel_chol)
ell = self._build_ell(weights, means, covars, inducing_inputs,
kernel_chol, train_inputs, train_outputs)
batch_size = tf.to_float(tf.shape(train_inputs)[0])
nelbo = -((batch_size / num_train) * (entropy + cross_ent) + ell)
# Build the leave one out loss function.
loo_loss = self._build_loo_loss(weights, means, covars, inducing_inputs,
kernel_chol, train_inputs, train_outputs)
# Finally, build the prediction function.
predictions = self._build_predict(weights, means, covars, inducing_inputs,
kernel_chol, test_inputs)
return nelbo, loo_loss, predictions
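
# A tiny check (my own numbers) that the weight normalisation at the top of _build_graph is
# just a softmax over raw_weights, i.e. tf.nn.softmax(raw_weights) would give the same result.
import numpy as np
import tensorflow as tf

raw_weights = tf.constant([0.3, -1.2, 2.0])
manual = tf.exp(raw_weights) / tf.reduce_sum(tf.exp(raw_weights))

with tf.Session() as sess:
    a, b = sess.run([manual, tf.nn.softmax(raw_weights)])
    print(np.allclose(a, b))   # True
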