@Override
public void run() {
while (this.shouldContinue) {
final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
if (it.hasNext()) {
final byte[] message = it.next().message();
synchronized (this) {
this.message = message;
// Wake up getMessage() if it is waiting
if (this.waiting) {
notify();
}
while (this.message != null && this.shouldContinue)
try {
wait();
} catch (InterruptedException e) {
logger.info("Wait interrupted", e);
}
}
}
}
logger.info("readerThread {} exited", this.readerThread.getName());
this.readerThread = null;
}
KafkaSpout.java 文件源码
java
阅读 17
收藏 0
点赞 0
评论 0
项目:monasca-thresh
作者:
评论列表
文章目录