/**
 * Reads the messages available on a Kafka topic and returns them as the response entity.
 *
 * <p>Each call creates its own consumer connector from a copy of {@code consumerCfg}, opens a
 * single stream for the topic and drains it until the consumer reports a timeout. Offsets are
 * committed only after the stream was drained without an unexpected error, and the connector
 * is always shut down before the method returns or propagates an exception.
 *
 * @param topic   name of the topic to consume; a null or empty value produces a 400 response
 * @param timeout optional value for the consumer's {@code consumer.timeout.ms} property; when
 *                null, whatever {@code consumerCfg} contains is used unchanged
 * @return a 200 response carrying the list of messages read (possibly empty), or a 400
 *         response carrying an error array when no topic was supplied
 */
@GET
@Timed
public Response consume(
        @QueryParam("topic") String topic,
        @QueryParam("timeout") Integer timeout
) {
    if (Strings.isNullOrEmpty(topic)) {
        return Response.status(400)
                .entity(new String[]{"Undefined topic"})
                .build();
    }

    // Work on a copy so the per-request timeout is never written into consumerCfg itself.
    Properties props = (Properties) consumerCfg.clone();
    if (timeout != null) {
        props.put("consumer.timeout.ms", String.valueOf(timeout));
    }
    // NOTE(review): if neither the request nor consumerCfg supplies a non-negative
    // consumer.timeout.ms, the drain loop below presumably never ends and the request
    // hangs -- confirm the configured default and consider rejecting negative values.

    ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    List<Message> messages = new ArrayList<>();
    try {
        // Stream creation happens inside the try so a failure here still shuts the connector down.
        Map<String, Integer> streamCounts = Collections.singletonMap(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(streamCounts);
        KafkaStream<byte[], byte[]> stream = streams.get(topic).get(0);

        try {
            for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : stream) {
                messages.add(new Message(messageAndMetadata));
            }
        } catch (ConsumerTimeoutException ignored) {
            // Expected: this is how the drain loop ends once no further message arrives
            // within the configured timeout.
        }

        // Commit only after a clean drain; if anything else was thrown above, the messages
        // were never handed to the caller and must not be acknowledged explicitly.
        connector.commitOffsets();
    } finally {
        // Kept in its own finally so a failing commit cannot skip the shutdown.
        connector.shutdown();
    }
    return Response.ok(messages).build();
}
MessageResource.java 文件源码
java
阅读 29
收藏 0
点赞 0
评论 0
项目:dropwizard-kafka-http
作者:
评论列表
文章目录