KafkaReceiver.java source code

java

Project: koper
/**
 * Starts the MessageReceiver and begins listening for topic messages.
 */
@Override
public void start() {

    if (consumer == null) {
        // Initialize lazily under the lock; re-check so only one thread runs init()
        synchronized (lock) {
            if (consumer == null) {
                init();
            }
        }
    }

    String topicString = buildTopicsString();

    Whitelist topicFilter = new Whitelist(topicString);
    List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions);

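    // If no streams were created, pause briefly before continuing
    if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList)) {
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            log.warn(e.getMessage(), e);
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
        }
    }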
    processStreamsByTopic(topicString, streamList);

}
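
The start() method above initializes the consumer lazily under a lock. As an illustration only, here is a minimal, self-contained sketch of the double-checked locking idiom for that kind of guard. It assumes the guarded field is volatile and that init() assigns it, neither of which is visible in the snippet. The names LazyInitSketch, initCalls and runConcurrently are hypothetical and are not part of koper.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyInitSketch {
    private final Object lock = new Object();
    // volatile (an assumption here) so other threads see the assigned value
    private volatile Object consumer;
    // Counts how often init() actually ran, for demonstration only
    private final AtomicInteger initCalls = new AtomicInteger();

    private void init() {
        initCalls.incrementAndGet();
        consumer = new Object(); // stand-in for the real consumer
    }

    public void start() {
        if (consumer == null) {            // first check, without the lock
            synchronized (lock) {
                if (consumer == null) {    // second check, under the lock
                    init();
                }
            }
        }
    }

    // Calls start() from several threads and returns how often init() ran.
    public static int runConcurrently(int threads) {
        LazyInitSketch receiver = new LazyInitSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(receiver::start);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return receiver.initCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("init() calls: " + runConcurrently(8));
    }
}
```

Without the second check inside the synchronized block, two threads that both see a null consumer could each call init() in turn.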