CancellationPropagationIntegrationTest.java 文件源码

java
阅读 26 收藏 0 点赞 0 评论 0

项目:reactive-grpc 作者:
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
    AtomicInteger lastNumberConsumed = new AtomicInteger(Integer.MAX_VALUE);
    ReactorNumbersGrpc.ReactorNumbersStub stub = ReactorNumbersGrpc.newReactorStub(channel);
    Flux<NumberProto.Number> test = stub
            .responsePressure(Mono.just(Empty.getDefaultInstance()))
            .doOnNext(number -> {lastNumberConsumed.set(number.getNumber(0)); System.out.println("C: " + number.getNumber(0));})
            .doOnError(throwable -> System.out.println(throwable.getMessage()))
            .doOnComplete(() -> System.out.println("Completed"))
            .doOnCancel(() -> System.out.println("Client canceled"));

    Disposable subscription = test.publish().connect();

    Thread.sleep(1000);
    subscription.dispose();
    Thread.sleep(1000);

    // Cancellation may or may not deliver the last generated message due to delays in the gRPC processing thread
    assertThat(Math.abs(lastNumberConsumed.get() - svc.getLastNumberProduced())).isLessThanOrEqualTo(3);
    assertThat(svc.wasCanceled()).isTrue();
}
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号