@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);
TestSubscriber<NumberProto.Number> subscription = stub
.responsePressure(Single.just(Empty.getDefaultInstance()))
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
.doOnCancel(() -> System.out.println("Client canceled"))
.take(10)
.test();
// Consume some work
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
subscription.dispose();
subscription.awaitTerminalEvent(3, TimeUnit.SECONDS);
subscription.assertValueCount(10);
subscription.assertTerminated();
assertThat(svc.wasCanceled()).isTrue();
}
CancellationPropagationIntegrationTest.java 文件源码
java
阅读 30
收藏 0
点赞 0
评论 0
项目:reactive-grpc
作者:
评论列表
文章目录