DefaultFlinkTypeConverter.java 文件源码

java
阅读 20 收藏 0 点赞 0 评论 0

项目:vs.msc.ws14 作者:
@SuppressWarnings("unchecked")
private<T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
    if(hadoopType == LongWritable.class ) {
        return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.Text.class) {
        return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.IntWritable.class) {
        return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.FloatWritable.class) {
        return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
        return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
        return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
    }
    if(hadoopType == org.apache.hadoop.io.ByteWritable.class) {
        return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
    }

    throw new RuntimeException("Unable to convert Flink type ("+flinkType.getClass().getCanonicalName()+") to Hadoop.");
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号