def main():
    """Run the real-time analytics Spark Streaming job.

    Command-line arguments (exactly three are required):
        argv[1]: ZooKeeper quorum passed to ``KafkaUtils.createStream``.
        argv[2]: Kafka topic to consume from.
        argv[3]: Kafka topic handed to ``publish`` for each micro-batch.

    On a wrong argument count, prints ``USAGE`` to stderr and exits with
    status -1 (255 on POSIX). Otherwise blocks until the streaming context
    terminates.
    """
    if len(sys.argv) != 4:
        # Usage errors belong on stderr so they don't mix with normal output.
        print(USAGE, file=sys.stderr)
        # sys.exit rather than the builtin exit(): the latter is injected by
        # the `site` module and is unavailable under `python -S` or in some
        # embedded/frozen interpreters.
        sys.exit(-1)
    zookeeper, in_topic, out_topic = sys.argv[1:]

    sc = SparkContext(appName="Realtime-Analytics-Engine")
    # TIMER is the StreamingContext batch duration (seconds per PySpark API);
    # its value is defined elsewhere in this module.
    ssc = StreamingContext(sc, TIMER)

    # Receiver-based Kafka stream: consumer group "analytics-engine-consumer",
    # topic map {topic: 1} = consume in_topic with one receiver thread.
    kvs = KafkaUtils.createStream(
        ssc, zookeeper, "analytics-engine-consumer", {in_topic: 1}
    )
    # aggregate/publish are defined elsewhere; foreachRDD hands each batch's
    # RDD to publish together with the output topic.
    aggRDD = aggregate(kvs)
    aggRDD.foreachRDD(lambda rdd: publish(rdd, out_topic))

    ssc.start()
    ssc.awaitTermination()
评论列表
文章目录