/**
 * Converts one field of a Flink record into a newly created Hadoop writable.
 *
 * <p>The field at {@code pos} is read from {@code flinkType} as the Flink value type that
 * corresponds to the requested Hadoop class, and its value is wrapped in a new instance of
 * that Hadoop class. Supported targets and the Flink value type read for each:
 * <ul>
 *   <li>{@code LongWritable} from {@code LongValue}</li>
 *   <li>{@code Text} from {@code StringValue}</li>
 *   <li>{@code IntWritable} from {@code IntValue}</li>
 *   <li>{@code FloatWritable} from {@code FloatValue}</li>
 *   <li>{@code DoubleWritable} from {@code DoubleValue}</li>
 *   <li>{@code BooleanWritable} from {@code BooleanValue}</li>
 *   <li>{@code ByteWritable} from {@code ByteValue}</li>
 * </ul>
 *
 * @param flinkType  the record to read the field from
 * @param pos        the position of the field inside the record
 * @param hadoopType the Hadoop class to produce; matched by identity against the supported classes
 * @param <T>        the Hadoop type to produce
 * @return a new instance of {@code hadoopType} holding the value of the field
 * @throws RuntimeException if {@code hadoopType} is not one of the supported classes
 */
// The casts to T are safe: every branch is guarded by an identity check of hadoopType
// against the exact class that the branch instantiates.
@SuppressWarnings("unchecked")
private <T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
    if (hadoopType == LongWritable.class) {
        return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.Text.class) {
        return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.IntWritable.class) {
        return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.FloatWritable.class) {
        return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
        return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
        return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
    }
    if (hadoopType == org.apache.hadoop.io.ByteWritable.class) {
        return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
    }

    // The record class alone does not tell the caller what went wrong: the failure is caused by
    // the requested Hadoop class, so report it (null-safe) together with the field position.
    String requestedType = (hadoopType == null) ? "null" : hadoopType.getCanonicalName();
    throw new RuntimeException("Unable to convert Flink type (" + flinkType.getClass().getCanonicalName()
            + ") to Hadoop. Unsupported Hadoop type (" + requestedType + ") requested for field " + pos + ".");
}
DefaultFlinkTypeConverter.java 文件源码
java
阅读 20
收藏 0
点赞 0
评论 0
项目:vs.msc.ws14
作者:
评论列表
文章目录