TestKafkaSink.java 文件源码

java
阅读 18 收藏 0 点赞 0 评论 0

项目:suro 作者:
/**
 * Verifies what the Kafka sink reports at the moment it drops a record.
 *
 * <p>The test creates a dedicated topic, builds a sink from a JSON description whose
 * {@code buffer.memory} and {@code batch.size} are both 1000, and writes 100 messages
 * produced by {@code getBigData()}. A record-counter listener watches for a callback with
 * a positive dropped count; when one arrives it records whether {@code checkPause()} and
 * {@code getNumOfPendingMessages()} are positive and releases the latch.
 *
 * <p>NOTE(review): the small buffer settings are presumably what forces the drop —
 * confirm against the sink implementation.
 *
 * @throws IOException if the sink description cannot be deserialized
 * @throws InterruptedException if the thread is interrupted while waiting on the latch
 */
@Test
public void testCheckPause() throws IOException, InterruptedException {
    // One topic name shared by topic creation and by every message written below.
    final String topic = TOPIC_NAME + "check_pause";

    final String[] createTopicArgs = {
            "--zookeeper", "dummy", "--create", "--topic", topic,
            "--replication-factor", "2", "--partitions", "1"};
    TopicCommand.createTopic(zk.getZkClient(), new TopicCommand.TopicCommandOptions(createTopicArgs));

    final String brokers = kafkaServer.getBrokerListStr();
    final String sinkConfig = "{\n" +
            "    \"type\": \"kafka\",\n" +
            "    \"client.id\": \"kafkasink\",\n" +
            "    \"bootstrap.servers\": \"" + brokers + "\",\n" +
            "    \"acks\": 1,\n" +
            "    \"buffer.memory\": 1000,\n" +
            "    \"batch.size\": 1000\n" +
            "}";

    final KafkaSink kafkaSink = jsonMapper.readValue(sinkConfig, new TypeReference<Sink>(){});
    kafkaSink.open();

    // Flags are written from the listener callback and read by the assertions below.
    final CountDownLatch dropSeen = new CountDownLatch(1);
    final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
    final AtomicBoolean checkPaused = new AtomicBoolean(false);
    final AtomicBoolean pending = new AtomicBoolean(false);

    final Action3<Long, Long, Long> dropWatcher = new Action3<Long, Long, Long>() {
        @Override
        public void call(Long queued, Long sent, Long dropped) {
            // Only callbacks that report at least one dropped record are of interest.
            if (dropped <= 0) {
                return;
            }
            exceptionCaught.set(true);
            if (kafkaSink.checkPause() > 0) {
                checkPaused.set(true);
            }
            if (kafkaSink.getNumOfPendingMessages() > 0) {
                pending.set(true);
            }
            dropSeen.countDown();
        }
    };
    kafkaSink.setRecordCounterListener(dropWatcher);

    int remaining = 100;
    while (remaining-- > 0) {
        kafkaSink.writeTo(new DefaultMessageContainer(new Message(topic, getBigData()), jsonMapper));
    }

    // Wait up to ten seconds for the listener to observe a drop before checking the flags.
    final boolean dropObserved = dropSeen.await(10, TimeUnit.SECONDS);
    assertTrue(dropObserved);
    assertTrue(exceptionCaught.get());
    assertTrue(checkPaused.get());
    assertTrue(pending.get());
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号