有没有一种方法可以将结果流传输到驱动程序,而无需等待所有分区完成执行?

发布于 2021-01-29 16:38:41

有没有一种方法可以将结果流式传输到驱动程序,而无需等待所有分区完成执行?

我是Spark的新手,所以如果有更好的方法,请指出正确的方向。我想并行执行大量分区,并使用spark来处理分发/重新启动等操作。完成操作后,我想将结果收集到驱动程序中的单个存档中。

采用 toLocalIterator()

我已经能够做到这一点toLocalIterator()根据文档,它限制了驱动程序所需的资源。因此,它基本上可以工作。

问题在于,toLocalIterator()不仅将驱动程序一次限制为一个分区,而且似乎一次只能执行一个分区。这对我没有用。下面的演示代码中演示了该行为。

使用persist()+ count()+toLocalIterator()

我发现我可以通过持久化然后用触发并行执行来解决这个问题count()。之后,toLocalIterator()可以快速提取预先计算的结果。

问题是我有很多分区(大约10 ^ 3或10 ^
4),每个分区大约需要15分钟。这样最终会保留大量数据(不是很多),但更糟糕的是,一旦整个工作持续太长时间,它似乎就失去了持久性。分区最终需要重新计算。我正在与可抢占的工作人员一起使用google
dataproc,所以这可能与它有关系,但是我很确定它最终甚至在固定工作人员上也要重新计算…我不确定到底发生了什么。

无论如何,在访问第一个结果之前必须执行所有分区似乎并不理想。

下面的演示代码演示了一切正常的情况,并且迭代不会触发重新计算时的最佳情况。

??? ->迭代数据而无需等待完整执行

有没有类似的东西?

复制/粘贴演示代码

import time
import pyspark.storagelevel

def slow_square(n):
    time.sleep(5)
    return n**2


with pyspark.SparkContext() as spark_context:
    numbers = spark_context.parallelize(range(4), 4)  # I think 4 is default executors locally
    squares = numbers.map(slow_square)

    # Use toLocalIterator()
    start = time.time()
    list(squares.toLocalIterator())
    print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 20s

    # Use count() to show that it's faster in parallel
    start = time.time()
    squares.count()
    print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s

    # Use persist() + count() + toLocalIterator()
    start = time.time()
    squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
    squares.count()
    list(squares.toLocalIterator())
    print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
    # I get about 5s
关注者
0
被浏览
48
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最小。有两个主要原因:

    • 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
    • 驱动程序实际上是批处理应用程序中的单点故障。

    在正常情况下,您只需要继续工作,写入持久性存储,最后对结果应用进一步的处理步骤。

    如果您希望能够迭代访问结果,则可以选择以下几种方法:

    • 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
    • 使用foreach/处理数据foreachPartition,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要额外的组件,但从概念上讲可能更容易(您可以使用背压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。
    • Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。

    警告以下代码仅是概念验证。 它没有经过适当的测试,很可能是非常不可靠的

    AccumulatorParam使用RXPy的示例

        # results_param.py
    
    from rx.subjects import Subject
    from pyspark import AccumulatorParam, TaskContext
    
    class ResultsParam(AccumulatorParam, Subject):
        """An observable accumulator which collects task results"""
        def zero(self, v):
            return []
    
        def addInPlace(self, acc1, acc2):
            # This is executed on the workers so we have to
            # merge the results
            if (TaskContext.get() is not None and 
                    TaskContext().get().partitionId() is not None):
                acc1.extend(acc2)
                return acc1
            else:
                # This is executed on the driver so we discard the results
                # and publish to self instead
                for x in acc2:
                    self.on_next(x)
                return []
    

    简单的Spark应用程序(Python 3.x):

        # main.py
    
    import time
    from pyspark import SparkContext, TaskContext
    
    sc = SparkContext(master="local[4]")
    sc.addPyFile("results_param.py")
    
    from results_param import ResultsParam
    
    # Define accumulator
    acc = sc.accumulator([], ResultsParam())
    
    # Dummy subscriber 
    acc.accum_param.subscribe(print)
    
    def process(x):
        """Identity proccess"""
        result = x
        acc.add([result])
    
        # Add some delay
        time.sleep(5)
    
        return result
    
    sc.parallelize(range(32), 8).foreach(process)
    

    这相对简单,但是如果多个任务同时完成,则有使驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。

    • runJob直接使用Scala (不支持Python)。

    实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如reduce您可以查看实现Scala

    应该可以使用这种机制将分区推到Python进程中,但是我还没有尝试过。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看