java类org.apache.hadoop.io.serializer.SerializationFactory的实例源码

TestMerge.java 文件源码 项目:FlexMap 阅读 23 收藏 0 点赞 0 评论 0
public KeyValueWriter(Configuration conf, OutputStream output,
                      Class<K> kyClass, Class<V> valClass
                     ) throws IOException {
  keyClass = kyClass;
  valueClass = valClass;
  dataBuffer = new DataOutputBuffer();
  SerializationFactory serializationFactory
                                         = new SerializationFactory(conf);
  keySerializer
              = (Serializer<K>)serializationFactory.getSerializer(keyClass);
  keySerializer.open(dataBuffer);
  valueSerializer
            = (Serializer<V>)serializationFactory.getSerializer(valueClass);
  valueSerializer.open(dataBuffer);
  outputStream = new DataOutputStream(output);
}
TestWritableJobConf.java 文件源码 项目:FlexMap 阅读 20 收藏 0 点赞 0 评论 0
private <K> K serDeser(K conf) throws Exception {
  SerializationFactory factory = new SerializationFactory(CONF);
  Serializer<K> serializer =
    factory.getSerializer(GenericsUtil.getClass(conf));
  Deserializer<K> deserializer =
    factory.getDeserializer(GenericsUtil.getClass(conf));

  DataOutputBuffer out = new DataOutputBuffer();
  serializer.open(out);
  serializer.serialize(conf);
  serializer.close();

  DataInputBuffer in = new DataInputBuffer();
  in.reset(out.getData(), out.getLength());
  deserializer.open(in);
  K after = deserializer.deserialize(null);
  deserializer.close();
  return after;
}
Task.java 文件源码 项目:FlexMap 阅读 28 收藏 0 点赞 0 评论 0
public ValuesIterator (RawKeyValueIterator in, 
                       RawComparator<KEY> comparator, 
                       Class<KEY> keyClass,
                       Class<VALUE> valClass, Configuration conf, 
                       Progressable reporter)
  throws IOException {
  this.in = in;
  this.comparator = comparator;
  this.reporter = reporter;
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(keyIn);
  this.valDeserializer = serializationFactory.getDeserializer(valClass);
  this.valDeserializer.open(this.valueIn);
  readNextKey();
  key = nextKey;
  nextKey = null; // force new instance creation
  hasNext = more;
}
HadoopV2TaskContext.java 文件源码 项目:ignite 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Gets serializer for specified class.
 *
 * @param cls Class.
 * @param jobConf Job configuration.
 * @return Appropriate serializer.
 */
@SuppressWarnings("unchecked")
private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException {
    A.notNull(cls, "cls");

    SerializationFactory factory = new SerializationFactory(jobConf);

    Serialization<?> serialization = factory.getSerialization(cls);

    if (serialization == null)
        throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName());

    if (serialization.getClass() == WritableSerialization.class)
        return new HadoopWritableSerialization((Class<? extends Writable>)cls);

    return new HadoopSerializationWrapper(serialization, cls);
}
SubmittedJob.java 文件源码 项目:hServer 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private static <T> T getSplitDetails(FSDataInputStream inFile, long offset, Configuration configuration)
        throws IOException {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
        cls = (Class<T>) configuration.getClassByName(className);
    } catch (ClassNotFoundException ce) {
        IOException wrap = new IOException("Split class " + className +
                " not found");
        wrap.initCause(ce);
        throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(configuration);
    Deserializer<T> deserializer =
            (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    return split;
}
SubmittedJob.java 文件源码 项目:hServer 阅读 22 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private static <T> T getSplitDetails(FSDataInputStream inFile, long offset, Configuration configuration)
        throws IOException {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
        cls = (Class<T>) configuration.getClassByName(className);
    } catch (ClassNotFoundException ce) {
        IOException wrap = new IOException("Split class " + className +
                " not found");
        wrap.initCause(ce);
        throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(configuration);
    Deserializer<T> deserializer =
            (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    return split;
}
SubmittedJob.java 文件源码 项目:hServer 阅读 19 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private static <T> T getSplitDetails(FSDataInputStream inFile, long offset, Configuration configuration)
        throws IOException {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
        cls = (Class<T>) configuration.getClassByName(className);
    } catch (ClassNotFoundException ce) {
        IOException wrap = new IOException("Split class " + className +
                " not found");
        wrap.initCause(ce);
        throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(configuration);
    Deserializer<T> deserializer =
            (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    return split;
}
SubmittedJob.java 文件源码 项目:hServer 阅读 20 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private static <T> T getSplitDetails(FSDataInputStream inFile, long offset, Configuration configuration)
        throws IOException {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
        cls = (Class<T>) configuration.getClassByName(className);
    } catch (ClassNotFoundException ce) {
        IOException wrap = new IOException("Split class " + className +
                " not found");
        wrap.initCause(ce);
        throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(configuration);
    Deserializer<T> deserializer =
            (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    return split;
}
SubmittedJob.java 文件源码 项目:hServer 阅读 21 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
private static <T> T getSplitDetails(FSDataInputStream inFile, long offset, Configuration configuration)
        throws IOException {
    inFile.seek(offset);
    String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
        cls = (Class<T>) configuration.getClassByName(className);
    } catch (ClassNotFoundException ce) {
        IOException wrap = new IOException("Split class " + className +
                " not found");
        wrap.initCause(ce);
        throw wrap;
    }
    SerializationFactory factory = new SerializationFactory(configuration);
    Deserializer<T> deserializer =
            (Deserializer<T>) factory.getDeserializer(cls);
    deserializer.open(inFile);
    T split = deserializer.deserialize(null);
    return split;
}
TestMerge.java 文件源码 项目:hops 阅读 17 收藏 0 点赞 0 评论 0
public KeyValueWriter(Configuration conf, OutputStream output,
                      Class<K> kyClass, Class<V> valClass
                     ) throws IOException {
  keyClass = kyClass;
  valueClass = valClass;
  dataBuffer = new DataOutputBuffer();
  SerializationFactory serializationFactory
                                         = new SerializationFactory(conf);
  keySerializer
              = (Serializer<K>)serializationFactory.getSerializer(keyClass);
  keySerializer.open(dataBuffer);
  valueSerializer
            = (Serializer<V>)serializationFactory.getSerializer(valueClass);
  valueSerializer.open(dataBuffer);
  outputStream = new DataOutputStream(output);
}


问题


面经


文章

微信
公众号

扫码关注公众号