/**
 * Reads the sequence files {@code seq1} and {@code seq2} located under {@code this.outputPath} back into
 * {@link WorkUnitState} objects and checks what was deserialized.
 *
 * <p>The test expects exactly two states in total, each carrying an actual high watermark whose long
 * value is either {@code 10} or {@code 100}.
 *
 * <p>Runs only after {@code testSerializeToSequenceFile}. NOTE(review): presumably that test is what
 * writes the two files read here -- confirm against the rest of the class.
 *
 * @throws IOException propagated from the {@link Closer} (rethrown failure or failure on close)
 */
@Test(dependsOnMethods = "testSerializeToSequenceFile")
public void testDeserializeFromSequenceFile() throws IOException {
  // Shared sink handed to every deserialize call below.
  Queue<WorkUnitState> deserializedStates = Queues.newConcurrentLinkedQueue();

  Closer resources = Closer.create();
  try {
    ParallelRunner runner = resources.register(new ParallelRunner(2, this.fs));
    // Same two files, requested in the same order: seq1 first, then seq2.
    for (String fileName : new String[] {"seq1", "seq2"}) {
      Path sequenceFile = new Path(this.outputPath, fileName);
      runner.deserializeFromSequenceFile(Text.class, WorkUnitState.class, sequenceFile, deserializedStates);
    }
  } catch (Throwable failure) {
    throw resources.rethrow(failure);
  } finally {
    // Assertions are made only after the runner has been closed.
    // NOTE(review): presumably close() waits for outstanding work -- confirm in ParallelRunner.
    resources.close();
  }

  Assert.assertEquals(deserializedStates.size(), 2);

  Gson gson = new Gson();
  for (WorkUnitState state : deserializedStates) {
    TestWatermark parsed = gson.fromJson(state.getActualHighWatermark(), TestWatermark.class);
    long value = parsed.getLongWatermark();
    boolean isExpectedValue = value == 10L || value == 100L;
    Assert.assertTrue(isExpectedValue);
  }
}
ParallelRunnerTest.java 文件源码
java
阅读 23
收藏 0
点赞 0
评论 0
项目:Gobblin
作者:
评论列表
文章目录