KafkaTestBase.java source code

java

Project: incubator-gobblin
// Test helper that reads one topic through the old ZooKeeper-based high-level consumer.
KafkaConsumerSuite(String zkConnectString, String topic)
{
  _topic = topic;
  Properties consumeProps = new Properties();
  consumeProps.put("zookeeper.connect", zkConnectString);
  // A fresh group id per instance, so no committed offsets carry over from earlier runs.
  consumeProps.put("group.id", _topic+"-"+System.nanoTime());
  consumeProps.put("zookeeper.session.timeout.ms", "10000");
  consumeProps.put("zookeeper.sync.time.ms", "10000");
  consumeProps.put("auto.commit.interval.ms", "10000");
  // Bound how long the iterator blocks waiting for a message; by default it blocks indefinitely.
  consumeProps.put("consumer.timeout.ms", "10000");

  _consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumeProps));

  // Request a single stream (one consumer thread) for the topic.
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
      _consumer.createMessageStreams(ImmutableMap.of(this._topic, 1));
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(this._topic);
  _stream = streams.get(0);
  _iterator = _stream.iterator();
}
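
The property setup in the constructor can be pulled out and checked on its own, without a running broker or ZooKeeper. The sketch below is only an illustration: the class name ConsumerPropsSketch and the method buildConsumerProps are hypothetical and not part of incubator-gobblin, and it assumes the intended timeout key is "consumer.timeout.ms", the name used by the old high-level consumer configuration.

```java
import java.util.Properties;

// Hypothetical helper, not part of incubator-gobblin: it only assembles the
// settings that the constructor above passes to ConsumerConfig.
final class ConsumerPropsSketch {

    // Builds properties for the old ZooKeeper-based high-level consumer.
    static Properties buildConsumerProps(String zkConnectString, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnectString);
        // The nanoTime suffix gives each call its own consumer group.
        props.put("group.id", topic + "-" + System.nanoTime());
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "10000");
        props.put("auto.commit.interval.ms", "10000");
        // Assumption: the intended key is "consumer.timeout.ms".
        props.put("consumer.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:2181" and "events" are placeholder values.
        Properties props = buildConsumerProps("localhost:2181", "events");
        System.out.println(props.getProperty("group.id").startsWith("events-"));
    }
}
```

Keeping the settings in one method makes a misspelled key easier to catch in a plain unit test, since the old consumer does not fail on property names it does not recognize.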