KafkaHelper.java 文件源码

java
阅读 19 收藏 0 点赞 0 评论 0

项目:easyframe-msg 作者:
/**
 * Consumes messages from a single, specified topic.
 *
 * <p>Obtains a consumer connector for the given group via
 * {@code KafkaHelper.getConsumer(groupId)}, requests exactly one stream for
 * the topic, and wraps that stream's iterator in a {@link MsgIterator}.
 *
 * @param topicName name of the topic (queue) to consume from
 * @param groupId consumer group name
 * @return a {@link MsgIterator} wrapping the iterator of the topic's first (and only requested) stream
 */
static MsgIterator consume(String topicName, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);

    // (topic, #streams) pairs: request a single stream for this topic.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // Integer.valueOf instead of the deprecated `new Integer(int)` constructor.
    topicCountMap.put(topicName, Integer.valueOf(1));

    // TODO: support consuming multiple topics
    // Using default decoder, so both key and value arrive as raw byte arrays.
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumerConnector.createMessageStreams(topicCountMap);

    // The number of items in the list is #streams; each stream supports an
    // iterator over message/metadata pairs.
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName);
    KafkaStream<byte[], byte[]> stream = streamList.get(0);

    // KafkaStream[K, V]: K is the type of the partition key, V is the type of the message value.
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    return new MsgIterator(it);
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号