如何在分区之间平衡我的数据?

发布于 2021-01-29 15:22:17

编辑 :答案有帮助,但是我在以下文章中描述了我的解决方案:Spark中的memoryOverhead问题


我有一个带有202092分区的RDD,该分区读取其他人创建的数据集。我可以手动看到分区之间的数据不平衡,例如,其中一些分区具有0张图像,另一些分区具有4k,均值位于432。处理数据时,出现此错误:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

而memoryOverhead已经提高。我感觉到一些峰值正在发生,这些峰值使Yarn杀死了我的容器,因为该峰值溢出了指定的边界。

那么,我该怎么做才能确保我的数据在 各个分区 之间达到 (大致) 平衡?


我的想法是repartition()可以工作,它会调用改组:

dataset = dataset.repartition(202092)

尽管编程指南有说明,但我还是遇到了同样的错误:

重新分区(numPartitions)

在RDD中随机重排数据以创建更多或更少的分区, 并在整个 分区之间 保持平衡 。这始终会拖曳网络上的所有数据。


不过请查看我的玩具示例:

data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0])     # 1000
len(d[1])     # 2000
len(d[2])     # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 1854
len(re_d[1])  # 1754
len(re_d[2])  # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 422
len(re_d[1])  # 845
len(re_d[2])  # 1643
len(re_d[3])  # 1332
len(re_d[4])  # 1547
len(re_d[5])  # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 132
len(re_d[1])  # 265
len(re_d[2])  # 530
len(re_d[3])  # 1060
len(re_d[4])  # 1025
len(re_d[5])  # 145
len(re_d[6])  # 290
len(re_d[7])  # 580
len(re_d[8])  # 1113
len(re_d[9])  # 272
len(re_d[10]) # 522
len(re_d[11]) # 66
关注者
0
被浏览
52
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    我认为内存开销限制超出了问题,这是由于在提取过程中使用了DirectMemory缓冲区。我认为它已在2.0.0中修复。(我们遇到了同样的问题,但是当发现升级到2.0.0可以解决问题时,我们就不再进行深入研究了。不幸的是,我没有Spark问题编号来支持我。)


    之后的不均匀分隔repartition令人惊讶。与https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443对比。Spark甚至会在中生成随机密钥repartition,因此不会使用可能有偏差的哈希来完成。

    我尝试了您的示例,并使用Spark 1.6.2和Spark 2.0.0获得了 完全相同的 结果。但不是来自Scala spark-shell

    scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
    data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24
    
    scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
    res1: Seq[Int] = WrappedArray(1000, 2000, 3000)
    
    scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
    res2: Seq[Int] = WrappedArray(1999, 2001, 2000)
    
    scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
    res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)
    
    scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
    res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)
    

    如此美丽的分区!


    (对不起,这不是一个完整的答案。到目前为止,我只想分享我的发现。)



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看