/**
* Tests the close method of the transactional writer.
*/
@Test
public void testTransactionalWriterClose() throws Exception {
try (TransactionalWriterTestContext context = new TransactionalWriterTestContext(PravegaWriterMode.EXACTLY_ONCE)) {
Transaction<Integer> trans = context.prepareTransaction();
try {
try (StreamSinkOperatorTestHarness<Integer> testHarness = createTestHarness(context.sinkFunction)) {
testHarness.open();
// prepare a worst-case situation that exercises the exception handling aspect of close
Mockito.doThrow(new IntentionalRuntimeException()).when(trans).abort();
Mockito.doThrow(new IntentionalRuntimeException()).when(context.pravegaWriter).close();
}
Assert.fail("expected an exception");
} catch (IntentionalRuntimeException e) {
Assert.assertEquals(1, e.getSuppressed().length);
Assert.assertTrue(e.getSuppressed()[0] instanceof IntentionalRuntimeException);
}
// verify that the transaction was aborted and the writer closed
verify(trans).abort();
verify(context.pravegaWriter).close();
}
}
FlinkPravegaWriterTest.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:flink-connectors
作者:
评论列表
文章目录