rdd_group_mapper.py 文件源码

python
阅读 21 收藏 0 点赞 0 评论 0

项目:Eskapade 作者: KaveIO 项目源码 文件源码
def execute(self):
        """Group an RDD from the data store by key and map each group.

        Reads the object stored under ``self.read_key``, optionally applies
        ``self.input_map`` to every record, groups the records by key into
        ``self.num_group_partitions`` partitions, applies ``self.group_map``
        to the values of each group, optionally applies ``self.result_map``
        to every resulting record, and stores the outcome under
        ``self.store_key``.

        :returns: ``StatusCode.Success``
        :raises KeyError: if ``self.read_key`` is not in the data store
        :raises TypeError: if the stored object is not a ``pyspark.RDD``
        """

        # look up the data store service through the process manager
        store = ProcessManager().service(DataStore)

        # make sure the input exists and is a Spark RDD
        if self.read_key not in store:
            raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
        rdd = store[self.read_key]
        if not isinstance(rdd, pyspark.RDD):
            raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'.format(self.read_key, str(type(rdd))))

        # optional pre-processing of the records before grouping
        if self.input_map:
            rdd = rdd.map(self.input_map)

        # collect the values belonging to each key
        grouped = rdd.groupByKey(numPartitions=self.num_group_partitions)

        # flatMapValues spreads the mapped group over several records,
        # mapValues keeps a single record per key
        apply_to_groups = grouped.flatMapValues if self.flatten_output_groups else grouped.mapValues
        result = apply_to_groups(self.group_map)

        # optional post-processing of the resulting records
        if self.result_map:
            result = result.map(self.result_map)

        # publish the result in the data store
        store[self.store_key] = result

        return StatusCode.Success
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号