def main():
    """Run a demo Spark job in the background and print its progress.

    Creates a SparkContext named "PythonStatusAPIDemo" with the
    ``spark.ui.showConsoleProgress`` setting turned off, starts a small
    map / reduceByKey / map job via ``call_in_background``, and polls the
    context's status tracker once per second, printing per-job and
    per-stage task counts until a result is available. The result is then
    printed and the context is stopped.

    The SparkContext is stopped even if polling or result retrieval raises.
    """
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        """Build and collect the demo job; executed off the main thread."""
        # NOTE(review): `delayed` is defined elsewhere in this file;
        # presumably it returns a mapper that pauses for the given number
        # of seconds so the job runs long enough to observe -- confirm.
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    try:
        # NOTE(review): `call_in_background` is defined elsewhere; it is used
        # here as returning a queue-like object exposing empty() and get().
        result = call_in_background(run)
        status = sc.statusTracker()
        while result.empty():
            for job_id in status.getJobIdsForGroup():
                job = status.getJobInfo(job_id)
                if job is None:
                    # The tracker may no longer hold info for this job id;
                    # skip it rather than fail on attribute access.
                    continue
                print("Job", job_id, "status: ", job.status)
                for sid in job.stageIds:
                    info = status.getStageInfo(sid)
                    if info:
                        print("Stage %d: %d tasks total (%d active, %d complete)" %
                              (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
            # Poll once per second instead of spinning on the tracker.
            time.sleep(1)
        print("Job results are:", result.get())
    finally:
        # Always release the context, including on error or interruption.
        sc.stop()
评论列表
文章目录