def execute(self):
    """Execute the link: group an RDD by key and map over the groups.

    Fetches the input RDD from the data store, optionally applies
    ``input_map`` to each element, groups the elements by key, applies
    ``group_map`` to the grouped values (flattening the per-group results
    when ``flatten_output_groups`` is set), optionally applies
    ``result_map``, and stores the resulting RDD under ``store_key``.

    :returns: status code of execution
    :raises KeyError: if no input data is found under ``read_key``
    :raises TypeError: if the input data is not a Spark RDD
    """
    proc_mgr = ProcessManager()
    ds = proc_mgr.service(DataStore)

    # look up the input RDD in the data store
    if self.read_key not in ds:
        raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
    rdd = ds[self.read_key]
    if not isinstance(rdd, pyspark.RDD):
        raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'.format(self.read_key, str(type(rdd))))

    # optional element-wise transformation before grouping
    if self.input_map:
        rdd = rdd.map(self.input_map)

    # group elements by the keys present in the data
    rdd = rdd.groupByKey(numPartitions=self.num_group_partitions)

    # transform the grouped values, flattening each group's output if requested
    value_mapper = rdd.flatMapValues if self.flatten_output_groups else rdd.mapValues
    rdd = value_mapper(self.group_map)

    # optional element-wise transformation of the final result
    if self.result_map:
        rdd = rdd.map(self.result_map)

    # store the output RDD back in the data store
    ds[self.store_key] = rdd
    return StatusCode.Success