将RDD划分为长度为n的元组

发布于 2021-01-29 18:19:32

我对Apache Spark和Python比较陌生,想知道像我将要描述的东西是否可行?

我有一个格式为[m 1,m 2,m 3,m 4,m 5,m 6, … m n
]的RDD(运行rdd.collect()时会得到这个)。我想知道是否有可能将此RDD转换为[[m 1,m 2,m 3),(m 4,m 5,m
6).....(m n-2, m n-1,m n)]。内部元组的大小应为k。如果n不能被k整除,则元组之一应具有少于k个元素。

我尝试使用map函数,但无法获得所需的输出。似乎map函数只能返回具有与最初提供的RDD相同数量的元素的RDD。

更新:我尝试使用分区,也能够使其正常工作。

rdd.map(lambda l: (l, l)).partitionBy(int(n/k)).glom().map(lambda ll: [x[0] for x in ll])
关注者
0
被浏览
45
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    Olologin的答案几乎是正确的,但我相信您想要做的是将RDD分为3个元组,而不是将RDD分为3个元组。为此,请尝试以下操作:

    rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
    transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3)
                     .map(lambda (_, list): tuple([elem[0] for elem in list]))
    

    在pyspark中运行时,我得到以下信息:

    >>> from __future__ import print_function    
    >>> rdd = sc.parallelize(["e1", "e2", "e3", "e4", "e5", "e6", "e7", "e8", "e9", "e10"])
    >>> transformed = rdd.zipWithIndex().groupBy(lambda (_, i): i / 3).map(lambda (_, list): tuple([elem[0] for elem in list]))
    >>> transformed.foreach(print)
    ...
    ('e4', 'e5', 'e6')
    ('e10',)
    ('e7', 'e8', 'e9')
    ('e1', 'e2', 'e3')
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看