transform.py 文件源码

python
阅读 38 收藏 0 点赞 0 评论 0

项目:pydatalab 作者: googledatalab 项目源码 文件源码
def process(self, element):
    """Run the transformation graph on batched input data

    Args:
      element: list of csv strings, representing one batch input to the TF graph.

    Returns:
      dict containing the transformed data. Results are un-batched. Sparse
      tensors are converted to lists.
    """
    import apache_beam as beam
    import six
    import tensorflow as tf

    # This function is invoked by a separate sub-process so setting the logging level
    # does not affect Datalab's kernel process.
    tf.logging.set_verbosity(tf.logging.ERROR)
    try:
      clean_element = []
      for line in element:
        clean_element.append(line.rstrip())

      # batch_result is list of numpy arrays with batch_size many rows.
      batch_result = self._session.run(
          fetches=self._transformed_features,
          feed_dict={self._input_placeholder_tensor: clean_element})

      # ex batch_result. 
      # Dense tensor: {'col1': array([[batch_1], [batch_2]])}
      # Sparse tensor: {'col1': tf.SparseTensorValue(
      #   indices=array([[batch_1, 0], [batch_1, 1], ...,
      #                  [batch_2, 0], [batch_2, 1], ...]],
      #   values=array[value, value, value, ...])}

      # Unbatch the results.
      for i in range(len(clean_element)):
        transformed_features = {}
        for name, value in six.iteritems(batch_result):
          if isinstance(value, tf.SparseTensorValue):
            batch_i_indices = value.indices[:, 0] == i
            batch_i_values = value.values[batch_i_indices]
            transformed_features[name] = batch_i_values.tolist()
          else:
            transformed_features[name] = value[i].tolist()

        yield transformed_features

    except Exception as e:  # pylint: disable=broad-except
      yield beam.pvalue.SideOutputValue('errors',
                                        (str(e), element))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号