/**
* See {@link StateStore#put(String, String, T)}.
*
* <p>
* This implementation does not support putting the state object into an existing store as
* append is to be supported by the Hadoop SequenceFile (HADOOP-7139).
* </p>
*/
@Override
public void put(String storeName, String tableName, T state)
throws IOException {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
if (!this.fs.exists(tablePath) && !create(storeName, tableName)) {
throw new IOException("Failed to create a state file for table " + tableName);
}
Closer closer = Closer.create();
try {
SequenceFile.Writer writer =
closer.register(SequenceFile.createWriter(this.fs, this.conf, tablePath, Text.class, this.stateClass,
SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
FsStateStore.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:Gobblin
作者:
评论列表
文章目录