AvroConsumerThread.java 文件源码

java
阅读 24 收藏 0 点赞 0 评论 0

项目:iote2e 作者:
/**
 * Drains this thread's Kafka stream and prints a one-line summary of every message.
 *
 * <p>Each message payload is decoded from Avro binary using the {@code User} class schema and
 * converted to a {@code User} via {@code genericRecordToUser}. The summary (thread number, topic,
 * partition, key, user, offset, timestamp and timestamp type) is written to standard output.
 *
 * <p>When the stream iterator reports no more messages, a shutdown line is printed and the method
 * returns. Any exception raised while consuming or decoding is caught here, reported on standard
 * output together with its stack trace, and ends the loop; nothing is rethrown to the caller.
 */
public void run() {
    try {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(User.getClassSchema());

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();

            // A message may carry no key; decoding a null byte[] would throw a
            // NullPointerException and terminate the whole consumer loop, so a missing key is
            // reported as "null" in the summary instead.
            byte[] keyBytes = messageAndMetadata.key();
            // Decode with an explicit charset rather than the platform default so the output is
            // the same on every host. NOTE(review): assumes producers write UTF-8 keys - confirm.
            String key = (keyBytes == null)
                    ? null
                    : new String(keyBytes, java.nio.charset.StandardCharsets.UTF_8);

            // invert(...).get() throws if the payload cannot be decoded with the User schema;
            // that failure is handled by the catch block below.
            User user = genericRecordToUser(recordInjection.invert(messageAndMetadata.message()).get());

            String summary = "Thread " + m_threadNumber + ", topic=" + messageAndMetadata.topic() + ", partition="
                    + messageAndMetadata.partition() + ", key=" + key + ", user=" + user.toString() + ", offset="
                    + messageAndMetadata.offset() + ", timestamp=" + messageAndMetadata.timestamp()
                    + ", timestampType=" + messageAndMetadata.timestampType();
            System.out.println(summary);
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    } catch (Exception e) {
        // Boundary of the consumer thread: report the failure and let the thread finish.
        System.out.println("Exception in thread "+m_threadNumber);
        System.out.println(e);
        e.printStackTrace();
    }
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号