/**消费消息 [指定Topic]
*
* @param topicName 队列名称
* @param groupId Group Name
* @return
*/
static MsgIterator consume(String topicName, String groupId) {
ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair
topicCountMap.put(topicName, new Integer(1));
//TODO: 可消费多个topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder
List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair
KafkaStream<byte[], byte[]> stream = streamList.get(0);
//KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型
ConsumerIterator<byte[], byte[]> it = stream.iterator();
MsgIterator iter = new MsgIterator(it);
return iter;
}
KafkaHelper.java 文件源码
java
阅读 19
收藏 0
点赞 0
评论 0
项目:easyframe-msg
作者:
评论列表
文章目录