/**
* Starts the consumer thread.
*/
@Override
public void run() {
log.debug("Starting consumer for topic {}", topic);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// For each message present on the partition...
while (it.hasNext()) {
Map<String, Object> event = null;
// Parse it with the parser associated with the topic
try {
event = parser.parse(new String(it.next().message(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
// Send it to the source
if (event != null) {
source.send(topic.getName(), event);
}
}
log.debug("Finished consumer for topic {}", topic);
}
Consumer.java 文件源码
java
阅读 27
收藏 0
点赞 0
评论 0
项目:cep
作者:
评论列表
文章目录