有没有一种方法可以将结果流传输到驱动程序,而无需等待所有分区完成执行?
有没有一种方法可以将结果流式传输到驱动程序,而无需等待所有分区完成执行?
我是Spark的新手,所以如果有更好的方法,请指出正确的方向。我想并行执行大量分区,并使用spark来处理分发/重新启动等操作。完成操作后,我想将结果收集到驱动程序中的单个存档中。
采用 toLocalIterator()
我已经能够做到这一点toLocalIterator()
,根据文档,它限制了驱动程序所需的资源。因此,它基本上可以工作。
问题在于,toLocalIterator()
不仅将驱动程序一次限制为一个分区,而且似乎一次只能执行一个分区。这对我没有用。下面的演示代码中演示了该行为。
使用persist()
+ count()
+toLocalIterator()
我发现我可以通过持久化然后用触发并行执行来解决这个问题count()
。之后,toLocalIterator()
可以快速提取预先计算的结果。
问题是我有很多分区(大约10 ^ 3或10 ^
4),每个分区大约需要15分钟。这样最终会保留大量数据(不是很多),但更糟糕的是,一旦整个工作持续太长时间,它似乎就失去了持久性。分区最终需要重新计算。我正在与可抢占的工作人员一起使用google
dataproc,所以这可能与它有关系,但是我很确定它最终甚至在固定工作人员上也要重新计算…我不确定到底发生了什么。
无论如何,在访问第一个结果之前必须执行所有分区似乎并不理想。
下面的演示代码演示了一切正常的情况,并且迭代不会触发重新计算时的最佳情况。
??? ->迭代数据而无需等待完整执行
有没有类似的东西?
复制/粘贴演示代码
import time
import pyspark.storagelevel
def slow_square(n):
time.sleep(5)
return n**2
with pyspark.SparkContext() as spark_context:
numbers = spark_context.parallelize(range(4), 4) # I think 4 is default executors locally
squares = numbers.map(slow_square)
# Use toLocalIterator()
start = time.time()
list(squares.toLocalIterator())
print('toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 20s
# Use count() to show that it's faster in parallel
start = time.time()
squares.count()
print('count() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
# Use persist() + count() + toLocalIterator()
start = time.time()
squares.persist(pyspark.storagelevel.StorageLevel.MEMORY_AND_DISK)
squares.count()
list(squares.toLocalIterator())
print('persisted toLocalIterator() took {:.1f} seconds (expected about 5)'.format(time.time() - start))
# I get about 5s
-
一般来说,这不是您通常在Spark中执行的操作。通常,我们尝试将通过驱动程序传递的数据量限制为最小。有两个主要原因:
- 将数据传递给Spark驱动程序很容易成为应用程序的瓶颈。
- 驱动程序实际上是批处理应用程序中的单点故障。
在正常情况下,您只需要继续工作,写入持久性存储,最后对结果应用进一步的处理步骤。
如果您希望能够迭代访问结果,则可以选择以下几种方法:
- 使用Spark Streaming。创建一个简单的过程,将数据推送到群集,然后收集每个批次。它简单,可靠,经过测试,并且不需要任何其他基础结构。
- 使用
foreach
/处理数据foreachPartition
,并在产生数据时将数据推送到外部消息传递系统,并使用另一个过程进行消耗和写入。这需要额外的组件,但从概念上讲可能更容易(您可以使用背压,缓冲结果,从驱动程序分离合并逻辑以最大程度地减少应用程序失败的风险)。 - Hack Spark累加器。任务完成后,火花累积器会更新,因此您可以分批处理累积的即将到来的数据。
警告 : 以下代码仅是概念验证。 它没有经过适当的测试,很可能是非常不可靠的。
AccumulatorParam
使用RXPy的示例# results_param.py from rx.subjects import Subject from pyspark import AccumulatorParam, TaskContext class ResultsParam(AccumulatorParam, Subject): """An observable accumulator which collects task results""" def zero(self, v): return [] def addInPlace(self, acc1, acc2): # This is executed on the workers so we have to # merge the results if (TaskContext.get() is not None and TaskContext().get().partitionId() is not None): acc1.extend(acc2) return acc1 else: # This is executed on the driver so we discard the results # and publish to self instead for x in acc2: self.on_next(x) return []
简单的Spark应用程序(Python 3.x):
# main.py import time from pyspark import SparkContext, TaskContext sc = SparkContext(master="local[4]") sc.addPyFile("results_param.py") from results_param import ResultsParam # Define accumulator acc = sc.accumulator([], ResultsParam()) # Dummy subscriber acc.accum_param.subscribe(print) def process(x): """Identity proccess""" result = x acc.add([result]) # Add some delay time.sleep(5) return result sc.parallelize(range(32), 8).foreach(process)
这相对简单,但是如果多个任务同时完成,则有使驱动程序不堪重负的风险,因此您必须大幅超额分配驱动程序资源(成比例地达到并行度和任务结果的预期大小)。
runJob
直接使用Scala (不支持Python)。
实际上,Spark实际上是异步获取结果的,只要您不关心顺序,就不需要等待所有数据被处理。例如,
reduce
您可以查看实现Scala。应该可以使用这种机制将分区推到Python进程中,但是我还没有尝试过。