/**
 * Checks that explicitly disposing the client's connection to the
 * {@code responsePressure} stream is observed by the service as a cancellation.
 *
 * <p>The test connects to the stream, lets it run for one second, disposes the
 * connection, waits one more second, and then asserts that the last number the
 * client consumed is within 3 of {@code svc.getLastNumberProduced()} and that
 * {@code svc.wasCanceled()} is {@code true}.
 *
 * @throws InterruptedException if the test thread is interrupted during either sleep
 */
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);

    // Seeded with Integer.MAX_VALUE; only overwritten when a message is actually consumed.
    AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE);

    Flux<NumberProto.Number> numbers = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(received -> {
                // Record the first element of each message, then trace it to stdout.
                lastNumberConsumed.set(received.getNumber(0));
                System.out.println("C: " + received.getNumber(0));
            })
            .doOnError(failure -> System.out.println(failure.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"));

    // connect() starts the stream and returns the handle used to cancel it below.
    Disposable connection = numbers.publish().connect();

    // Let the stream run for a while before cancelling.
    Thread.sleep(1000);
    connection.dispose();

    // Allow time for the cancellation to take effect before inspecting the service.
    Thread.sleep(1000);

    // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread
    assertThat(Math.abs(lastNumberConsumed.get() - svc.getLastNumberProduced()))
            .isLessThanOrEqualTo(3);
    assertThat(svc.wasCanceled()).isTrue();
}
CancellationPropagationIntegrationTest.java 文件源码
java
阅读 26
收藏 0
点赞 0
评论 0
项目:reactive-grpc
作者:
评论列表
文章目录