/**
* See {@link StateStore#putAll(String, String, Collection)}.
*
* <p>
* This implementation does not support putting the state objects into an existing store as
* append is to be supported by the Hadoop SequenceFile (HADOOP-7139).
* </p>
*/
@Override
public void putAll(String storeName, String tableName, Collection<T> states)
throws IOException {
Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName);
if (!this.fs.exists(tablePath) && !create(storeName, tableName)) {
throw new IOException("Failed to create a state file for table " + tableName);
}
Closer closer = Closer.create();
try {
SequenceFile.Writer writer =
closer.register(SequenceFile.createWriter(this.fs, this.conf, tablePath, Text.class, this.stateClass,
SequenceFile.CompressionType.BLOCK, new DefaultCodec()));
for (T state : states) {
writer.append(new Text(Strings.nullToEmpty(state.getId())), state);
}
} catch (Throwable t) {
throw closer.rethrow(t);
} finally {
closer.close();
}
}
FsStateStore.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:Gobblin
作者:
评论列表
文章目录