def _mmd2(K_XX, K_XY, K_YY, const_diagonal=False, biased=False):
    # Squared maximum mean discrepancy (MMD^2) from precomputed kernel matrices:
    # K_XX and K_YY are the within-sample Gram matrices, K_XY the cross Gram matrix.
    m = tf.cast(K_XX.get_shape()[0], tf.float32)
    n = tf.cast(K_YY.get_shape()[0], tf.float32)

    if biased:
        # Biased estimator: keep the diagonal terms.
        mmd2 = (tf.reduce_sum(K_XX) / (m * m)
                + tf.reduce_sum(K_YY) / (n * n)
                - 2 * tf.reduce_sum(K_XY) / (m * n))
    else:
        # Unbiased estimator: drop the diagonals of K_XX and K_YY.
        if const_diagonal is not False:
            trace_X = m * const_diagonal
            trace_Y = n * const_diagonal
        else:
            trace_X = tf.trace(K_XX)
            trace_Y = tf.trace(K_YY)
        mmd2 = ((tf.reduce_sum(K_XX) - trace_X) / (m * (m - 1))
                + (tf.reduce_sum(K_YY) - trace_Y) / (n * (n - 1))
                - 2 * tf.reduce_sum(K_XY) / (m * n))
    return mmd2
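# A minimal usage sketch for the estimator above (not from the original source):
# `_rbf_kernel`, the batch sizes and the bandwidth are hypothetical. For an RBF
# kernel every diagonal entry k(x, x) equals 1, so const_diagonal=1.0 lets the
# unbiased estimator skip the explicit tf.trace calls.
import tensorflow as tf

def _rbf_kernel(X, Y, sigma=1.0):
    # Pairwise squared distances ||x_i - y_j||^2, then the Gaussian kernel.
    XX = tf.reduce_sum(tf.square(X), 1, keep_dims=True)
    YY = tf.reduce_sum(tf.square(Y), 1, keep_dims=True)
    sq_dists = XX - 2.0 * tf.matmul(X, Y, transpose_b=True) + tf.transpose(YY)
    return tf.exp(-sq_dists / (2.0 * sigma ** 2))

X = tf.placeholder(tf.float32, [50, 10])  # samples drawn from P
Y = tf.placeholder(tf.float32, [50, 10])  # samples drawn from Q
mmd2 = _mmd2(_rbf_kernel(X, X), _rbf_kernel(X, Y), _rbf_kernel(Y, Y),
             const_diagonal=1.0)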
def _build_cross_ent(self, weights, means, covars, kernel_chol):
    cross_ent = 0.0
    for i in xrange(self.num_components):
        sum_val = 0.0
        for j in xrange(self.num_latent):
            if self.diag_post:
                # TODO(karl): this is a bit inefficient since we're not making use of the fact
                # that covars is diagonal. A solution most likely involves a custom tf op.
                trace = tf.trace(tf.cholesky_solve(kernel_chol[j, :, :],
                                                   tf.diag(covars[i, j, :])))
            else:
                trace = tf.reduce_sum(util.diag_mul(
                    tf.cholesky_solve(kernel_chol[j, :, :], covars[i, j, :, :]),
                    tf.transpose(covars[i, j, :, :])))
            sum_val += (util.CholNormal(means[i, j, :], kernel_chol[j, :, :]).log_prob(0.0) -
                        0.5 * trace)
        cross_ent += weights[i] * sum_val
    return cross_ent
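# An equivalent formulation of the diagonal-covariance trace term above (a sketch
# with hypothetical names, not the custom op the TODO asks for): it avoids
# materialising the dense tf.diag(covars[i, j, :]) matrix, using the identity
#   tr(K^{-1} diag(v)) = sum_k (K^{-1})_{kk} * v_k,
# although it still costs a full solve against the identity.
def _diag_trace_term(K_chol, v):
    n = tf.shape(K_chol)[0]
    K_inv_diag = tf.matrix_diag_part(tf.cholesky_solve(K_chol, tf.eye(n)))
    return tf.reduce_sum(K_inv_diag * v)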
def get_value_updater(self, data, new_mean, gamma_weighted, gamma_sum):
    tf_new_differences = tf.subtract(data, tf.expand_dims(new_mean, 0))
    tf_sq_dist_matrix = tf.matmul(tf.expand_dims(tf_new_differences, 2), tf.expand_dims(tf_new_differences, 1))
    tf_new_covariance = tf.reduce_sum(tf_sq_dist_matrix * tf.expand_dims(tf.expand_dims(gamma_weighted, 1), 2), 0)

    if self.has_prior:
        tf_new_covariance = self.get_prior_adjustment(tf_new_covariance, gamma_sum)

    tf_s, tf_u, _ = tf.svd(tf_new_covariance)
    tf_required_eigvals = tf_s[:self.rank]
    tf_required_eigvecs = tf_u[:, :self.rank]

    tf_new_baseline = (tf.trace(tf_new_covariance) - tf.reduce_sum(tf_required_eigvals)) / self.tf_rest
    tf_new_eigvals = tf_required_eigvals - tf_new_baseline
    tf_new_eigvecs = tf.transpose(tf_required_eigvecs)

    return tf.group(
        self.tf_baseline.assign(tf_new_baseline),
        self.tf_eigvals.assign(tf_new_eigvals),
        self.tf_eigvecs.assign(tf_new_eigvecs)
    )
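# How the stored tensors are presumably recombined elsewhere in the model (a
# sketch under that assumption, with hypothetical names, not code from this
# class): keep the top-`rank` eigenpairs and model the remaining directions
# with the isotropic baseline,
#   Sigma ~= eigvecs^T diag(eigvals) eigvecs + baseline * I,
# which reproduces the kept eigenvalues exactly (the baseline was already
# subtracted from them above) and spreads the leftover trace mass evenly.
def _low_rank_covariance(eigvecs, eigvals, baseline, dim):
    # eigvecs: [rank, dim] (row-wise, as stored above), eigvals: [rank], baseline: scalar
    return (tf.matmul(tf.transpose(eigvecs), tf.expand_dims(eigvals, 1) * eigvecs)
            + baseline * tf.eye(dim))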
def get_e_A_sym(self, P_var, mu_var, policy_mu_var, policy_sigma_var):
    e_A_var1 = self.get_A_sym(P_var, mu_var, policy_mu_var)
    # Batched trace of P * Sigma: sum the diagonal per batch element; for a
    # single unbatched matrix this is the tf.trace form commented out below.
    e_A_var2 = - 0.5 * tf.reduce_sum(tf.matrix_diag_part(
        tf.matmul(P_var, policy_sigma_var)), 1)
    # e_A_var2 = - 0.5 * tf.trace(tf.matmul(P_var, policy_sigma_var))
    return e_A_var1 + e_A_var2
def trace_sqrt_product(sigma, sigma_v):
    """Find the trace of the positive sqrt of product of covariance matrices.

    '_symmetric_matrix_square_root' only works for symmetric matrices, so we
    cannot just take _symmetric_matrix_square_root(sigma * sigma_v).
    ('sigma' and 'sigma_v' are symmetric, but their product is not necessarily).

    Let sigma = A A so A = sqrt(sigma), and sigma_v = B B.
    We want to find trace(sqrt(sigma sigma_v)) = trace(sqrt(A A B B)).
    Note the following properties:
    (i)   forall M1, M2: eigenvalues(M1 M2) = eigenvalues(M2 M1)
          => eigenvalues(A A B B) = eigenvalues(A B B A)
    (ii)  if M1 = sqrt(M2), then eigenvalues(M1) = sqrt(eigenvalues(M2))
          => eigenvalues(sqrt(sigma sigma_v)) = sqrt(eigenvalues(A B B A))
    (iii) forall M: trace(M) = sum(eigenvalues(M))
          => trace(sqrt(sigma sigma_v)) = sum(eigenvalues(sqrt(sigma sigma_v)))
                                        = sum(sqrt(eigenvalues(A B B A)))
                                        = sum(eigenvalues(sqrt(A B B A)))
                                        = trace(sqrt(A B B A))
                                        = trace(sqrt(A sigma_v A))
    A = sqrt(sigma). Both sigma and A sigma_v A are symmetric, so we **can**
    use the _symmetric_matrix_square_root function to find the roots of these
    matrices.

    Args:
      sigma: a square, symmetric, real, positive semi-definite covariance matrix
      sigma_v: same as sigma

    Returns:
      The trace of the positive square root of sigma*sigma_v
    """
    # Note sqrt_sigma is called "A" in the proof above
    sqrt_sigma = _symmetric_matrix_square_root(sigma)

    # This is sqrt(A sigma_v A) above
    sqrt_a_sigmav_a = tf.matmul(sqrt_sigma, tf.matmul(sigma_v, sqrt_sigma))

    return tf.trace(_symmetric_matrix_square_root(sqrt_a_sigmav_a))
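# The `_symmetric_matrix_square_root` helper is referenced above but not shown
# in this listing. Below is a minimal sketch of one common SVD-based
# implementation (an assumption, not necessarily the exact helper used above):
def _symmetric_matrix_square_root(mat, eps=1e-10):
    # For symmetric PSD mat = U diag(s) V^T, sqrt(mat) = U diag(sqrt(s)) V^T.
    # Singular values below eps are left as-is (effectively ~0) so the result
    # stays finite for near-singular inputs.
    s, u, v = tf.svd(mat)
    si = tf.where(tf.less(s, eps), s, tf.sqrt(s))
    return tf.matmul(tf.matmul(u, tf.diag(si)), v, transpose_b=True)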
def getSparcityPrior(inputX, C_init=None, lambda1=0.01, lambda2=10000, optimizer='Adam', epochs=10000, learning_rate=0.1, print_step=50):
    tf.reset_default_graph()

    n_feat, n_sample = inputX.shape
    X = tf.placeholder(dtype=tf.float32, shape=[n_feat, n_sample], name='X')

    if C_init is None:
        C = tf.Variable(tf.random_uniform([n_sample, n_sample], -1, 1), name='C')
    else:
        C = tf.Variable(C_init, name='C')

    # Reconstruction error ||X - XC||^2
    loss = X - tf.matmul(X, C)
    loss = tf.reduce_mean(tf.square(loss))
    # Create sparseness in C
    reg_lossC = tf.reduce_mean(abs(C))  # L1 loss for C
    # Force the entries in the diagonal of C to be zero
    reg_lossD = tf.trace(tf.square(C)) / n_sample

    cost = loss + lambda1 * reg_lossC + lambda2 * reg_lossD
    optimizer = optimize(cost, learning_rate, optimizer)

    # Add ops to save and restore all the variables.
    saver = tf.train.Saver()

    # Optimizing the function
    with tf.Session() as sess:
        sess.run(tf.initialize_all_variables())
        print("Calculating C ...")
        for i in xrange(1, epochs + 1):
            sess.run(optimizer, feed_dict={X: inputX})
            loss = sess.run(cost, feed_dict={X: inputX})
            if i % print_step == 0:
                print('epoch {0}: global loss = {1}'.format(i, loss))
            if i % 50 == 0:
                # Save the variables to disk.
                save_path = saver.save(sess, "./model_C_" + str(i) + ".ckpt")
                print("Model saved in file: %s" % save_path)
        C_val = sess.run(C)
    return C_val
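# A minimal usage sketch for getSparcityPrior (hypothetical data, default
# lambdas): learn a self-representation matrix C with X ~= X C, sparse entries,
# and a heavily penalised diagonal, as in sparse-subspace-clustering-style models.
import numpy as np
X_data = np.random.randn(20, 100).astype(np.float32)  # 20 features x 100 samples
C = getSparcityPrior(X_data, epochs=500, print_step=100)
print(np.abs(np.diag(C)).max())  # diagonal entries are pushed toward zero by lambda2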
def test_trace(self):
    t = tf.trace(self.random(3, 3))
    self.check(t)
def block_method():
    g = tf.Graph()
    with g.as_default():
        matrices = {}
        for i in range(0, d):
            for j in range(0, d):
                with tf.device("/job:worker/task:%d" % ((i*(d-1)+j) % worker_num)):
                    matrix_name = get_block_name(i, j)
                    matrices[matrix_name] = tf.random_uniform([M, M], name=matrix_name)

        intermediate_traces = {}
        for i in range(0, d):
            for j in range(0, d):
                with tf.device("/job:worker/task:%d" % ((i*(d-1)+j) % worker_num)):
                    A = matrices[get_block_name(i, j)]
                    B = matrices[get_block_name(j, i)]
                    intermediate_traces[get_intermediate_trace_name(i, j)] = tf.trace(tf.matmul(A, B))

        with tf.device("/job:worker/task:0"):
            retval = tf.add_n(intermediate_traces.values())

        config = tf.ConfigProto(log_device_placement=True)
        with tf.Session("grpc://vm-22-2:2222", config=config) as sess:
            result = sess.run(retval)
        print(result)
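# Why summing the per-block traces gives the full trace (a NumPy sanity check
# with hypothetical sizes, not part of the distributed script above): for a
# matrix partitioned into d x d blocks of size M,
#   trace(X X) = sum_{i,j} trace(X_ij X_ji),
# so each worker only ever needs the two blocks it was assigned.
import numpy as np
d, M = 2, 3
X = np.random.randn(d * M, d * M)
block = lambda i, j: X[i * M:(i + 1) * M, j * M:(j + 1) * M]
block_sum = sum(np.trace(np.dot(block(i, j), block(j, i)))
                for i in range(d) for j in range(d))
assert np.allclose(block_sum, np.trace(np.dot(X, X)))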