def getOrCreate(cls, checkpointPath, setupFunc):
"""
Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
will be used to create a new context.
@param checkpointPath: Checkpoint directory used in an earlier streaming program
@param setupFunc: Function to create a new context and setup DStreams
"""
cls._ensure_initialized()
gw = SparkContext._gateway
# Check whether valid checkpoint information exists in the given path
ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
if ssc_option.isEmpty():
ssc = setupFunc()
ssc.checkpoint(checkpointPath)
return ssc
jssc = gw.jvm.JavaStreamingContext(ssc_option.get())
# If there is already an active instance of Python SparkContext use it, or create a new one
if not SparkContext._active_spark_context:
jsc = jssc.sparkContext()
conf = SparkConf(_jconf=jsc.getConf())
SparkContext(conf=conf, gateway=gw, jsc=jsc)
sc = SparkContext._active_spark_context
# update ctx in serializer
cls._transformerSerializer.ctx = sc
return StreamingContext(sc, None, jssc)
评论列表
文章目录