filter.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:kvae 作者: simonkamronn 项目源码 文件源码
def forward_step_fn(self, params, inputs):
        """
        Run one Kalman-filter step (measurement update, then prediction) over a
        batch. Intended to be used as the step function of tf.scan.

        :param params: the previous step's output, an 11-tuple laid out exactly
            like this function's return value:
            (mu_pred, Sigma_pred, mu_t, Sigma_t, alpha, u, state, buffer, A, B, C).
            Only mu_pred, Sigma_pred, alpha, state and buffer are read.
        :param inputs: (batch_size, dim_y + dim_u + 1) tensor holding, along the
            last axis, the observation y, the control input u and a single mask
            column. The mask multiplies the Kalman gain and selects between y
            and y_pred, so 1 means "observed" and 0 means "missing".
        :return: (mu_pred, Sigma_pred, mu_t, Sigma_t, alpha, u, state, buffer,
            A, B, C) where mu_t / Sigma_t are the filtered estimates for this
            step and mu_pred / Sigma_pred are the predictions for the next one.
        """
        # The previous `u` in params is not needed: self.alpha returns a new one.
        mu_pred, Sigma_pred, _, _, alpha, _, state, buffer, _, _, _ = params
        y = tf.slice(inputs, [0, 0], [-1, self.dim_y])  # (bs, dim_y)
        _u = tf.slice(inputs, [0, self.dim_y], [-1, self.dim_u])  # (bs, dim_u)
        mask = tf.slice(inputs, [0, self.dim_y + self.dim_u], [-1, 1])  # (bs, 1)

        # Mixture of C
        C = tf.matmul(alpha, tf.reshape(self.C, [-1, self.dim_y*self.dim_z]))  # (bs, k) x (k, dim_y*dim_z)
        C = tf.reshape(C, [-1, self.dim_y, self.dim_z])  # (bs, dim_y, dim_z)
        C.set_shape([Sigma_pred.get_shape()[0], self.dim_y, self.dim_z])

        # Residual
        # NOTE: squeeze only the trailing axis added by expand_dims. A bare
        # tf.squeeze would also drop the batch axis when bs == 1 (or the state /
        # observation axis when its size is 1) and break the shapes below.
        y_pred = tf.squeeze(tf.matmul(C, tf.expand_dims(mu_pred, 2)), axis=[2])  # (bs, dim_y)
        r = y - y_pred  # (bs, dim_y)

        # project system uncertainty into measurement space
        S = tf.matmul(tf.matmul(C, Sigma_pred), C, transpose_b=True) + self.R  # (bs, dim_y, dim_y)

        S_inv = tf.matrix_inverse(S)
        K = tf.matmul(tf.matmul(Sigma_pred, C, transpose_b=True), S_inv)  # (bs, dim_z, dim_y)

        # For missing values, set to 0 the Kalman gain matrix
        K = tf.multiply(tf.expand_dims(mask, 2), K)  # mask broadcast as (bs, 1, 1)

        # Get current mu and Sigma
        mu_t = mu_pred + tf.squeeze(tf.matmul(K, tf.expand_dims(r, 2)), axis=[2])  # (bs, dim_z)
        I_KC = self._I - tf.matmul(K, C)  # (bs, dim_z, dim_z)
        # (I - KC) Sigma (I - KC)^T plus a term built from R and K by self._sast
        # (presumably K R K^T, i.e. the Joseph-form covariance update -- confirm
        # against the _sast definition).
        Sigma_t = tf.matmul(tf.matmul(I_KC, Sigma_pred), I_KC, transpose_b=True) + self._sast(self.R, K)  # (bs, dim_z, dim_z)

        # Mixture of A
        # Feed the observation where it is available and the model's own
        # prediction where it is missing.
        alpha_input = tf.multiply(mask, y) + tf.multiply((1-mask), y_pred)  # (bs, dim_y)
        alpha, state, u, buffer = self.alpha(alpha_input, state, _u, buffer, reuse=True)  # alpha: (bs, k)
        A = tf.matmul(alpha, tf.reshape(self.A, [-1, self.dim_z*self.dim_z]))  # (bs, k) x (k, dim_z*dim_z)
        A = tf.reshape(A, [-1, self.dim_z, self.dim_z])  # (bs, dim_z, dim_z)
        A.set_shape(Sigma_pred.get_shape())  # set shape to batch_size x dim_z x dim_z

        # Mixture of B
        B = tf.matmul(alpha, tf.reshape(self.B, [-1, self.dim_z*self.dim_u]))  # (bs, k) x (k, dim_z*dim_u)
        B = tf.reshape(B, [-1, self.dim_z, self.dim_u])  # (bs, dim_z, dim_u)
        B.set_shape([A.get_shape()[0], self.dim_z, self.dim_u])

        # Prediction
        mu_pred = (tf.squeeze(tf.matmul(A, tf.expand_dims(mu_t, 2)), axis=[2])
                   + tf.squeeze(tf.matmul(B, tf.expand_dims(u, 2)), axis=[2]))  # (bs, dim_z)
        # Predicted covariance A Sigma_t A^T + Q, scaled by self._alpha_sq.
        Sigma_pred = tf.scalar_mul(self._alpha_sq, tf.matmul(tf.matmul(A, Sigma_t), A, transpose_b=True) + self.Q)

        return mu_pred, Sigma_pred, mu_t, Sigma_t, alpha, u, state, buffer, A, B, C
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号