def transform(self, dstreams, transformFunc):
    """
    Build a new DStream whose RDDs are produced by calling
    ``transformFunc`` on the RDDs of the given DStreams.

    ``transformFunc`` is invoked with a single argument: a tuple of
    RDDs, one per entry of ``dstreams`` and in the same order. The
    value it returns must provide a ``map`` method.

    :param dstreams: iterable of DStreams (iterated twice, so it
        should be a list or another re-iterable collection)
    :param transformFunc: callable taking the tuple of RDDs
    :return: a DStream wrapping the transformed Java DStream, using
        ``self._sc.serializer`` as its serializer
    """
    def apply_to_rdds(t, *rdds):
        # The batch time ``t`` is accepted but not forwarded.
        # The identity map changes the final serializer to sc.serializer.
        return transformFunc(rdds).map(lambda x: x)

    java_streams = [stream._jdstream for stream in dstreams]
    deserializers = [stream._jrdd_deserializer for stream in dstreams]

    py_func = TransformFunction(self._sc, apply_to_rdds, *deserializers)
    java_func = self._jvm.TransformFunction(py_func)
    transformed = self._jssc.transform(java_streams, java_func)
    return DStream(transformed, self, self._sc.serializer)
# (Web-page residue: "Comment list")
# (Web-page residue: "Table of contents")