/**消费消息 [指定Topic] 指定线程
*
* @param topicName 队列名称
* @param numStreams Number of streams to return
* @return A list of MsgIterator each of which provides an iterator over message over allowed topics
*/
static List<MsgIterator> consume(String topicName, int numStreams, String groupId) {
ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair
topicCountMap.put(topicName, numStreams);
//TODO: 可消费多个topic
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder
List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair
List<MsgIterator> iterList = new ArrayList<MsgIterator>();
for (KafkaStream<byte[], byte[]> stream : streamList) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
MsgIterator iter = new MsgIterator(it);
iterList.add(iter);
}
//KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型
return iterList;
}
KafkaHelper.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:easyframe-msg
作者:
评论列表
文章目录