def process(self, element):
    """Run the transformation graph on one batch of CSV lines.

    Args:
      element: list of csv strings, representing one batch input to the TF
        graph. Trailing whitespace is stripped from every line before it is
        fed to the graph.

    Yields:
      One dict per input line, mapping each fetched feature name to that
      row's value converted with ``tolist()``. For sparse tensors the value
      is the list of entries whose first index equals the row number.
      If anything fails while running or un-batching the graph output, no
      rows are emitted; instead a single ``beam.pvalue.SideOutputValue``
      tagged 'errors' carrying ``(str(exception), element)`` is yielded.
    """
    import apache_beam as beam
    import six
    import tensorflow as tf

    # This function is invoked by a separate sub-process so setting the
    # logging level does not affect Datalab's kernel process.
    tf.logging.set_verbosity(tf.logging.ERROR)

    try:
        clean_element = [line.rstrip() for line in element]

        # batch_result maps each feature name to a value covering the whole
        # batch (it is iterated with six.iteritems below).
        batch_result = self._session.run(
            fetches=self._transformed_features,
            feed_dict={self._input_placeholder_tensor: clean_element})

        # ex batch_result.
        # Dense tensor: {'col1': array([[batch_1], [batch_2]])}
        # Sparse tensor: {'col1': tf.SparseTensorValue(
        #   indices=array([[batch_1, 0], [batch_1, 1], ...,
        #                  [batch_2, 0], [batch_2, 1], ...]],
        #   values=array[value, value, value, ...])}

        # Unbatch the results. Rows are collected rather than yielded here so
        # that a failure part-way through cannot leave some rows on the main
        # output while the whole batch is also reported on 'errors'.
        unbatched_rows = []
        for i in range(len(clean_element)):
            transformed_features = {}
            for name, value in six.iteritems(batch_result):
                if isinstance(value, tf.SparseTensorValue):
                    # Column 0 of the indices is the batch row; keep only the
                    # values that belong to row i.
                    batch_i_indices = value.indices[:, 0] == i
                    batch_i_values = value.values[batch_i_indices]
                    transformed_features[name] = batch_i_values.tolist()
                else:
                    transformed_features[name] = value[i].tolist()
            unbatched_rows.append(transformed_features)
    except Exception as e:  # pylint: disable=broad-except
        # Deliberately broad: any failure routes the raw batch to the
        # 'errors' output instead of failing the pipeline.
        yield beam.pvalue.SideOutputValue('errors',
                                          (str(e), element))
    else:
        for transformed_features in unbatched_rows:
            yield transformed_features
评论列表
文章目录