@Test
public void testLogging() throws InterruptedException {
final Logger logger = loggerContext.getLogger("ROOT");
unit.start();
assertTrue("appender is started", unit.isStarted());
for (int i = 0; i<1000; ++i) {
final LoggingEvent loggingEvent = new LoggingEvent("a.b.c.d", logger, Level.INFO, "message"+i, null, new Object[0]);
unit.append(loggingEvent);
}
final Properties consumerProperties = new Properties();
consumerProperties.put("metadata.broker.list", kafka.getBrokerList());
consumerProperties.put("group.id", "simple-consumer-" + new Random().nextInt());
consumerProperties.put("auto.commit.enable","false");
consumerProperties.put("auto.offset.reset","smallest");
consumerProperties.put("zookeeper.connect", kafka.getZookeeperConnection());
final kafka.consumer.ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(consumerProperties);
final ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
final KafkaStream<byte[], byte[]> log = javaConsumerConnector.createMessageStreamsByFilter(new Whitelist("logs"),1).get(0);
final ConsumerIterator<byte[], byte[]> iterator = log.iterator();
for (int i=0; i<1000; ++i) {
final String messageFromKafka = new String(iterator.next().message(), UTF8);
assertThat(messageFromKafka, Matchers.equalTo("message"+i));
}
}
KafkaAppenderIT.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:wngn-jms-kafka
作者:
评论列表
文章目录