KafkaConsumerSuite(String zkConnectString, String topic)
{
_topic = topic;
Properties consumeProps = new Properties();
consumeProps.put("zookeeper.connect", zkConnectString);
consumeProps.put("group.id", _topic+"-"+System.nanoTime());
consumeProps.put("zookeeper.session.timeout.ms", "10000");
consumeProps.put("zookeeper.sync.time.ms", "10000");
consumeProps.put("auto.commit.interval.ms", "10000");
consumeProps.put("_consumer.timeout.ms", "10000");
_consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
_consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
_stream = streams.get(0);
_iterator = _stream.iterator();
}
KafkaTestBase.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:incubator-gobblin
作者:
评论列表
文章目录