def __init__(self, name, broker, source_topic, destination_topic):
sc = SparkContext("local[2]", name)
sc.setLogLevel('ERROR')
self.ssc = StreamingContext(sc, 5)
directKafkaStream = KafkaUtils.createDirectStream(
self.ssc,
[source_topic],
{'metadata.broker.list': broker}
)
producer = Producer(broker, destination_topic)
process_stream(directKafkaStream, producer)
评论列表
文章目录