/**
* Post请求的Rxjava方式.
* @param url
* @param requestParams
* @return
*/
public <T> Flowable<T> post(String url, EasyRequestParams requestParams, RxEasyConverter<T> rxEasyConverter) {
FormBody.Builder builder = new FormBody.Builder();
ConcurrentHashMap<String, String> paramsMap = requestParams.getRequestParams();
for (ConcurrentHashMap.Entry<String, String> entry : paramsMap.entrySet()) {
builder.add(entry.getKey(), entry.getValue());
}
RequestBody requestBody = builder.build();
final Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
Call call = EasyHttpClientManager.getInstance().getOkHttpClient(EasyCacheType.CACHE_TYPE_DEFAULT).newCall(request);
return Flowable.create(new CallFlowableOnSubscribe(call,rxEasyConverter), BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io());
}
java类io.reactivex.BackpressureStrategy的实例源码
RxEasyHttpManager.java 文件源码
项目:EasyHttp
阅读 32
收藏 0
点赞 0
评论 0
RxLocationManager.java 文件源码
项目:smart-asset-iot-android-demo
阅读 37
收藏 0
点赞 0
评论 0
@SuppressWarnings("MissingPermission")
@RequiresPermission(anyOf = {
Manifest.permission.ACCESS_COARSE_LOCATION,
Manifest.permission.ACCESS_FINE_LOCATION
})
public void startLocationUpdates(boolean checkLocationSettings) {
stopLocationUpdates();
locationUpdatesDisposable = locationSettingsCheck(checkLocationSettings)
.flatMapObservable(ignore -> locationUpdates()
.startWith(lastLocation()))
.map(this::transformLocation)
.toFlowable(BackpressureStrategy.LATEST)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::setLocation,
error -> Timber.e("Failed to get location updates", error));
}
BarcodeScannerFragment.java 文件源码
项目:smart-lens
阅读 38
收藏 0
点赞 0
评论 0
@Override
public void onImageCapture(@Nullable byte[] imageBytes) {
//Process the image using Tf.
Flowable<BarcodeInfo> flowable = Flowable.create(e -> {
Bitmap bitmap = null;
if (imageBytes != null) bitmap = CameraUtils.bytesToBitmap(imageBytes);
if (bitmap != null) e.onNext(mBarcodeScanner.scan(bitmap));
e.onComplete();
}, BackpressureStrategy.DROP);
final Subscription[] subscriptions = new Subscription[1];
flowable.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe(subscription -> subscriptions[0] = subscription)
.doOnError(t -> {
Timber.e(t.getMessage());
subscriptions[0].cancel();
})
.doOnComplete(() -> subscriptions[0].cancel())
.subscribe(barcodeInfo -> {
//TODO Display the info
});
}
AsyncAspect.java 文件源码
项目:SAF-AOP
阅读 33
收藏 0
点赞 0
评论 0
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
Looper.prepare();
try {
joinPoint.proceed();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
Looper.loop();
}
}
, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
DemoFlowable.java 文件源码
项目:Reactive-Programming-With-Java-9
阅读 33
收藏 0
点赞 0
评论 0
public static void main(String[] args) {
// TODO Auto-generated method stub
Flowable<String> month_maybe = Flowable.create(emitter -> {
try {
String[] monthArray = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct", "Nov",
"Dec" };
List<String> months = Arrays.asList(monthArray);
for (String month : months) {
emitter.onNext(month);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
},BackpressureStrategy.MISSING);
month_maybe.subscribe(s -> System.out.println(s));
}
MessageViewModelTest.java 文件源码
项目:TurboChat
阅读 30
收藏 0
点赞 0
评论 0
@Test
public void shouldPassMessageToLocalStreamWhenSendMessage() throws Exception {
String message = "hello @alex http://youtube.com/q=look (love) there @yui you go http://twitter.com";
final List<Link> expectedLinks = asList(
new Link("http://youtube.com/q=look", ""),
new Link("http://twitter.com", ""));
when(userResolver.getLoggedInUser()).thenReturn(TestUtils.createMockUser());
final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>();
final Observable<Message> messageViewModelMessages = messageViewModel.localMessageStream();
messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber);
final Message m = new Message(id, message, Arrays.asList("alex", "yui"),
Arrays.asList("love"), expectedLinks, TestUtils.createMockUser());
messageViewModel.sendMessage(message, Arrays.asList("love"), sendScheduler);
userTestSubscriber.assertValue(m);
}
RxJournalBackPressureBuffer.java 文件源码
项目:reactivejournal
阅读 31
收藏 0
点赞 0
评论 0
public static void main(String[] args) throws IOException {
ReactiveJournal reactiveJournal = new ReactiveJournal("/tmp/fastproducer");
reactiveJournal.clearCache();
Flowable<Long> fastProducer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 500);
reactiveJournal.createReactiveRecorder().recordAsync(fastProducer,"input");
PlayOptions options = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
Flowable journalInput = new RxJavaPlayer(reactiveJournal).play(options);
Consumer onNextSlowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);
long startTime = System.currentTimeMillis();
journalInput.subscribe(onNextSlowConsumer::accept,
e -> System.out.println("ReactiveRecorder " + " " + e),
() -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis()-startTime) + "]")
);
}
RxReporter.java 文件源码
项目:buffer-slayer
阅读 38
收藏 0
点赞 0
评论 0
private RxReporter(Builder<M, R> builder) {
this.sender = builder.sender;
this.metrics = builder.metrics;
this.messageTimeoutNanos = builder.messageTimeoutNanos;
this.bufferedMaxMessages = builder.bufferedMaxMessages;
this.pendingMaxMessages = builder.pendingMaxMessages;
this.overflowStrategy = builder.overflowStrategy;
this.scheduler = builder.scheduler;
Flowable<SendingTask<M>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
initBackpressurePolicy(flowable)
.observeOn(Schedulers.single())
.groupBy(new MessagePartitioner())
.subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
RxPaperBookTest.java 文件源码
项目:RxPaper2
阅读 32
收藏 0
点赞 0
评论 0
@Test
public void testUpdatesChecked() throws Exception {
RxPaperBook book = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline());
final String key = "hello";
final ComplexObject value = ComplexObject.random();
final TestSubscriber<ComplexObject> updatesSubscriber = TestSubscriber.create();
book.observe(key, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(updatesSubscriber);
updatesSubscriber.assertValueCount(0);
book.write(key, value).subscribe();
updatesSubscriber.assertValueCount(1);
updatesSubscriber.assertValues(value);
final ComplexObject newValue = ComplexObject.random();
book.write(key, newValue).subscribe();
updatesSubscriber.assertValueCount(2);
updatesSubscriber.assertValues(value, newValue);
// Error value
final int wrongValue = 3;
book.write(key, wrongValue).test().assertComplete().assertNoErrors();
updatesSubscriber.assertValueCount(2);
updatesSubscriber.assertValues(value, newValue);
updatesSubscriber.assertNoErrors();
}
TibcoObservableTest.java 文件源码
项目:rxjavatraining
阅读 26
收藏 0
点赞 0
评论 0
@Test
public void createYourOwnTibco() throws Exception {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
while (!e.isCancelled()) {
long numberRecords = e.requested();
System.out.println(numberRecords);
if (numberRecords > 0) {
}
}
}
}, BackpressureStrategy.BUFFER);
flowable.map(x -> x + "Yay!").subscribe(System.out::println);
}
RxProgress.java 文件源码
项目:RxProgress
阅读 39
收藏 0
点赞 0
评论 0
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
return Flowable.using(this::makeDialog,
new Function<ProgressDialog, Publisher<? extends T>>() {
@Override
public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
return Flowable.create(emitter -> {
if (builder.cancelable) {
dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
}
dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
}, backpressureStrategy);
}
}, Dialog::dismiss);
}
MessageViewModelTest.java 文件源码
项目:TurboChat
阅读 37
收藏 0
点赞 0
评论 0
@Test
public void shouldReturnApiStream() throws Exception {
final TestSubscriber<Message> userTestSubscriber = new TestSubscriber<>();
final Observable<Message> messageViewModelMessages = messageViewModel.apiSendMessageStream();
messageViewModelMessages.toFlowable(BackpressureStrategy.LATEST).subscribe(userTestSubscriber);
userTestSubscriber.assertNoErrors();
userTestSubscriber.assertNoValues();
}
TasksLocalDataSource.java 文件源码
项目:GitHub
阅读 32
收藏 0
点赞 0
评论 0
@Override
public Flowable<List<Task>> getTasks() {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s", TextUtils.join(",", projection), TaskEntry.TABLE_NAME);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql)
.mapToList(mTaskMapperFunction)
.toFlowable(BackpressureStrategy.BUFFER);
}
TasksLocalDataSource.java 文件源码
项目:GitHub
阅读 32
收藏 0
点赞 0
评论 0
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
String[] projection = {
TaskEntry.COLUMN_NAME_ENTRY_ID,
TaskEntry.COLUMN_NAME_TITLE,
TaskEntry.COLUMN_NAME_DESCRIPTION,
TaskEntry.COLUMN_NAME_COMPLETED
};
String sql = String.format("SELECT %s FROM %s WHERE %s LIKE ?",
TextUtils.join(",", projection), TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);
return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, sql, taskId)
.mapToOneOrDefault(cursor -> Optional.of(mTaskMapperFunction.apply(cursor)), Optional.<Task>absent())
.toFlowable(BackpressureStrategy.BUFFER);
}
RxJava2CallAdapter.java 文件源码
项目:GitHub
阅读 30
收藏 0
点赞 0
评论 0
@Override public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
DownloadType.java 文件源码
项目:GitHub
阅读 30
收藏 0
点赞 0
评论 0
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
return Flowable.create(new FlowableOnSubscribe<DownloadStatus>() {
@Override
public void subscribe(FlowableEmitter<DownloadStatus> e) throws Exception {
record.save(e, response);
}
}, BackpressureStrategy.LATEST);
}
SensorGathererTest.java 文件源码
项目:AndroidSensors
阅读 27
收藏 0
点赞 0
评论 0
@Before
public void setUp() throws Exception {
if (sensorGatherer == null)
throw new Error("sensorGatherer must be initialized before calling super.setUp() method");
when(sensorConfig.getBackpressureStrategy(any(SensorType.class))).thenReturn(BackpressureStrategy.BUFFER);
when(permissionChecker.isPermissionGranted()).thenReturn(true);
when(sensorChecker.isReady(any(SensorType.class))).thenReturn(true);
}
RxUtil.java 文件源码
项目:GitHub
阅读 36
收藏 0
点赞 0
评论 0
/**
* 生成Flowable
* @param <T>
* @return
*/
public static <T> Flowable<T> createData(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}
ThrottleSearchActivity.java 文件源码
项目:GitHub
阅读 37
收藏 0
点赞 0
评论 0
@Override
protected void onResume() {
super.onResume();
// Listen to key presses and only start search after user paused to avoid excessive redrawing on the screen.
disposable = RxTextView.textChangeEvents(searchInputView)
.debounce(200, TimeUnit.MILLISECONDS) // default Scheduler is Schedulers.computation()
.observeOn(AndroidSchedulers.mainThread()) // Needed to access Realm data
.toFlowable(BackpressureStrategy.BUFFER)
.switchMap(textChangeEvent -> {
// Use Async API to move Realm queries off the main thread.
// Realm currently doesn't support the standard Schedulers.
return realm.where(Person.class)
.beginsWith("name", textChangeEvent.text().toString())
.findAllSortedAsync("name")
.asFlowable();
})
// Only continue once data is actually loaded
// RealmObservables will emit the unloaded (empty) list as its first item
.filter(people -> people.isLoaded())
.subscribe(people -> {
searchResultsView.removeAllViews();
for (Person person : people) {
TextView view = new TextView(ThrottleSearchActivity.this);
view.setText(person.getName());
searchResultsView.addView(view);
}
}, throwable -> throwable.printStackTrace());
}
RxJavaActivity.java 文件源码
项目:DailyStudy
阅读 45
收藏 0
点赞 0
评论 0
private void flowable() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "start send data ");
for (int i = 0; i < 100; i++) {
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.DROP)//指定背压策略
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(@NonNull Subscription s) {
//1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
//2, 参数为 Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
//3, 必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
Log.e(TAG, "onSubscribe...");
s.request(10);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError..." + t);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete...");
}
});
}
IconPresenter.java 文件源码
项目:MBEStyle
阅读 41
收藏 0
点赞 0
评论 0
public void calcIconTotal() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
int total = 0;
while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
if (xml.getEventType() == XmlPullParser.START_TAG) {
if (xml.getName().startsWith("item")) {
total++;
}
}
xml.next();
}
flowableEmitter.onNext(total);
}
}, BackpressureStrategy.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
mView.setIconTotal(integer);
}
});
}
StorageImpl.java 文件源码
项目:vt-support
阅读 37
收藏 0
点赞 0
评论 0
@Override
public void putEntries(Observable<Entry> entries) {
final String insert =
"INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
+ " values (?, ?, ?, ?);";
final Observable<Object> params = entries.concatMap(entry -> {
byte[] compressedMvt;
try {
compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector());
} catch (final IOException ex) {
throw Exceptions.propagate(ex);
}
return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(),
flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt);
})
// source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
.toList()
.flattenAsObservable(objects -> objects);
// TODO update when upstream is enhanced
dataSource.update(insert)
.parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
.counts()
.test() // TODO remove hack
.awaitDone(5, TimeUnit.SECONDS)
.assertComplete();
}
RxRatpack.java 文件源码
项目:ratpack-rx2
阅读 39
收藏 0
点赞 0
评论 0
/**
* @param promise
* @param strategy The {@link BackpressureStrategy} to use
* @param <T>
* @return
* @see RxRatpack#observe(Promise)
*/
public static <T> Flowable<T> flow(Promise<T> promise, BackpressureStrategy strategy) {
return Flowable.create(subscriber ->
promise.onError(subscriber::onError).then(value -> {
subscriber.onNext(value);
subscriber.onComplete();
}),
strategy);
}
MainActivity.java 文件源码
项目:Mix
阅读 32
收藏 0
点赞 0
评论 0
/**
* 水缸的更多处理
*/
private void flowableSize() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 10000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())
//DROP和LATEST的区别在于,当发出事件数量有限时,后者一定会接收到最后一条数据,如这里的9999,而DROP是连续性的
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(128);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
BaseFlowableActivity.java 文件源码
项目:GetStartRxJava2.0
阅读 32
收藏 0
点赞 0
评论 0
private void doRxJavaWork() {
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("事件1");
e.onNext("事件2");
e.onNext("事件3");
e.onNext("事件4");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe: ");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String string) {
Log.d(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "onError: " + t.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
};
flowable.subscribe(subscriber);
}
CCRequest.java 文件源码
项目:EvolvingNetLib
阅读 35
收藏 0
点赞 0
评论 0
/**
* 获取内存缓存请求Flowable对象
*
* @return 内存缓存查询Flowable对象
*/
private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() {
//内存缓存数据获取
return Flowable.create(new FlowableOnSubscribe<CCBaseResponse<T>>() {
@Override
public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> e) throws Exception {
T response = null;
try {
if (ccCacheQueryCallback != null) {
response = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey);
}
CCBaseResponse<T> tccBaseResponse = new CCBaseResponse<T>(response, true, true, false);
e.onNext(tccBaseResponse);
e.onComplete();
} catch (Exception exception) {
switch (cacheQueryMode) {
case CCCacheMode.QueryMode.MODE_ONLY_MEMORY:
e.onError(new CCDiskCacheQueryException(exception));
break;
default:
e.onComplete();
break;
}
}
}
}, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
}
S2Client.java 文件源码
项目:ocraft-s2client
阅读 39
收藏 0
点赞 0
评论 0
private S2Client(Builder builder) {
connectToIp = builder.connectToIp;
connectToPort = builder.connectToPort;
traced = builder.traced;
tracer = builder.tracer;
game = builder.game;
log.info("Starting: {}", this);
Channel channel = channelProvider.getChannel();
responseStream = channel.outputStream().mergeWith(channel.errorStream())
.map(this::prepareResponse)
.toFlowable(BackpressureStrategy.ERROR)
.onBackpressureBuffer(cfg().getInt(OcraftConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE))
.observeOn(Schedulers.computation(), false, cfg().getInt(CLIENT_BUFFER_SIZE_RESPONSE_STREAM))
.publish()
.autoConnect()
.doOnSubscribe(s -> await.register())
.doOnCancel(await::arriveAndDeregister);
responseStream().subscribe(this);
await.arriveAndDeregister();
Optional.ofNullable(game).ifPresent(s2Controller -> {
responseStream().subscribe(s2Controller);
await.arriveAndDeregister();
});
channelProvider.start(connectToIp, connectToPort);
}
AbstractPresenter.java 文件源码
项目:AndroidMVPresenter
阅读 36
收藏 0
点赞 0
评论 0
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if(++retryCount <= maxRetries){
return todoBeforeRetry.apply(throwable).toFlowable(BackpressureStrategy.BUFFER);
}
return Flowable.error(throwable);
}
});
}
Rx2Test2Activity.java 文件源码
项目:RX_Demo
阅读 35
收藏 0
点赞 0
评论 0
private void flowableTest() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 128; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR)//增加了一个参数
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
// s.request(Long.MAX_VALUE); //注意这句代码
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}
RxHelper.java 文件源码
项目:YiZhi
阅读 36
收藏 0
点赞 0
评论 0
/**
* 生成Flowable
*
* @param t
* @return Flowable
*/
public static <T> Flowable<T> createFlowable(final T t) {
return Flowable.create(new FlowableOnSubscribe<T>() {
@Override
public void subscribe(FlowableEmitter<T> emitter) throws Exception {
try {
emitter.onNext(t);
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
}, BackpressureStrategy.BUFFER);
}