/**
 * Verifies that a {@link KafkaSink} opened while the Kafka brokers are down
 * delivers its messages once the brokers come back.
 *
 * <p>Steps:
 * <ol>
 *   <li>Create a single-partition topic with replication factor 2 and remember
 *       the two broker ports so the brokers can be restarted on them.</li>
 *   <li>Shut the brokers down, then build and open the sink.</li>
 *   <li>Send {@code msgCount} messages during the outage, restart the brokers and
 *       wait (up to 10 seconds) until the sink reports that every record it did
 *       not drop has been sent.</li>
 *   <li>Send a second batch, close the sink and check that the consumer sees
 *       {@code 2 * msgCount} minus the dropped records.</li>
 * </ol>
 *
 * @throws Throwable if any fixture call or assertion fails
 */
@Test
public void testStartWithKafkaOutage() throws Throwable {
    String topicName = TOPIC_NAME + "kafkaoutage";
    TopicCommand.createTopic(zk.getZkClient(),
            new TopicCommand.TopicCommandOptions(new String[]{
                    "--zookeeper", "dummy", "--create", "--topic", topicName,
                    "--replication-factor", "2", "--partitions", "1"}));

    // Capture the broker addresses before the shutdown; entries are split on ':'
    // and the second field is used as the port for the restart below.
    String brokerListStr = kafkaServer.getBrokerListStr();
    String[] brokerList = brokerListStr.split(",");
    int port1 = Integer.parseInt(brokerList[0].split(":")[1]);
    int port2 = Integer.parseInt(brokerList[1].split(":")[1]);

    // Sink configuration as a single, balanced JSON object. (An earlier form carried a
    // stray trailing '}' that only parsed because trailing tokens were ignored.)
    String description = "{\n" +
            " \"type\": \"kafka\",\n" +
            " \"client.id\": \"kafkasink\",\n" +
            " \"bootstrap.servers\": \"" + brokerListStr + "\",\n" +
            " \"acks\": 1\n" +
            "}";

    // Simulate the outage before the sink is created and opened.
    kafkaServer.shutdown();

    final KafkaSink sink = jsonMapper.readValue(description, new TypeReference<Sink>(){});
    sink.open();

    final int msgCount = 10000;
    final CountDownLatch latch = new CountDownLatch(1);
    sink.setRecordCounterListener(new Action3<Long, Long, Long>() {
        @Override
        public void call(Long queued, Long sent, Long dropped) {
            // Release the latch once every record that was not dropped has been sent.
            if (sent == msgCount - sink.droppedRecords.get()) {
                latch.countDown();
            }
        }
    });

    // First batch is submitted while the brokers are still down.
    sendMessages(topicName, sink, msgCount);

    // Bring the brokers back on the same ports they used before the shutdown.
    kafkaServer.startServer(port1, port2);

    // From here on the brokers are up again, so the sink is always closed,
    // even if the latch assertion or the second batch fails.
    try {
        assertTrue(latch.await(10, TimeUnit.SECONDS));
        sendMessages(topicName, sink, msgCount);
    } finally {
        sink.close();
    }

    // Both batches minus whatever the sink reported as dropped should be consumable.
    checkConsumer(topicName, 2 * msgCount - (int) sink.droppedRecords.get());
}
TestKafkaSink.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:suro
作者:
评论列表
文章目录