TestKafkaSink.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:suro 作者:
/**
 * Verifies that a {@code KafkaSink} opened while the Kafka brokers are down
 * still delivers its records after the brokers are started again.
 *
 * <p>Sequence exercised by this test:
 * <ol>
 *   <li>create a one-partition topic with replication factor 2;</li>
 *   <li>remember the ports of the two brokers, then shut the brokers down;</li>
 *   <li>build and open the sink from a JSON description while the brokers are down;</li>
 *   <li>send {@code msgCount} messages, then restart the brokers on the same ports;</li>
 *   <li>wait up to 10 seconds until the record-counter listener reports that
 *       every record which was not dropped has been sent;</li>
 *   <li>send a second batch, close the sink, and check that the consumer sees
 *       both batches minus whatever the sink reports as dropped.</li>
 * </ol>
 *
 * @throws Throwable if any step of the scenario fails
 */
@Test
public void testStartWithKafkaOutage() throws Throwable {
    String topicName = TOPIC_NAME + "kafkaoutage";

    // NOTE(review): the "--zookeeper" value is a placeholder; presumably it is
    // unused because the ZkClient is passed in explicitly — confirm.
    TopicCommand.createTopic(zk.getZkClient(),
        new TopicCommand.TopicCommandOptions(new String[]{
            "--zookeeper", "dummy", "--create", "--topic", topicName,
            "--replication-factor", "2", "--partitions", "1"}));

    // Capture the broker ports before the shutdown so the brokers can be
    // restarted on the same addresses the sink was configured with.
    String[] brokerList = kafkaServer.getBrokerListStr().split(",");
    int port1 = Integer.parseInt(brokerList[0].split(":")[1]);
    int port2 = Integer.parseInt(brokerList[1].split(":")[1]);

    // Single flat JSON object. The previous text ended with an extra '}' which
    // made the document malformed and only parsed when the mapper ignored
    // trailing tokens.
    String description = "{\n" +
        "    \"type\": \"kafka\",\n" +
        "    \"client.id\": \"kafkasink\",\n" +
        "    \"bootstrap.servers\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
        "    \"acks\": 1\n" +
        "}";

    // Simulate the outage: the sink is created and opened with no broker running.
    kafkaServer.shutdown();


    final KafkaSink sink = jsonMapper.readValue(description, new TypeReference<Sink>(){});
    sink.open();

    final int msgCount = 10000;
    final CountDownLatch latch = new CountDownLatch(1);
    sink.setRecordCounterListener(new Action3<Long, Long, Long>() {

        @Override
        public void call(Long queued, Long sent, Long dropped) {
            // Released once every record of the first batch that was not
            // dropped has been reported as sent.
            if (sent == msgCount - sink.droppedRecords.get()) {
                latch.countDown();
            }
        }
    });

    // First batch is submitted while the brokers are still down.
    sendMessages(topicName, sink, msgCount);

    kafkaServer.startServer(port1, port2); // running up
    assertTrue(latch.await(10, TimeUnit.SECONDS));

    // Second batch is submitted with the brokers available again.
    sendMessages(topicName, sink, msgCount);
    sink.close();

    // Both batches are expected on the topic, minus the records the sink dropped.
    checkConsumer(topicName, 2 * msgCount - (int) sink.droppedRecords.get());
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号