def _filter_negative_samples(labels, tensors):
"""keeps only samples with none-negative labels
Params:
-----
labels: of shape (N,)
tensors: a list of tensors, each of shape (N, .., ..) the first axis is sample number
Returns:
-----
tensors: filtered tensors
"""
# return tensors
keeps = tf.where(tf.greater_equal(labels, 0))
keeps = tf.reshape(keeps, [-1])
filtered = []
for t in tensors:
tf.assert_equal(tf.shape(t)[0], tf.shape(labels)[0])
f = tf.gather(t, keeps)
filtered.append(f)
return filtered
python类assert_equal()的实例源码
def image_reading(path: str, resized_size: Tuple[int, int]=None, data_augmentation: bool=False,
padding: bool=False) -> Tuple[tf.Tensor, tf.Tensor]:
# Read image
image_content = tf.read_file(path, name='image_reader')
image = tf.cond(tf.equal(tf.string_split([path], '.').values[1], tf.constant('jpg', dtype=tf.string)),
true_fn=lambda: tf.image.decode_jpeg(image_content, channels=1, try_recover_truncated=True), # TODO channels = 3 ?
false_fn=lambda: tf.image.decode_png(image_content, channels=1), name='image_decoding')
# Data augmentation
if data_augmentation:
image = augment_data(image)
# Padding
if padding:
with tf.name_scope('padding'):
image, img_width = padding_inputs_width(image, resized_size, increment=CONST.DIMENSION_REDUCTION_W_POOLING)
# Resize
else:
image = tf.image.resize_images(image, size=resized_size)
img_width = tf.shape(image)[1]
with tf.control_dependencies([tf.assert_equal(image.shape[:2], resized_size)]):
return image, img_width
def clf_loss_oneclass(pred_logits, gt_labels, cls_num):
"""Compute classification loss for oneclass problem.
Args:
pred_logits: logits prediction from a model.
gt_labels: ground truth class labels.
cls_num: number of classes.
Returns:
computed loss.
"""
with tf.variable_scope("clf_loss"):
tf.assert_equal(tf.reduce_max(gt_labels), tf.convert_to_tensor(cls_num))
onehot_labels = tf.one_hot(gt_labels, cls_num)
clf_loss_elem = tf.losses.softmax_cross_entropy(onehot_labels, pred_logits)
mean_loss = tf.reduce_mean(clf_loss_elem, 0)
return mean_loss
def conv_feat_map_tensor_gram(conv_fmap_tensor):
"""Compute Gram matrix of conv feature maps.
Used in style transfer.
"""
tf.assert_equal(tf.rank(conv_fmap_tensor), 4)
shape = tf.shape(conv_fmap_tensor)
num_images = shape[0]
width = shape[1]
height = shape[2]
num_filters = shape[3]
filters = tf.reshape(conv_fmap_tensor,
tf.stack([num_images, -1, num_filters]))
grams = tf.matmul(
filters, filters,
transpose_a=True) / tf.to_float(width * height * num_filters)
return grams
def _slice(self, X, X2):
"""
Slice the correct dimensions for use in the kernel, as indicated by
`self.active_dims`.
:param X: Input 1 (NxD).
:param X2: Input 2 (MxD), may be None.
:return: Sliced X, X2, (Nxself.input_dim).
"""
if isinstance(self.active_dims, slice):
X = X[:, self.active_dims]
if X2 is not None:
X2 = X2[:, self.active_dims]
else:
X = tf.transpose(tf.gather(tf.transpose(X), self.active_dims))
if X2 is not None:
X2 = tf.transpose(tf.gather(tf.transpose(X2), self.active_dims))
input_dim_shape = tf.shape(X)[1]
input_dim = tf.convert_to_tensor(self.input_dim, dtype=settings.tf_int)
with tf.control_dependencies([tf.assert_equal(input_dim_shape, input_dim)]):
X = tf.identity(X)
return X, X2
def get_mu_tensor(self):
const_fact = self._dist_to_opt_avg**2 * self._h_min**2 / 2 / self._grad_var
coef = tf.Variable([-1.0, 3.0, 0.0, 1.0], dtype=tf.float32, name="cubic_solver_coef")
coef = tf.scatter_update(coef, tf.constant(2), -(3 + const_fact) )
roots = tf.py_func(np.roots, [coef], Tout=tf.complex64, stateful=False)
# filter out the correct root
root_idx = tf.logical_and(tf.logical_and(tf.greater(tf.real(roots), tf.constant(0.0) ),
tf.less(tf.real(roots), tf.constant(1.0) ) ), tf.less(tf.abs(tf.imag(roots) ), 1e-5) )
# in case there are two duplicated roots satisfying the above condition
root = tf.reshape(tf.gather(tf.gather(roots, tf.where(root_idx) ), tf.constant(0) ), shape=[] )
tf.assert_equal(tf.size(root), tf.constant(1) )
dr = self._h_max / self._h_min
mu = tf.maximum(tf.real(root)**2, ( (tf.sqrt(dr) - 1)/(tf.sqrt(dr) + 1) )**2)
return mu
def get_texture_loss_for_layer(x, s, l):
with tf.name_scope('get_style_loss_for_layer'):
# Compute gram matrices using the activated filter maps of the art and generated images
x_layer_maps = getattr(x, l)
t_layer_maps = getattr(s, l)
x_layer_gram = convert_to_gram(x_layer_maps)
t_layer_gram = convert_to_gram(t_layer_maps)
# Make sure the feature map dimensions are the same
assert_equal_shapes = tf.assert_equal(x_layer_maps.get_shape(), t_layer_maps.get_shape())
with tf.control_dependencies([assert_equal_shapes]):
# Compute and return the normalized gram loss using the gram matrices
shape = x_layer_maps.get_shape().as_list()
size = reduce(lambda a, b: a * b, shape) ** 2
gram_loss = get_l2_norm_loss(x_layer_gram - t_layer_gram)
return gram_loss / size
# Compute total variation regularization loss term given a variable image (x) and its shape
def _filter_negative_samples(labels, tensors):
"""keeps only samples with none-negative labels
Params:
-----
labels: of shape (N,)
tensors: a list of tensors, each of shape (N, .., ..) the first axis is sample number
Returns:
-----
tensors: filtered tensors
"""
# return tensors
keeps = tf.where(tf.greater_equal(labels, 0))
keeps = tf.reshape(keeps, [-1])
filtered = []
for t in tensors:
tf.assert_equal(tf.shape(t)[0], tf.shape(labels)[0])
f = tf.gather(t, keeps)
filtered.append(f)
return filtered
def get_style_loss_for_layer(x, s, l):
with tf.name_scope('get_style_loss_for_layer'):
# Compute gram matrices using the activated filter maps of the art and generated images
x_layer_maps = getattr(x, l)
s_layer_maps = getattr(s, l)
x_layer_gram = convert_to_gram(x_layer_maps)
s_layer_gram = convert_to_gram(s_layer_maps)
# Make sure the feature map dimensions are the same
assert_equal_shapes = tf.assert_equal(x_layer_maps.get_shape(), s_layer_maps.get_shape())
with tf.control_dependencies([assert_equal_shapes]):
# Compute and return the normalized gram loss using the gram matrices
shape = x_layer_maps.get_shape().as_list()
size = reduce(lambda a, b: a * b, shape) ** 2
gram_loss = get_l2_norm_loss(x_layer_gram - s_layer_gram)
return gram_loss / size
# Compute total variation regularization loss term given a variable image (x) and its shape
def _assert_correct_number_of_anchors(self, anchors, feature_map_shape_list):
"""Assert that correct number of anchors was generated.
Args:
anchors: box_list.BoxList object holding anchors generated
feature_map_shape_list: list of (height, width) pairs in the format
[(height_0, width_0), (height_1, width_1), ...] that the generated
anchors must align with.
Returns:
Op that raises InvalidArgumentError if the number of anchors does not
match the number of expected anchors.
"""
expected_num_anchors = 0
for num_anchors_per_location, feature_map_shape in zip(
self.num_anchors_per_location(), feature_map_shape_list):
expected_num_anchors += (num_anchors_per_location
* feature_map_shape[0]
* feature_map_shape[1])
return tf.assert_equal(expected_num_anchors, anchors.num_boxes())
def _filter_negative_samples(labels, tensors):
"""keeps only samples with none-negative labels
Params:
-----
labels: of shape (N,)
tensors: a list of tensors, each of shape (N, .., ..) the first axis is sample number
Returns:
-----
tensors: filtered tensors
"""
# return tensors
keeps = tf.where(tf.greater_equal(labels, 0))
keeps = tf.reshape(keeps, [-1])
filtered = []
for t in tensors:
tf.assert_equal(tf.shape(t)[0], tf.shape(labels)[0])
f = tf.gather(t, keeps)
filtered.append(f)
return filtered
def ctc_label_dense_to_sparse(labels, label_lengths, batch_size):
# The second dimension of labels must be equal to the longest label length in the batch
correct_shape_assert = tf.assert_equal(tf.shape(labels)[1], tf.reduce_max(label_lengths))
with tf.control_dependencies([correct_shape_assert]):
labels = tf.identity(labels)
label_shape = tf.shape(labels)
num_batches_tns = tf.stack([label_shape[0]])
max_num_labels_tns = tf.stack([label_shape[1]])
def range_less_than(previous_state, current_input):
return tf.expand_dims(tf.range(label_shape[1]), 0) < current_input
init = tf.cast(tf.fill(max_num_labels_tns, 0), tf.bool)
init = tf.expand_dims(init, 0)
dense_mask = tf.scan(range_less_than, label_lengths, initializer=init, parallel_iterations=1)
dense_mask = dense_mask[:, 0, :]
label_array = tf.reshape(tf.tile(tf.range(0, label_shape[1]), num_batches_tns),
label_shape)
label_ind = tf.boolean_mask(label_array, dense_mask)
batch_array = tf.transpose(tf.reshape(tf.tile(tf.range(0, label_shape[0]), max_num_labels_tns), tf.reverse(label_shape, [0])))
batch_ind = tf.boolean_mask(batch_array, dense_mask)
indices = tf.transpose(tf.reshape(tf.concat([batch_ind, label_ind], 0), [2, -1]))
shape = [batch_size, tf.reduce_max(label_lengths)]
vals_sparse = gather_nd(labels, indices, shape)
return tf.SparseTensor(tf.to_int64(indices), vals_sparse, tf.to_int64(label_shape))
# Validate and normalize transcriptions. Returns a cleaned version of the label
# or None if it's invalid.
def ctc_label_dense_to_sparse(labels, label_lengths, batch_size):
# The second dimension of labels must be equal to the longest label length in the batch
correct_shape_assert = tf.assert_equal(tf.shape(labels)[1], tf.reduce_max(label_lengths))
with tf.control_dependencies([correct_shape_assert]):
labels = tf.identity(labels)
label_shape = tf.shape(labels)
num_batches_tns = tf.stack([label_shape[0]])
max_num_labels_tns = tf.stack([label_shape[1]])
def range_less_than(previous_state, current_input):
return tf.expand_dims(tf.range(label_shape[1]), 0) < current_input
init = tf.cast(tf.fill(max_num_labels_tns, 0), tf.bool)
init = tf.expand_dims(init, 0)
dense_mask = tf.scan(range_less_than, label_lengths, initializer=init, parallel_iterations=1)
dense_mask = dense_mask[:, 0, :]
label_array = tf.reshape(tf.tile(tf.range(0, label_shape[1]), num_batches_tns),
label_shape)
label_ind = tf.boolean_mask(label_array, dense_mask)
batch_array = tf.transpose(tf.reshape(tf.tile(tf.range(0, label_shape[0]), max_num_labels_tns), tf.reverse(label_shape, [0])))
batch_ind = tf.boolean_mask(batch_array, dense_mask)
indices = tf.transpose(tf.reshape(tf.concat([batch_ind, label_ind], 0), [2, -1]))
shape = [batch_size, tf.reduce_max(label_lengths)]
vals_sparse = gather_nd(labels, indices, shape)
return tf.SparseTensor(tf.to_int64(indices), vals_sparse, tf.to_int64(label_shape))
# Validate and normalize transcriptions. Returns a cleaned version of the label
# or None if it's invalid.
def segment_argmax(input, segment_ids):
"""Computes row and col indices Tensors of the segment max in the 2D input."""
with tf.name_scope("segment_argmax"):
num_partitions = tf.reduce_max(segment_ids) + 1
is_max = segment_is_max(input, segment_ids)
# The current is_max could still contain multiple True entries per
# partition. As long as they are in the same row, that is not a problem.
# However, we do need to remove duplicate Trues in the same partition
# in multiple rows.
# For that, we'll multiply is_max with the row indices + 1 and perform
# segment_is_max() again.
rows = tf.shape(input)[0]
cols = tf.shape(input)[1]
row_indices = tf.tile(tf.expand_dims(tf.range(rows), 1), [1, cols])
is_max = segment_is_max(tf.cast(is_max, tf.int32) * (row_indices + 1), segment_ids)
# Get selected rows and columns
row_selected = tf.reduce_any(is_max, axis=1)
row_indices = tf.squeeze(tf.where(row_selected))
rows_selected = tf.reduce_sum(tf.cast(row_selected, tf.int64))
# Assert rows_selected is correct & ensure row_indices is always 1D
with tf.control_dependencies([tf.assert_equal(rows_selected, num_partitions)]):
row_indices = tf.reshape(row_indices, [-1])
selected_rows_is_max = tf.gather(is_max, row_indices)
col_indices = tf.argmax(tf.cast(selected_rows_is_max, tf.int64), axis=1)
# Pack indices
return row_indices, col_indices
train_lstm_multivariate.py 文件源码
项目:TensorFlow-Time-Series-Examples
作者: hzy46
项目源码
文件源码
阅读 38
收藏 0
点赞 0
评论 0
def _filtering_step(self, current_times, current_values, state, predictions):
"""Update model state based on observations.
Note that we don't do much here aside from computing a loss. In this case
it's easier to update the RNN state in _prediction_step, since that covers
running the RNN both on observations (from this method) and our own
predictions. This distinction can be important for probabilistic models,
where repeatedly predicting without filtering should lead to low-confidence
predictions.
Args:
current_times: A [batch size] integer Tensor.
current_values: A [batch size, self.num_features] floating point Tensor
with new observations.
state: The model's state tuple.
predictions: The output of the previous `_prediction_step`.
Returns:
A tuple of new state and a predictions dictionary updated to include a
loss (note that we could also return other measures of goodness of fit,
although only "loss" will be optimized).
"""
state_from_time, prediction, lstm_state = state
with tf.control_dependencies(
[tf.assert_equal(current_times, state_from_time)]):
transformed_values = self._transform(current_values)
# Use mean squared error across features for the loss.
predictions["loss"] = tf.reduce_mean(
(prediction - transformed_values) ** 2, axis=-1)
# Keep track of the new observation in model state. It won't be run
# through the LSTM until the next _imputation_step.
new_state_tuple = (current_times, transformed_values, lstm_state)
return (new_state_tuple, predictions)
def _filtering_step(self, current_times, current_values, state, predictions):
"""Update model state based on observations.
Note that we don't do much here aside from computing a loss. In this case
it's easier to update the RNN state in _prediction_step, since that covers
running the RNN both on observations (from this method) and our own
predictions. This distinction can be important for probabilistic models,
where repeatedly predicting without filtering should lead to low-confidence
predictions.
Args:
current_times: A [batch size] integer Tensor.
current_values: A [batch size, self.num_features] floating point Tensor
with new observations.
state: The model's state tuple.
predictions: The output of the previous `_prediction_step`.
Returns:
A tuple of new state and a predictions dictionary updated to include a
loss (note that we could also return other measures of goodness of fit,
although only "loss" will be optimized).
"""
state_from_time, prediction, lstm_state = state
with tf.control_dependencies(
[tf.assert_equal(current_times, state_from_time)]):
transformed_values = self._transform(current_values)
# Use mean squared error across features for the loss.
predictions["loss"] = tf.reduce_mean(
(prediction - transformed_values) ** 2, axis=-1)
# Keep track of the new observation in model state. It won't be run
# through the LSTM until the next _imputation_step.
new_state_tuple = (current_times, transformed_values, lstm_state)
return (new_state_tuple, predictions)
def clf_loss_multiclass(pred_logits, gt_labels, cls_num):
"""Compute classification loss for multi-class problem.
Args:
pred_logits: logits prediction from a model.
gt_labels: ground truth class labels [batch_size, num_cls] with (0,1) value.
cls_num: number of classes.
Returns:
computed loss.
"""
with tf.variable_scope("clf_loss"):
tf.assert_equal(tf.reduce_max(gt_labels), 1)
clf_loss_elem = tf.losses.sigmoid_cross_entropy(gt_labels, pred_logits)
mean_loss = tf.reduce_mean(clf_loss_elem, 0)
return mean_loss
def clf_loss(pred_logits,
gt_labels,
cls_num,
loss_type=commons.LossType.CLF_SOFTMAX_ONECLASS):
"""Compute classification loss.
Args:
pred_logits: logits prediction from a model.
gt_labels: ground truth class labels.
cls_num: number of classes.
loss_type: specific type for the loss.
SOFT if labels are probability. HARD if class id.
Returns:
computed loss.
"""
with tf.variable_scope("clf_loss"):
if loss_type == commons.LossType.CLF_SOFTMAX_ONECLASS:
# check labels are not exceeding cls_num.
tf.assert_equal(tf.reduce_max(gt_labels), tf.convert_to_tensor(cls_num))
soft_labels = tf.one_hot(gt_labels, cls_num)
else:
soft_labels = gt_labels
clf_loss_elem = tf.nn.softmax_cross_entropy_with_logits(pred_logits,
soft_labels)
mean_loss = tf.reduce_mean(clf_loss_elem, 0)
return mean_loss
def conv_feat_map_tensor_max(conv_fmap_tensor):
"""Compute maximum activation of conv feature maps.
"""
tf.assert_equal(tf.rank(conv_fmap_tensor), 4)
new_conv_tensor = tf.reduce_max(conv_fmap_tensor, axis=3)
return new_conv_tensor
def conv_feat_map_tensor_avg(conv_fmap_tensor):
"""Compute average activation of conv feature maps.
"""
tf.assert_equal(tf.rank(conv_fmap_tensor), 4)
new_conv_tensor = tf.reduce_mean(conv_fmap_tensor, axis=3)
return new_conv_tensor
def vis_filter_activations(output_name, output_tensor):
"""Add filter activation to image summary.
Args:
output_name: name of the output layer.
output_tensor: tensor of the output layer. It should have 4D shape.
"""
# split into chunks of 3.
tf.assert_equal(len(output_tensor.get_shape()), 4)
num_splits = output_tensor.get_shape()[0] / 3
acti_grids = tf.split(0, num_splits, output_tensor,
"{} split".format(output_name))
for split_id in range(num_splits):
summary_name = "{} split {}".format(output_name, split_id)
tf.summary.image(summary_name, acti_grids[split_id], 3)
def check_image(image):
assertion = tf.assert_equal(tf.shape(image)[-1], 3, message="image must have 3 color channels")
with tf.control_dependencies([assertion]):
image = tf.identity(image)
if image.get_shape().ndims not in (3, 4):
raise ValueError("image must be either 3 or 4 dimensions")
# make the last dimension 3 so that you can unstack the colors
shape = list(image.get_shape())
shape[-1] = 3
image.set_shape(shape)
return image
# based on https://github.com/torch/image/blob/9f65c30167b2048ecbe8b7befdc6b2d6d12baee9/generic/image.c
def exKxz_pairwise(self, Z, Xmu, Xcov):
"""
<x_t K_{x_{t-1}, Z}>_q_{x_{t-1:t}}
:param Z: MxD inducing inputs
:param Xmu: X mean (N+1xD)
:param Xcov: 2x(N+1)xDxD
:return: NxMxD
"""
msg_input_shape = "Currently cannot handle slicing in exKxz_pairwise."
assert_input_shape = tf.assert_equal(tf.shape(Xmu)[1], self.input_dim, message=msg_input_shape)
assert_cov_shape = tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[1:3], name="assert_Xmu_Xcov_shape")
with tf.control_dependencies([assert_input_shape, assert_cov_shape]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0] - 1
D = tf.shape(Xmu)[1]
Xsigmb = tf.slice(Xcov, [0, 0, 0, 0], tf.stack([-1, N, -1, -1]))
Xsigm = Xsigmb[0, :, :, :] # NxDxD
Xsigmc = Xsigmb[1, :, :, :] # NxDxD
Xmum = tf.slice(Xmu, [0, 0], tf.stack([N, -1]))
Xmup = Xmu[1:, :]
lengthscales = self.lengthscales if self.ARD else tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
scalemat = tf.expand_dims(tf.matrix_diag(lengthscales ** 2.0), 0) + Xsigm # NxDxD
det = tf.matrix_determinant(
tf.expand_dims(tf.eye(tf.shape(Xmu)[1], dtype=settings.float_type), 0) +
tf.reshape(lengthscales ** -2.0, (1, 1, -1)) * Xsigm) # N
vec = tf.expand_dims(tf.transpose(Z), 0) - tf.expand_dims(Xmum, 2) # NxDxM
smIvec = tf.matrix_solve(scalemat, vec) # NxDxM
q = tf.reduce_sum(smIvec * vec, [1]) # NxM
addvec = tf.matmul(smIvec, Xsigmc, transpose_a=True) + tf.expand_dims(Xmup, 1) # NxMxD
return self.variance * addvec * tf.reshape(det ** -0.5, (N, 1, 1)) * tf.expand_dims(tf.exp(-0.5 * q), 2)
def exKxz(self, Z, Xmu, Xcov):
"""
It computes the expectation:
<x_t K_{x_t, Z}>_q_{x_t}
:param Z: MxD inducing inputs
:param Xmu: X mean (NxD)
:param Xcov: NxDxD
:return: NxMxD
"""
msg_input_shape = "Currently cannot handle slicing in exKxz."
assert_input_shape = tf.assert_equal(tf.shape(Xmu)[1], self.input_dim, message=msg_input_shape)
assert_cov_shape = tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[:2], name="assert_Xmu_Xcov_shape")
with tf.control_dependencies([assert_input_shape, assert_cov_shape]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0]
D = tf.shape(Xmu)[1]
lengthscales = self.lengthscales if self.ARD else tf.zeros((D,), dtype=settings.float_type) + self.lengthscales
scalemat = tf.expand_dims(tf.matrix_diag(lengthscales ** 2.0), 0) + Xcov # NxDxD
det = tf.matrix_determinant(
tf.expand_dims(tf.eye(tf.shape(Xmu)[1], dtype=settings.float_type), 0) +
tf.reshape(lengthscales ** -2.0, (1, 1, -1)) * Xcov) # N
vec = tf.expand_dims(tf.transpose(Z), 0) - tf.expand_dims(Xmu, 2) # NxDxM
smIvec = tf.matrix_solve(scalemat, vec) # NxDxM
q = tf.reduce_sum(smIvec * vec, [1]) # NxM
addvec = tf.matmul(smIvec, Xcov, transpose_a=True) + tf.expand_dims(Xmu, 1) # NxMxD
return self.variance * addvec * tf.reshape(det ** -0.5, (N, 1, 1)) * tf.expand_dims(tf.exp(-0.5 * q), 2)
def exKxz_pairwise(self, Z, Xmu, Xcov):
with tf.control_dependencies([
tf.assert_equal(tf.shape(Xmu)[1], tf.constant(self.input_dim, settings.tf_int),
message="Currently cannot handle slicing in exKxz."),
tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[1:3], name="assert_Xmu_Xcov_shape")
]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0] - 1
Xmum = Xmu[:-1, :]
Xmup = Xmu[1:, :]
op = tf.expand_dims(Xmum, 2) * tf.expand_dims(Xmup, 1) + Xcov[1, :-1, :, :] # NxDxD
return self.variance * tf.matmul(tf.tile(tf.expand_dims(Z, 0), (N, 1, 1)), op)
def exKxz(self, Z, Xmu, Xcov):
with tf.control_dependencies([
tf.assert_equal(tf.shape(Xmu)[1], tf.constant(self.input_dim, settings.int_type),
message="Currently cannot handle slicing in exKxz."),
tf.assert_equal(tf.shape(Xmu), tf.shape(Xcov)[:2], name="assert_Xmu_Xcov_shape")
]):
Xmu = tf.identity(Xmu)
N = tf.shape(Xmu)[0]
op = tf.expand_dims(Xmu, 2) * tf.expand_dims(Xmu, 1) + Xcov # NxDxD
return self.variance * tf.matmul(tf.tile(tf.expand_dims(Z, 0), (N, 1, 1)), op)
def eKdiag(self, Xmu, Xcov):
if not self.on_separate_dimensions:
raise NotImplementedError("Product currently needs to be defined on separate dimensions.") # pragma: no cover
with tf.control_dependencies([
tf.assert_equal(tf.rank(Xcov), 2,
message="Product currently only supports diagonal Xcov.", name="assert_Xcov_diag"),
]):
return reduce(tf.multiply, [k.eKdiag(Xmu, Xcov) for k in self.kern_list])
def eKzxKxz(self, Z, Xmu, Xcov):
if not self.on_separate_dimensions:
raise NotImplementedError("Product currently needs to be defined on separate dimensions.") # pragma: no cover
with tf.control_dependencies([
tf.assert_equal(tf.rank(Xcov), 2,
message="Product currently only supports diagonal Xcov.", name="assert_Xcov_diag"),
]):
return reduce(tf.multiply, [k.eKzxKxz(Z, Xmu, Xcov) for k in self.kern_list])
def exKxz_pairwise(self, Z, Xmu, Xcov):
"""
Computes <x_{t-1} K_{x_t z}>_q(x) for each pair of consecutive X's in
Xmu & Xcov.
:param Z: Fixed inputs (MxD).
:param Xmu: X means (T+1xD).
:param Xcov: 2xT+1xDxD. [0, t, :, :] contains covariances for x_t. [1, t, :, :] contains the cross covariances
for t and t+1.
:return: (TxMxD).
"""
self._check_quadrature()
# Slicing is NOT needed here. The desired behaviour is to *still* return an NxMxD matrix. As even when the
# kernel does not depend on certain inputs, the output matrix will still contain the outer product between the
# mean of x_{t-1} and K_{x_t Z}. The code here will do this correctly automatically, since the quadrature will
# still be done over the distribution x_{t-1, t}, only now the kernel will not depend on certain inputs.
# However, this does mean that at the time of running this function we need to know the input *size* of Xmu, not
# just `input_dim`.
M = tf.shape(Z)[0]
D = self.input_size if hasattr(self, 'input_size') else self.input_dim # Number of actual input dimensions
with tf.control_dependencies([
tf.assert_equal(tf.shape(Xmu)[1], tf.constant(D, dtype=settings.tf_int),
message="Numerical quadrature needs to know correct shape of Xmu.")
]):
Xmu = tf.identity(Xmu)
# First, transform the compact representation of Xmu and Xcov into a
# list of full distributions.
fXmu = tf.concat((Xmu[:-1, :], Xmu[1:, :]), 1) # Nx2D
fXcovt = tf.concat((Xcov[0, :-1, :, :], Xcov[1, :-1, :, :]), 2) # NxDx2D
fXcovb = tf.concat((tf.transpose(Xcov[1, :-1, :, :], (0, 2, 1)), Xcov[0, 1:, :, :]), 2)
fXcov = tf.concat((fXcovt, fXcovb), 1)
return mvnquad(lambda x: tf.expand_dims(self.K(x[:, :D], Z), 2) *
tf.expand_dims(x[:, D:], 1),
fXmu, fXcov, self.num_gauss_hermite_points,
2 * D, Dout=(M, D))
def exKxz(self, Z, Xmu, Xcov):
"""
Computes <x_t K_{x_t z}>_q(x) for the same x_t.
:param Z: Fixed inputs (MxD).
:param Xmu: X means (TxD).
:param Xcov: TxDxD. Contains covariances for each x_t.
:return: (TxMxD).
"""
self._check_quadrature()
# Slicing is NOT needed here. The desired behaviour is to *still* return an NxMxD matrix.
# As even when the kernel does not depend on certain inputs, the output matrix will still
# contain the outer product between the mean of x_t and K_{x_t Z}. The code here will
# do this correctly automatically, since the quadrature will still be done over the
# distribution x_t, only now the kernel will not depend on certain inputs.
# However, this does mean that at the time of running this function we need to know the
# input *size* of Xmu, not just `input_dim`.
M = tf.shape(Z)[0]
# Number of actual input dimensions
D = self.input_size if hasattr(self, 'input_size') else self.input_dim
msg = "Numerical quadrature needs to know correct shape of Xmu."
assert_shape = tf.assert_equal(tf.shape(Xmu)[1], D, message=msg)
with tf.control_dependencies([assert_shape]):
Xmu = tf.identity(Xmu)
def integrand(x):
return tf.expand_dims(self.K(x, Z), 2) * tf.expand_dims(x, 1)
num_points = self.num_gauss_hermite_points
return mvnquad(integrand, Xmu, Xcov, num_points, D, Dout=(M, D))