private void flowable() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "start send data ");
for (int i = 0; i < 100; i++) {
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.DROP)//指定背压策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(@NonNull Subscription s) {
//1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
//2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
//3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
Log.e(TAG, "onSubscribe...");
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError..." + t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete...");
}
});
}
RxJavaActivity.java 文件源码
java
阅读 45
收藏 0
点赞 0
评论 0
项目:DailyStudy
作者:
评论列表
文章目录