/**
* 启动 MessageReceiver,开始监听topic消息
*/
@Override
public void start() {
if (consumer == null) {
//sync init
synchronized (lock) {
init();
}
}
String topicString = buildTopicsString();
Whitelist topicFilter = new Whitelist(topicString);
List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions);
if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList))
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
log.warn(e.getMessage(), e);
}
processStreamsByTopic(topicString, streamList);
}
KafkaReceiver.java 文件源码
java
阅读 24
收藏 0
点赞 0
评论 0
项目:koper
作者:
评论列表
文章目录