/**
 * Checks what the Kafka sink reports at the moment it first drops a record.
 *
 * <p>The test creates a dedicated topic (one partition, replication factor 2),
 * opens a sink configured with {@code buffer.memory} and {@code batch.size}
 * of 1000, and writes 100 messages built from {@code getBigData()}. It then
 * expects the record-counter listener to report a positive dropped count
 * within 10 seconds, and at that moment both {@code checkPause()} and
 * {@code getNumOfPendingMessages()} must be positive.
 *
 * <p>NOTE(review): presumably the tiny buffer settings are what force the
 * drops; confirm against the KafkaSink implementation.
 *
 * @throws IOException if the sink description cannot be deserialized
 * @throws InterruptedException if interrupted while waiting for the first drop
 */
@Test
public void testCheckPause() throws IOException, InterruptedException {
    final String topic = TOPIC_NAME + "check_pause";

    TopicCommand.createTopic(
            zk.getZkClient(),
            new TopicCommand.TopicCommandOptions(new String[]{
                    "--zookeeper", "dummy",
                    "--create",
                    "--topic", topic,
                    "--replication-factor", "2",
                    "--partitions", "1"}));

    // JSON description of the sink under test, assembled line by line.
    final String sinkConfig = new StringBuilder()
            .append("{\n")
            .append(" \"type\": \"kafka\",\n")
            .append(" \"client.id\": \"kafkasink\",\n")
            .append(" \"bootstrap.servers\": \"").append(kafkaServer.getBrokerListStr()).append("\",\n")
            .append(" \"acks\": 1,\n")
            .append(" \"buffer.memory\": 1000,\n")
            .append(" \"batch.size\": 1000\n")
            .append("}")
            .toString();

    final KafkaSink sink = jsonMapper.readValue(sinkConfig, new TypeReference<Sink>(){});
    sink.open();

    // Flags filled in by the listener once a drop is reported.
    final AtomicBoolean dropObserved = new AtomicBoolean(false);
    final AtomicBoolean pauseRequested = new AtomicBoolean(false);
    final AtomicBoolean backlogSeen = new AtomicBoolean(false);
    final CountDownLatch firstDrop = new CountDownLatch(1);

    sink.setRecordCounterListener(new Action3<Long, Long, Long>() {
        @Override
        public void call(Long queued, Long sent, Long dropped) {
            // Only callbacks that report dropped records are of interest.
            if (dropped <= 0) {
                return;
            }
            dropObserved.set(true);
            if (sink.checkPause() > 0) {
                pauseRequested.set(true);
            }
            if (sink.getNumOfPendingMessages() > 0) {
                backlogSeen.set(true);
            }
            // Release the test thread only after all flags are recorded.
            firstDrop.countDown();
        }
    });

    for (int remaining = 100; remaining > 0; remaining--) {
        final Message payload = new Message(topic, getBigData());
        sink.writeTo(new DefaultMessageContainer(payload, jsonMapper));
    }

    assertTrue(firstDrop.await(10, TimeUnit.SECONDS));
    assertTrue(dropObserved.get());
    assertTrue(pauseRequested.get());
    assertTrue(backlogSeen.get());
}
TestKafkaSink.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:suro
作者:
评论列表
文章目录