KafkaHelper.java 文件源码

java
阅读 23 收藏 0 点赞 0 评论 0

项目:easyframe-msg 作者:
/**消费消息  [指定Topic] 指定线程
 * 
 * @param topicName 队列名称
 * @param numStreams Number of streams to return
 * @return A list of MsgIterator each of which provides an iterator over message over allowed topics
 */
static List<MsgIterator> consume(String topicName, int numStreams, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    //(topic, #stream) pair
    topicCountMap.put(topicName, numStreams);

    //TODO: 可消费多个topic
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName);  //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair

    List<MsgIterator> iterList = new ArrayList<MsgIterator>();
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MsgIterator iter = new MsgIterator(it);
        iterList.add(iter);
    }

    //KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型
    return iterList;
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号