def forward_step_fn(self, params, inputs):
"""
Forward step over a batch, to be used in tf.scan
:param params:
:param inputs: (batch_size, variable dimensions)
:return:
"""
mu_pred, Sigma_pred, _, _, alpha, u, state, buffer, _, _, _ = params
y = tf.slice(inputs, [0, 0], [-1, self.dim_y]) # (bs, dim_y)
_u = tf.slice(inputs, [0, self.dim_y], [-1, self.dim_u]) # (bs, dim_u)
mask = tf.slice(inputs, [0, self.dim_y + self.dim_u], [-1, 1]) # (bs, dim_u)
# Mixture of C
C = tf.matmul(alpha, tf.reshape(self.C, [-1, self.dim_y*self.dim_z])) # (bs, k) x (k, dim_y*dim_z)
C = tf.reshape(C, [-1, self.dim_y, self.dim_z]) # (bs, dim_y, dim_z)
C.set_shape([Sigma_pred.get_shape()[0], self.dim_y, self.dim_z])
# Residual
y_pred = tf.squeeze(tf.matmul(C, tf.expand_dims(mu_pred, 2))) # (bs, dim_y)
r = y - y_pred # (bs, dim_y)
# project system uncertainty into measurement space
S = tf.matmul(tf.matmul(C, Sigma_pred), C, transpose_b=True) + self.R # (bs, dim_y, dim_y)
S_inv = tf.matrix_inverse(S)
K = tf.matmul(tf.matmul(Sigma_pred, C, transpose_b=True), S_inv) # (bs, dim_z, dim_y)
# For missing values, set to 0 the Kalman gain matrix
K = tf.multiply(tf.expand_dims(mask, 2), K)
# Get current mu and Sigma
mu_t = mu_pred + tf.squeeze(tf.matmul(K, tf.expand_dims(r, 2))) # (bs, dim_z)
I_KC = self._I - tf.matmul(K, C) # (bs, dim_z, dim_z)
Sigma_t = tf.matmul(tf.matmul(I_KC, Sigma_pred), I_KC, transpose_b=True) + self._sast(self.R, K) # (bs, dim_z, dim_z)
# Mixture of A
alpha, state, u, buffer = self.alpha(tf.multiply(mask, y) + tf.multiply((1-mask), y_pred), state, _u, buffer, reuse=True) # (bs, k)
A = tf.matmul(alpha, tf.reshape(self.A, [-1, self.dim_z*self.dim_z])) # (bs, k) x (k, dim_z*dim_z)
A = tf.reshape(A, [-1, self.dim_z, self.dim_z]) # (bs, dim_z, dim_z)
A.set_shape(Sigma_pred.get_shape()) # set shape to batch_size x dim_z x dim_z
# Mixture of B
B = tf.matmul(alpha, tf.reshape(self.B, [-1, self.dim_z*self.dim_u])) # (bs, k) x (k, dim_y*dim_z)
B = tf.reshape(B, [-1, self.dim_z, self.dim_u]) # (bs, dim_y, dim_z)
B.set_shape([A.get_shape()[0], self.dim_z, self.dim_u])
# Prediction
mu_pred = tf.squeeze(tf.matmul(A, tf.expand_dims(mu_t, 2))) + tf.squeeze(tf.matmul(B, tf.expand_dims(u, 2)))
Sigma_pred = tf.scalar_mul(self._alpha_sq, tf.matmul(tf.matmul(A, Sigma_t), A, transpose_b=True) + self.Q)
return mu_pred, Sigma_pred, mu_t, Sigma_t, alpha, u, state, buffer, A, B, C
评论列表
文章目录