/**
 * Logs a single INFO message through {@code logger} and checks that the
 * corresponding Avro record can be read back from the Kafka topic.
 *
 * <p>The record is expected to carry the logged text in {@code line}, the
 * INFO log type id in {@code logtypeid}, a non-null {@code source}, one
 * entry in {@code timings} and two entries in {@code tag}.
 *
 * <p>NOTE(review): presumably {@code logger} is wired to
 * {@code KafkaLogAppender} by the test configuration — confirm against the
 * fixture setup, which is not visible from this method.
 */
@Test
public void testKafkaLogAppender() {
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    // Read the topic from its earliest available offset.
    consumerProps.put("auto.offset.reset", "smallest");
    // Consumed by the Avro decoder, which receives these same properties.
    consumerProps.put("schema.registry.url", schemaRegistry);

    // Request a single stream for the topic under test.
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);

    // Keep a handle on the connector so it can be shut down afterwards;
    // the fully-qualified name avoids depending on an import that may not exist.
    kafka.javaapi.consumer.ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
    try {
        ConsumerIterator<String, Object> iterator = connector
                .createMessageStreams(
                        topicMap,
                        new StringDecoder(null),
                        new KafkaAvroDecoder(new VerifiableProperties(consumerProps)))
                .get(topic).get(0).iterator();

        String testMessage = "I am a test message";
        logger.info(testMessage);

        // NOTE(review): no consumer.timeout.ms is set here, so next() is
        // expected to wait until a record arrives — consider a timeout so a
        // broken appender fails the test instead of hanging it.
        MessageAndMetadata<String, Object> messageAndMetadata = iterator.next();
        GenericRecord logLine = (GenericRecord) messageAndMetadata.message();

        assertEquals(logLine.get("line").toString(), testMessage);
        assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
        assertNotNull(logLine.get("source"));
        assertEquals(((Map<CharSequence, Object>) logLine.get("timings")).size(), 1);
        assertEquals(((Map<CharSequence, Object>) logLine.get("tag")).size(), 2);
    } finally {
        // Release the consumer's connections and threads even when an
        // assertion above fails.
        connector.shutdown();
    }
}
KafkaLogAppenderTest.java 文件源码
java
阅读 17
收藏 0
点赞 0
评论 0
项目:java-kafka
作者:
评论列表
文章目录