/**
 * Writes a burst of messages through a {@code KafkaSinkV2} built from a JSON
 * description and checks that every message reaches the topic exactly once.
 *
 * <p>Steps:
 * <ol>
 *   <li>Create the {@code TOPIC_NAME_MULTITHREAD} topic (replication factor 2, 1 partition).</li>
 *   <li>Deserialize the sink from JSON ({@code batchSize} 10, {@code jobQueueSize} 3) and open it.</li>
 *   <li>Write {@code msgCount} messages; the test expects some to still be pending right after
 *       the writes, and none to be pending once the sink has been closed.</li>
 *   <li>Consume exactly {@code msgCount} messages, then expect the next read to raise
 *       {@code ConsumerTimeoutException}, i.e. no extra messages were produced.</li>
 * </ol>
 *
 * <p>The sink and the consumer connector are released in {@code finally} blocks so a failing
 * assertion or an early consumer timeout does not leave them running for later tests.
 *
 * @throws IOException if the sink description or a message payload cannot be (de)serialized
 */
@Test
public void testMultithread() throws IOException {
    TopicCommand.createTopic(zk.getZkClient(),
            new TopicCommand.TopicCommandOptions(new String[]{
                    "--zookeeper", "dummy", "--create", "--topic", TOPIC_NAME_MULTITHREAD,
                    "--replication-factor", "2", "--partitions", "1"}));

    String description = "{\n" +
            " \"type\": \"kafka\",\n" +
            " \"client.id\": \"kafkasink\",\n" +
            " \"metadata.broker.list\": \"" + kafkaServer.getBrokerListStr() + "\",\n" +
            " \"request.required.acks\": 1,\n" +
            " \"batchSize\": 10,\n" +
            " \"jobQueueSize\": 3\n" +
            "}";

    ObjectMapper jsonMapper = new DefaultObjectMapper();
    // Register the "kafka" type name so the "type" field above resolves to KafkaSinkV2.
    jsonMapper.registerSubtypes(new NamedType(KafkaSinkV2.class, "kafka"));
    KafkaSinkV2 sink = jsonMapper.readValue(description, new TypeReference<Sink>(){});

    int msgCount = 10000;
    sink.open();
    try {
        for (int i = 0; i < msgCount; ++i) {
            Map<String, Object> msgMap = new ImmutableMap.Builder<String, Object>()
                    .put("key", Integer.toString(i))
                    .put("value", "message:" + i).build();
            sink.writeTo(new DefaultMessageContainer(
                    new Message(TOPIC_NAME_MULTITHREAD, jsonMapper.writeValueAsBytes(msgMap)),
                    jsonMapper));
        }
        // Checked immediately after the writes: the test expects a backlog at this point.
        assertTrue(sink.getNumOfPendingMessages() > 0);
    } finally {
        // Always close the sink, even when a write or the assertion above fails.
        sink.close();
    }
    System.out.println(sink.getStat());
    // After close() nothing may remain pending (expected value first, per JUnit convention).
    assertEquals(0, sink.getNumOfPendingMessages());

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig("localhost:" + zk.getServerPort(), "gropuid_multhread"));
    try {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC_NAME_MULTITHREAD, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC_NAME_MULTITHREAD).get(0);

        // Drain exactly the number of messages that were written.
        for (int i = 0; i < msgCount; ++i) {
            stream.iterator().next();
        }

        // One more read must time out; otherwise the sink produced extra messages.
        // NOTE(review): relies on createConsumerConfig setting a consumer timeout - confirm.
        try {
            stream.iterator().next();
            fail("expected ConsumerTimeoutException: topic holds more than " + msgCount + " messages");
        } catch (ConsumerTimeoutException e) {
            // this is expected
        }
    } finally {
        // Shut the connector down on every path, not only after the expected timeout.
        consumer.shutdown();
    }
}
TestKafkaSinkV2.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:suro
作者:
评论列表
文章目录