def main():
    """Run the streaming job: Kafka topic -> JSON -> remap -> Elasticsearch.

    Builds a local Spark context named "YelpConsumer", opens a direct Kafka
    stream on the 'yelp-stream' topic via the broker at localhost:9092,
    decodes each message value as JSON, passes every record through
    ``remap_elastic`` and hands each resulting RDD to ``writeElasticSearch``.

    The call blocks in ``awaitTermination()`` until the streaming context
    is stopped.
    """
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    # Second argument is the batch duration passed to StreamingContext.
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(
        ssc,
        topics=['yelp-stream'],
        kafkaParams={"metadata.broker.list": 'localhost:9092'},
    )

    # Each stream element is a (key, value) pair; only the value is parsed.
    # Indexing is used instead of ``lambda (k, v): ...`` because tuple
    # parameter unpacking was removed in Python 3 (PEP 3113) and is a
    # SyntaxError there; this form runs on both Python 2 and 3.
    parsed_json = kstream.map(lambda kv: json.loads(kv[1]))
    remapped_data = parsed_json.map(remap_elastic)
    remapped_data.foreachRDD(writeElasticSearch)

    ssc.start()
    ssc.awaitTermination()
Consumer.py 文件源码
python
阅读 36
收藏 0
点赞 0
评论 0
评论列表
文章目录