FsStateStore.java source file

java

Project: Gobblin Author:
/**
 * See {@link StateStore#putAll(String, String, Collection)}.
 *
 * <p>
 *   This implementation does not support adding state objects to an existing store, because
 *   the Hadoop SequenceFile does not yet support append (HADOOP-7139).
 * </p>
 */
@Override
public void putAll(String storeName, String tableName, Collection<T> states)
    throws IOException {
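  // Table file lives at <storeRootDir>/<storeName>/<tableName>.
  Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
  // Create the table file if it is missing; fail fast if creation does not succeed.
  if (!this.fs.exists(tablePath) && !create(storeName, tableName)) {
    throw new IOException("Failed to create a state file for table " + tableName);
  }

  // Guava Closer guarantees the writer is closed on both success and failure.
  Closer closer = Closer.create();
  try {
    // Block-compressed SequenceFile keyed by state id (Text) with the state object as value.
    SequenceFile.Writer writer =
        closer.register(SequenceFile.createWriter(this.fs, this.conf, tablePath, Text.class, this.stateClass,
            SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
    for (T state : states) {
      // A null id is stored as an empty key.
      writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
    }
  } catch (Throwable t) {
    // Record the primary failure so exceptions thrown by close() do not mask it.
    throw closer.rethrow(t);
  } finally {
    closer.close();
  }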
}
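
The no-append behaviour described in the Javadoc can be illustrated with a minimal sketch. It assumes plain java.nio file I/O in place of Hadoop's FileSystem and SequenceFile, and text lines in place of serialized state objects. The class name OverwriteStoreSketch and its putAll signature are hypothetical and not part of Gobblin. Try-with-resources stands in for the Guava Closer used above.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for FsStateStore: text lines instead of a SequenceFile.
public class OverwriteStoreSketch {

  // Writes every id to tablePath, replacing any previous contents (no append).
  static void putAll(Path tablePath, List<String> ids) throws IOException {
    // With no OpenOption arguments, newBufferedWriter creates or truncates the file.
    // try-with-resources closes the writer on every exit path, as Closer does.
    try (BufferedWriter writer = Files.newBufferedWriter(tablePath, StandardCharsets.UTF_8)) {
      for (String id : ids) {
        writer.write(id == null ? "" : id); // mirrors Strings.nullToEmpty
        writer.newLine();
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Path table = Files.createTempFile("state", ".tbl");
    putAll(table, Arrays.asList("task-1", "task-2"));
    putAll(table, Arrays.asList("task-3")); // second call replaces the first batch
    System.out.println(Files.readAllLines(table, StandardCharsets.UTF_8)); // prints [task-3]
    Files.delete(table);
  }
}
```

A caller that needs to keep earlier states would have to read them back and pass the combined collection in a single putAll call, since each call rewrites the whole table.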