java类io.reactivex.BackpressureStrategy的实例源码

RxEasyHttpManager.java 文件源码 项目:EasyHttp 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Issues a form-encoded POST request and exposes it as an RxJava {@link Flowable}.
 *
 * @param url             target URL of the request
 * @param requestParams   holder of the key/value pairs written into the form body
 * @param rxEasyConverter converter handed to {@code CallFlowableOnSubscribe} together with the call
 * @param <T>             element type of the returned Flowable
 * @return a Flowable created with {@link BackpressureStrategy#BUFFER} and subscribed on the io scheduler
 */
public <T> Flowable<T> post(String url, EasyRequestParams requestParams, RxEasyConverter<T> rxEasyConverter) {
    // Copy every request parameter into the form body.
    final FormBody.Builder formBuilder = new FormBody.Builder();
    for (ConcurrentHashMap.Entry<String, String> param : requestParams.getRequestParams().entrySet()) {
        formBuilder.add(param.getKey(), param.getValue());
    }

    final Request postRequest = new Request.Builder()
            .url(url)
            .post(formBuilder.build())
            .build();

    // The call is built on the client registered for the default cache type.
    final Call pendingCall = EasyHttpClientManager.getInstance()
            .getOkHttpClient(EasyCacheType.CACHE_TYPE_DEFAULT)
            .newCall(postRequest);

    return Flowable.create(new CallFlowableOnSubscribe(pendingCall, rxEasyConverter), BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io());
}
RxLocationManager.java 文件源码 项目:smart-asset-iot-android-demo 阅读 37 收藏 0 点赞 0 评论 0
/**
 * Restarts location updates: stops any running subscription, then subscribes to a new
 * stream that starts with the last known location followed by live updates.
 * Each location is passed through {@code transformLocation} and delivered to
 * {@code setLocation} on the Android main thread.
 *
 * @param checkLocationSettings forwarded to {@code locationSettingsCheck}
 */
@SuppressWarnings("MissingPermission")
@RequiresPermission(anyOf = {
        Manifest.permission.ACCESS_COARSE_LOCATION,
        Manifest.permission.ACCESS_FINE_LOCATION
})
public void startLocationUpdates(boolean checkLocationSettings) {
    stopLocationUpdates();
    locationUpdatesDisposable = locationSettingsCheck(checkLocationSettings)
            .flatMapObservable(ignore -> locationUpdates()
                    .startWith(lastLocation()))
            .map(this::transformLocation)
            .toFlowable(BackpressureStrategy.LATEST)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::setLocation,
                    // Timber expects the throwable as the FIRST argument; passing it after the
                    // message treats it as an unused format argument and drops the stack trace.
                    error -> Timber.e(error, "Failed to get location updates"));
}
BarcodeScannerFragment.java 文件源码 项目:smart-lens 阅读 38 收藏 0 点赞 0 评论 0
/**
 * Decodes the captured image bytes into a bitmap and runs the barcode scanner on it
 * on the computation scheduler; the result is delivered on the Android main thread.
 *
 * @param imageBytes raw bytes of the captured image; {@code null} results in an empty stream
 */
@Override
public void onImageCapture(@Nullable byte[] imageBytes) {
    // Emits at most one scan result, then completes.
    Flowable<BarcodeInfo> flowable = Flowable.create(e -> {
        Bitmap bitmap = null;
        if (imageBytes != null) bitmap = CameraUtils.bytesToBitmap(imageBytes);
        if (bitmap != null) e.onNext(mBarcodeScanner.scan(bitmap));
        e.onComplete();
    }, BackpressureStrategy.DROP);

    // An explicit error consumer is required: without one RxJava reports the failure as
    // OnErrorNotImplementedException through its global error handler. Cancelling the
    // subscription by hand after a terminal event is unnecessary, so that bookkeeping is gone.
    flowable.subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(barcodeInfo -> {
                //TODO Display the info
            }, t -> Timber.e(t)); // log the throwable itself so the stack trace is kept
}
AsyncAspect.java 文件源码 项目:SAF-AOP 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Runs the intercepted join point on an RxJava io thread that has a {@link Looper} prepared.
 *
 * <p>Any throwable raised by {@code joinPoint.proceed()} is printed and not propagated.
 * NOTE(review): {@code Looper.loop()} is entered after the join point returns and the
 * emitter is never signalled, so the io thread presumably stays parked until its looper
 * is quit elsewhere - confirm this is intended.
 *
 * @param joinPoint the intercepted invocation to execute asynchronously
 */
private void asyncMethod(final ProceedingJoinPoint joinPoint) throws Throwable {
    final FlowableOnSubscribe<Object> looperTask = new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            Looper.prepare();
            try {
                joinPoint.proceed();
            } catch (Throwable failure) {
                failure.printStackTrace();
            }
            Looper.loop();
        }
    };

    Flowable.create(looperTask, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}
DemoFlowable.java 文件源码 项目:Reactive-Programming-With-Java-9 阅读 33 收藏 0 点赞 0 评论 0
/**
 * Demo: builds a Flowable that emits twelve month labels and prints each one.
 *
 * @param args unused
 */
public static void main(String[] args) {
    Flowable<String> monthStream = Flowable.create(emitter -> {
        try {
            final String[] monthNames = { "Jan", "Feb", "Mar", "Apl", "May", "Jun", "July", "Aug", "Sept", "Oct",
                    "Nov", "Dec" };

            // Push the labels in array order, then signal completion.
            for (int i = 0; i < monthNames.length; i++) {
                emitter.onNext(monthNames[i]);
            }
            emitter.onComplete();
        } catch (Exception e) {
            emitter.onError(e);
        }
    }, BackpressureStrategy.MISSING);

    monthStream.subscribe(System.out::println);
}
MessageViewModelTest.java 文件源码 项目:TurboChat 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Sending a message must publish the fully parsed {@code Message}
 * (mentions, emoticons and links) on the local message stream.
 */
@Test
public void shouldPassMessageToLocalStreamWhenSendMessage() throws Exception {
    final String rawText = "hello @alex http://youtube.com/q=look (love) there @yui you go http://twitter.com";

    final List<Link> linksInText = asList(
            new Link("http://youtube.com/q=look", ""),
            new Link("http://twitter.com", ""));

    when(userResolver.getLoggedInUser()).thenReturn(TestUtils.createMockUser());

    // Subscribe before sending so the emission is captured.
    final TestSubscriber<Message> localStreamSubscriber = new TestSubscriber<>();
    messageViewModel.localMessageStream()
            .toFlowable(BackpressureStrategy.LATEST)
            .subscribe(localStreamSubscriber);

    final Message expected = new Message(id, rawText, Arrays.asList("alex", "yui"),
            Arrays.asList("love"), linksInText, TestUtils.createMockUser());

    messageViewModel.sendMessage(rawText, Arrays.asList("love"), sendScheduler);

    localStreamSubscriber.assertValue(expected);
}
RxJournalBackPressureBuffer.java 文件源码 项目:reactivejournal 阅读 31 收藏 0 点赞 0 评论 0
/**
 * Demo: records a fast producer into a journal at {@code /tmp/fastproducer} and replays
 * the recorded "input" stream to a slow consumer, printing the elapsed time on completion.
 *
 * @param args unused
 * @throws IOException declared by the original entry point
 */
public static void main(String[] args) throws IOException {
    ReactiveJournal journal = new ReactiveJournal("/tmp/fastproducer");
    journal.clearCache();

    // Record the producer's output asynchronously under the "input" filter.
    Flowable<Long> producer = FastProducerSlowConsumer.createFastProducer(BackpressureStrategy.MISSING, 500);
    journal.createReactiveRecorder().recordAsync(producer, "input");

    // Replay what was recorded under "input" using the FAST replay rate.
    PlayOptions playOptions = new PlayOptions().filter("input").replayRate(PlayOptions.ReplayRate.FAST);
    Flowable replayed = new RxJavaPlayer(journal).play(playOptions);

    Consumer slowConsumer = FastProducerSlowConsumer.createOnNextSlowConsumer(10);

    long startedAt = System.currentTimeMillis();
    replayed.subscribe(slowConsumer::accept,
            e -> System.out.println("ReactiveRecorder " + " " + e),
            () -> System.out.println("ReactiveRecorder complete [" + (System.currentTimeMillis() - startedAt) + "]")
    );
}
RxReporter.java 文件源码 项目:buffer-slayer 阅读 38 收藏 0 点赞 0 评论 0
/**
 * Copies the configuration from the builder and immediately wires up and subscribes
 * the message pipeline.
 *
 * @param builder source of the sender, metrics, limits, overflow strategy and scheduler
 */
private RxReporter(Builder<M, R> builder) {
  this.sender = builder.sender;
  this.metrics = builder.metrics;

  this.messageTimeoutNanos = builder.messageTimeoutNanos;
  this.bufferedMaxMessages = builder.bufferedMaxMessages;
  this.pendingMaxMessages = builder.pendingMaxMessages;
  this.overflowStrategy = builder.overflowStrategy;
  this.scheduler = builder.scheduler;

  // This instance is passed as the FlowableOnSubscribe. MISSING applies no backpressure
  // handling here; the result is handed to initBackpressurePolicy.
  // NOTE(review): `this` escapes the constructor; the fields above are assigned first,
  // presumably because the pipeline set-up reads them - confirm before reordering.
  Flowable<SendingTask<M>> flowable = Flowable.create(this, BackpressureStrategy.MISSING);
  initBackpressurePolicy(flowable)
      .observeOn(Schedulers.single())
      // Split the stream into groups keyed by MessagePartitioner.
      .groupBy(new MessagePartitioner())
      .subscribe(new MessageGroupSubscriber(messageTimeoutNanos, bufferedMaxMessages, sender, scheduler));
}
RxPaperBookTest.java 文件源码 项目:RxPaper2 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Observing a key with a concrete class must deliver every written value of that class
 * and must not emit (or fail) when a value of a different type is written.
 */
@Test
public void testUpdatesChecked() throws Exception {
    final RxPaperBook paperBook = RxPaperBook.with("UPDATES_CH", Schedulers.trampoline());
    final String entryKey = "hello";
    final ComplexObject firstValue = ComplexObject.random();
    final TestSubscriber<ComplexObject> observer = TestSubscriber.create();

    paperBook.observe(entryKey, ComplexObject.class, BackpressureStrategy.MISSING).subscribe(observer);
    observer.assertValueCount(0);

    // First write is observed.
    paperBook.write(entryKey, firstValue).subscribe();
    observer.assertValueCount(1);
    observer.assertValues(firstValue);

    // Second write is appended to the observed values.
    final ComplexObject secondValue = ComplexObject.random();
    paperBook.write(entryKey, secondValue).subscribe();
    observer.assertValueCount(2);
    observer.assertValues(firstValue, secondValue);

    // Writing a value of another type completes but is not delivered to the observer.
    final int mismatchedValue = 3;
    paperBook.write(entryKey, mismatchedValue).test().assertComplete().assertNoErrors();
    observer.assertValueCount(2);
    observer.assertValues(firstValue, secondValue);
    observer.assertNoErrors();
}
TibcoObservableTest.java 文件源码 项目:rxjavatraining 阅读 26 收藏 0 点赞 0 评论 0
/**
 * Exercise scaffold: a hand-written Flowable source that polls the downstream demand
 * until it is cancelled and prints the requested amount on every iteration.
 *
 * <p>NOTE(review): the source never emits or terminates, so the loop only ends on
 * cancellation - presumably the emission logic is left for the reader to fill in.
 */
@Test
public void createYourOwnTibco() throws Exception {
    final FlowableOnSubscribe<String> pollingSource = new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            while (!emitter.isCancelled()) {
                long outstandingDemand = emitter.requested();
                System.out.println(outstandingDemand);
                if (outstandingDemand > 0) {
                    // intentionally empty
                }
            }
        }
    };

    Flowable<String> records = Flowable.create(pollingSource, BackpressureStrategy.BUFFER);

    records.map(x -> x + "Yay!").subscribe(System.out::println);
}
RxProgress.java 文件源码 项目:RxProgress 阅读 39 收藏 0 点赞 0 评论 0
/**
 * Wraps {@code source} so that a progress dialog is created on subscription and
 * dismissed when the stream is disposed or terminates.
 *
 * <p>Dismissing the dialog (or cancelling it, when the builder allows that) completes the
 * returned Flowable.
 *
 * @param source               the upstream whose items, error and completion are relayed
 * @param backpressureStrategy strategy applied to the relaying {@code Flowable.create}
 * @param <T>                  element type
 * @return a Flowable mirroring {@code source} while the dialog is shown
 */
private <T> Flowable<T> forFlowable(Flowable<T> source, BackpressureStrategy backpressureStrategy) {
    return Flowable.using(this::makeDialog,
            new Function<ProgressDialog, Publisher<? extends T>>() {
                @Override
                public Publisher<? extends T> apply(@NonNull ProgressDialog dialog) throws Exception {
                    return Flowable.create(emitter -> {
                        if (builder.cancelable) {
                            dialog.setOnCancelListener(dialogInterface -> emitter.onComplete());
                        }
                        dialog.setOnDismissListener(dialogInterface -> emitter.onComplete());
                        // Tie the upstream subscription to the emitter: when downstream cancels
                        // or the dialog completes the stream, the source is disposed as well
                        // instead of continuing to run unobserved.
                        emitter.setDisposable(
                                source.subscribe(emitter::onNext, emitter::onError, emitter::onComplete));
                    }, backpressureStrategy);
                }
            }, Dialog::dismiss);
}
MessageViewModelTest.java 文件源码 项目:TurboChat 阅读 37 收藏 0 点赞 0 评论 0
/**
 * The API send-message stream must be subscribable and initially emit nothing and no error.
 */
@Test
public void shouldReturnApiStream() throws Exception {
    final TestSubscriber<Message> apiStreamSubscriber = new TestSubscriber<>();

    messageViewModel.apiSendMessageStream()
            .toFlowable(BackpressureStrategy.LATEST)
            .subscribe(apiStreamSubscriber);

    apiStreamSubscriber.assertNoErrors();
    apiStreamSubscriber.assertNoValues();
}
TasksLocalDataSource.java 文件源码 项目:GitHub 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Queries all rows of the task table and maps each query result to a list of tasks.
 *
 * @return a Flowable (BUFFER strategy) of task lists produced by the query
 */
@Override
public Flowable<List<Task>> getTasks() {
    final String[] columns = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    final String columnList = TextUtils.join(",", columns);
    final String query = String.format("SELECT %s FROM %s", columnList, TaskEntry.TABLE_NAME);

    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, query)
            .mapToList(mTaskMapperFunction)
            .toFlowable(BackpressureStrategy.BUFFER);
}
TasksLocalDataSource.java 文件源码 项目:GitHub 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Queries the task table for the row whose entry id matches {@code taskId}.
 *
 * @param taskId value bound to the {@code LIKE ?} placeholder of the entry-id column
 * @return a Flowable (BUFFER strategy) emitting the mapped task, or an absent Optional
 *         when the query yields no row
 */
@Override
public Flowable<Optional<Task>> getTask(@NonNull String taskId) {
    final String[] columns = {
            TaskEntry.COLUMN_NAME_ENTRY_ID,
            TaskEntry.COLUMN_NAME_TITLE,
            TaskEntry.COLUMN_NAME_DESCRIPTION,
            TaskEntry.COLUMN_NAME_COMPLETED
    };
    final String columnList = TextUtils.join(",", columns);
    final String query = String.format("SELECT %s FROM %s WHERE %s LIKE ?",
            columnList, TaskEntry.TABLE_NAME, TaskEntry.COLUMN_NAME_ENTRY_ID);

    return mDatabaseHelper.createQuery(TaskEntry.TABLE_NAME, query, taskId)
            .mapToOneOrDefault(cursor -> Optional.of(mTaskMapperFunction.apply(cursor)), Optional.<Task>absent())
            .toFlowable(BackpressureStrategy.BUFFER);
}
RxJava2CallAdapter.java 文件源码 项目:GitHub 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Adapts a {@code Call} to the reactive type selected by this adapter's flags.
 *
 * <p>The response source is asynchronous or synchronous depending on {@code isAsync};
 * it is optionally wrapped as result or body, optionally subscribed on {@code scheduler},
 * and finally converted to Flowable (LATEST), Single, Maybe or Completable, or returned
 * as an Observable when none of those flags is set.
 *
 * @param call the call to adapt
 * @return the reactive wrapper for {@code call}
 */
@Override public Object adapt(Call<R> call) {
  // Pick the response source.
  Observable<Response<R>> responses;
  if (isAsync) {
    responses = new CallEnqueueObservable<>(call);
  } else {
    responses = new CallExecuteObservable<>(call);
  }

  // Pick what is exposed to the consumer: Result wrapper, body, or raw response.
  Observable<?> adapted = responses;
  if (isResult) {
    adapted = new ResultObservable<>(responses);
  } else if (isBody) {
    adapted = new BodyObservable<>(responses);
  }

  if (scheduler != null) {
    adapted = adapted.subscribeOn(scheduler);
  }

  // Convert to the requested reactive type; the flags are checked in this fixed order.
  if (isFlowable) {
    return adapted.toFlowable(BackpressureStrategy.LATEST);
  } else if (isSingle) {
    return adapted.singleOrError();
  } else if (isMaybe) {
    return adapted.singleElement();
  } else if (isCompletable) {
    return adapted.ignoreElements();
  }
  return adapted;
}
DownloadType.java 文件源码 项目:GitHub 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Creates a publisher that, on subscription, delegates to {@code record.save} with the
 * emitter and the given response.
 *
 * @param response the HTTP response handed to {@code record.save}
 * @return a Flowable created with the LATEST backpressure strategy
 */
private Publisher<DownloadStatus> save(final Response<ResponseBody> response) {
    final FlowableOnSubscribe<DownloadStatus> saveTask = new FlowableOnSubscribe<DownloadStatus>() {
        @Override
        public void subscribe(FlowableEmitter<DownloadStatus> emitter) throws Exception {
            record.save(emitter, response);
        }
    };
    return Flowable.create(saveTask, BackpressureStrategy.LATEST);
}
SensorGathererTest.java 文件源码 项目:AndroidSensors 阅读 27 收藏 0 点赞 0 评论 0
/**
 * Common fixture: requires the gatherer under test to be assigned already, then stubs
 * the collaborators so that every sensor uses BUFFER backpressure, permission is granted
 * and every sensor type reports ready.
 */
@Before
public void setUp() throws Exception {
    // Fail fast when this fixture runs before the gatherer has been created.
    if (sensorGatherer == null) {
        throw new Error("sensorGatherer must be initialized before calling super.setUp() method");
    }

    when(sensorConfig.getBackpressureStrategy(any(SensorType.class)))
            .thenReturn(BackpressureStrategy.BUFFER);
    when(permissionChecker.isPermissionGranted())
            .thenReturn(true);
    when(sensorChecker.isReady(any(SensorType.class)))
            .thenReturn(true);
}
RxUtil.java 文件源码 项目:GitHub 阅读 36 收藏 0 点赞 0 评论 0
/**
 * Creates a Flowable that emits the given value once and then completes.
 *
 * @param t   the single value to emit
 * @param <T> element type
 * @return a Flowable created with the BUFFER backpressure strategy
 */
public static <T> Flowable<T> createData(final T t) {
    final FlowableOnSubscribe<T> singleValueSource = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                emitter.onNext(t);
                emitter.onComplete();
            } catch (Exception failure) {
                // Report any exception raised while emitting through the error channel.
                emitter.onError(failure);
            }
        }
    };
    return Flowable.create(singleValueSource, BackpressureStrategy.BUFFER);
}
ThrottleSearchActivity.java 文件源码 项目:GitHub 阅读 37 收藏 0 点赞 0 评论 0
/**
 * Starts the search-as-you-type pipeline: text changes are debounced, turned into an
 * asynchronous Realm query, and the loaded results are rendered as one TextView per person.
 */
@Override
protected void onResume() {
    super.onResume();

    // Wait for a 200 ms pause in typing before searching, to avoid excessive redrawing.
    disposable = RxTextView.textChangeEvents(searchInputView)
            .debounce(200, TimeUnit.MILLISECONDS) // default Scheduler is Schedulers.computation()
            .observeOn(AndroidSchedulers.mainThread()) // Needed to access Realm data
            .toFlowable(BackpressureStrategy.BUFFER)
            // Use Realm's async API to keep the query off the main thread;
            // Realm currently doesn't support the standard Schedulers.
            .switchMap(event -> realm.where(Person.class)
                    .beginsWith("name", event.text().toString())
                    .findAllSortedAsync("name")
                    .asFlowable())
            // The first emission is the unloaded (empty) list; skip until data is loaded.
            .filter(matches -> matches.isLoaded())
            .subscribe(matches -> {
                searchResultsView.removeAllViews();
                for (Person match : matches) {
                    TextView row = new TextView(ThrottleSearchActivity.this);
                    row.setText(match.getName());
                    searchResultsView.addView(row);
                }
            }, Throwable::printStackTrace);
}
RxJavaActivity.java 文件源码 项目:DailyStudy 阅读 45 收藏 0 点赞 0 评论 0
/**
 * Demo of a Flowable with the DROP strategy: the source emits 0..99 on the io scheduler
 * while the subscriber, running on the main thread, requests only 10 items.
 */
private void flowable() {
    final Flowable<Integer> numbers = Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<Integer> emitter) throws Exception {
            Log.e(TAG, "start send data ");
            int next = 0;
            while (next < 100) {
                emitter.onNext(next);
                next++;
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.DROP); // choose the backpressure strategy

    final FlowableSubscriber<Integer> loggingSubscriber = new FlowableSubscriber<Integer>() {
        @Override
        public void onSubscribe(@NonNull Subscription s) {
            // 1. onSubscribe is new in 2.x; it is called before any data is emitted,
            //    comparable to onStart in 1.x.
            // 2. The Subscription is used to request items from upstream and to cancel.
            // 3. request() must be called, otherwise upstream will not deliver any data.
            Log.e(TAG, "onSubscribe...");
            s.request(10);
        }

        @Override
        public void onNext(Integer value) {
            Log.e(TAG, "onNext:" + value);
        }

        @Override
        public void onError(Throwable t) {
            Log.e(TAG, "onError..." + t);
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete...");
        }
    };

    numbers.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
IconPresenter.java 文件源码 项目:MBEStyle 阅读 41 收藏 0 点赞 0 评论 0
/**
 * Counts the start tags whose name begins with "item" in the {@code R.xml.drawable}
 * resource on the io scheduler and reports the total to the view on the main thread.
 */
public void calcIconTotal() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> flowableEmitter) throws Exception {
            XmlResourceParser xml = mView.getResources().getXml(R.xml.drawable);
            int total = 0;

            try {
                while (xml.getEventType() != XmlResourceParser.END_DOCUMENT) {
                    if (xml.getEventType() == XmlPullParser.START_TAG) {
                        if (xml.getName().startsWith("item")) {
                            total++;
                        }
                    }
                    xml.next();
                }
            } finally {
                // Release the parser even when parsing fails.
                xml.close();
            }

            flowableEmitter.onNext(total);
            // Terminate the stream so the subscription does not stay open.
            flowableEmitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(@NonNull Integer integer) throws Exception {
                    mView.setIconTotal(integer);
                }
            });
}
StorageImpl.java 文件源码 项目:vt-support 阅读 37 收藏 0 点赞 0 评论 0
/**
 * Writes the given entries into the TILES table with {@code INSERT OR REPLACE}.
 *
 * <p>Each entry contributes four statement parameters: zoom level, column, flipped row
 * and the gzip-compressed vector data. The call waits up to five seconds for completion.
 *
 * @param entries the tiles to store
 */
@Override
public void putEntries(Observable<Entry> entries) {
  final String insertSql =
      "INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
          + " values (?, ?, ?, ?);";

  final Observable<Object> statementParams = entries.concatMap(entry -> {
    final byte[] gzippedTile;
    try {
      gzippedTile = CompressUtil.getCompressedAsGzip(entry.getVector());
    } catch (final IOException ex) {
      throw Exceptions.propagate(ex);
    }

    final int zoom = entry.getZoomLevel();
    return Observable.<Object>just(zoom, entry.getColumn(),
        flipY(entry.getRow(), entry.getZoomLevel()), gzippedTile);
  })
      // Collect everything first, then replay it as one stream.
      // source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
      .toList()
      .flattenAsObservable(objects -> objects);

  // TODO update when upstream is enhanced
  dataSource.update(insertSql)
      .parameterStream(statementParams.toFlowable(BackpressureStrategy.BUFFER))
      .counts()
      .test() // TODO remove hack
      .awaitDone(5, TimeUnit.SECONDS)
      .assertComplete();
}
RxRatpack.java 文件源码 项目:ratpack-rx2 阅读 39 收藏 0 点赞 0 评论 0
/**
 * Converts a {@link Promise} into a {@link Flowable} that emits the promised value and completes.
 * An error of the promise is forwarded to the Flowable's error channel.
 *
 * @param promise the promise to convert
 * @param strategy The {@link BackpressureStrategy} to use
 * @param <T> the type of the promised value
 * @return a Flowable emitting the promised value
 * @see RxRatpack#observe(Promise)
 */
public static <T> Flowable<T> flow(Promise<T> promise, BackpressureStrategy strategy) {
  return Flowable.create(emitter -> {
    promise
        .onError(emitter::onError)
        .then(result -> {
          emitter.onNext(result);
          emitter.onComplete();
        });
  }, strategy);
}
MainActivity.java 文件源码 项目:Mix 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Demo of the LATEST backpressure strategy: the source emits 0..9999 on the io scheduler
 * while the subscriber on the main thread initially requests 128 items and keeps the
 * subscription in {@code mSubscription}.
 */
private void flowableSize() {
    final FlowableOnSubscribe<Integer> counterSource = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int value = 0;
            while (value < 10000) {
                emitter.onNext(value);
                value++;
            }
        }
    };

    final Subscriber<Integer> loggingSubscriber = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            mSubscription = s;
            s.request(128);
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    // DROP vs LATEST: with a finite number of events LATEST is guaranteed to deliver the
    // final item (9999 here), whereas DROP delivers a contiguous run.
    Flowable.create(counterSource, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
BaseFlowableActivity.java 文件源码 项目:GetStartRxJava2.0 阅读 32 收藏 0 点赞 0 评论 0
/**
 * Demo: emits four string events followed by completion and logs every callback of a
 * subscriber that requests an unbounded amount.
 */
private void doRxJavaWork() {
    Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            emitter.onNext("事件1");
            emitter.onNext("事件2");
            emitter.onNext("事件3");
            emitter.onNext("事件4");
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe: ");
            // Ask for everything the source has.
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String event) {
            Log.d(TAG, "onNext: " + event);
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "onError: " + t.toString());
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: ");
        }
    });
}
CCRequest.java 文件源码 项目:EvolvingNetLib 阅读 35 收藏 0 点赞 0 评论 0
/**
 * Builds the Flowable that looks the response up in the memory cache.
 *
 * <p>When no cache callback is set, a response wrapping {@code null} is emitted. If the
 * lookup or emission throws, the stream fails only in {@code MODE_ONLY_MEMORY}; in every
 * other query mode it simply completes.
 *
 * @return the memory-cache query Flowable (LATEST strategy, subscribed on the io scheduler)
 */
private Flowable<CCBaseResponse<T>> getMemoryCacheQueryFlowable() {
    final FlowableOnSubscribe<CCBaseResponse<T>> memoryQuery = new FlowableOnSubscribe<CCBaseResponse<T>>() {
        @Override
        public void subscribe(@NonNull FlowableEmitter<CCBaseResponse<T>> emitter) throws Exception {
            try {
                // Fetch the cached value, if a callback is available.
                T cached = null;
                if (ccCacheQueryCallback != null) {
                    cached = ccCacheQueryCallback.<T>onQueryFromMemory(cacheKey);
                }

                emitter.onNext(new CCBaseResponse<T>(cached, true, true, false));
                emitter.onComplete();
            } catch (Exception queryFailure) {
                switch (cacheQueryMode) {
                    case CCCacheMode.QueryMode.MODE_ONLY_MEMORY:
                        emitter.onError(new CCDiskCacheQueryException(queryFailure));
                        break;
                    default:
                        emitter.onComplete();
                        break;
                }
            }
        }
    };

    return Flowable.create(memoryQuery, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io());
}
S2Client.java 文件源码 项目:ocraft-s2client 阅读 39 收藏 0 点赞 0 评论 0
/**
 * Copies the settings from the builder, assembles the shared response stream, subscribes
 * this client (and the optional game controller) to it, and finally starts the channel
 * provider with the configured ip and port.
 *
 * @param builder source of the connection, tracing and game settings
 */
private S2Client(Builder builder) {

        connectToIp = builder.connectToIp;
        connectToPort = builder.connectToPort;
        traced = builder.traced;
        tracer = builder.tracer;
        game = builder.game;

        log.info("Starting: {}", this);

        // Merge the channel's output and error streams, map each element through
        // prepareResponse, and convert to a Flowable with a bounded backpressure buffer.
        // Delivery moves to the computation scheduler (delayError = false) with a
        // configured buffer size.
        Channel channel = channelProvider.getChannel();
        responseStream = channel.outputStream().mergeWith(channel.errorStream())
                .map(this::prepareResponse)
                .toFlowable(BackpressureStrategy.ERROR)
                .onBackpressureBuffer(cfg().getInt(OcraftConfig.CLIENT_BUFFER_SIZE_RESPONSE_BACKPRESSURE))
                .observeOn(Schedulers.computation(), false, cfg().getInt(CLIENT_BUFFER_SIZE_RESPONSE_STREAM))
                // Share a single upstream subscription; it is connected by the first subscriber.
                .publish()
                .autoConnect()
                // Every subscriber registers on `await` and deregisters when it cancels.
                // NOTE(review): `await` looks like a java.util.concurrent.Phaser - confirm.
                .doOnSubscribe(s -> await.register())
                .doOnCancel(await::arriveAndDeregister);

        // NOTE(review): each subscription below is immediately followed by
        // arriveAndDeregister(), which presumably undoes the registration made in
        // doOnSubscribe for these internal subscribers - confirm before reordering.
        responseStream().subscribe(this);
        await.arriveAndDeregister();

        // The game controller is optional; subscribe it only when one was supplied.
        Optional.ofNullable(game).ifPresent(s2Controller -> {
            responseStream().subscribe(s2Controller);
            await.arriveAndDeregister();
        });

        // Start the channel provider only after all subscribers are attached.
        channelProvider.start(connectToIp, connectToPort);
    }
AbstractPresenter.java 文件源码 项目:AndroidMVPresenter 阅读 36 收藏 0 点赞 0 评论 0
/**
 * Retry handler: for each upstream error, increments the retry counter and either runs
 * {@code todoBeforeRetry} (while the counter does not exceed {@code maxRetries}) or
 * propagates the error.
 *
 * @param throwableFlowable stream of errors raised by the retried source
 * @return a publisher whose emissions trigger a retry and whose error ends retrying
 */
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
    final Function<Throwable, Publisher<?>> retryDecision = new Function<Throwable, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Throwable failure) throws Exception {
            // Count this attempt first; give up once the budget is exceeded.
            if (++retryCount > maxRetries) {
                return Flowable.error(failure);
            }
            return todoBeforeRetry.apply(failure).toFlowable(BackpressureStrategy.BUFFER);
        }
    };
    return throwableFlowable.flatMap(retryDecision);
}
Rx2Test2Activity.java 文件源码 项目:RX_Demo 阅读 35 收藏 0 点赞 0 评论 0
/**
 * Demo of the ERROR backpressure strategy: the source emits 0..127 on the io scheduler;
 * the subscriber on the main thread stores the subscription in {@code mSubscription}
 * without requesting anything itself.
 */
private void flowableTest() {
    final FlowableOnSubscribe<Integer> counterSource = new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int value = 0;
            while (value < 128) {
                Log.d(TAG, "emit " + value);
                emitter.onNext(value);
                value++;
            }
        }
    };

    final Subscriber<Integer> loggingSubscriber = new Subscriber<Integer>() {

        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            // Note: no request is made here (s.request(Long.MAX_VALUE) is deliberately
            // left out); the subscription is only kept for later use.
            mSubscription = s;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
        }

        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };

    // Flowable.create takes the backpressure strategy as an additional argument.
    Flowable.create(counterSource, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(loggingSubscriber);
}
RxHelper.java 文件源码 项目:YiZhi 阅读 36 收藏 0 点赞 0 评论 0
/**
 * Creates a Flowable that emits the given value once and then completes.
 *
 * @param t   the single value to emit
 * @param <T> element type
 * @return a Flowable created with the BUFFER backpressure strategy
 */
public static <T> Flowable<T> createFlowable(final T t) {
    final FlowableOnSubscribe<T> singleValueSource = new FlowableOnSubscribe<T>() {
        @Override
        public void subscribe(FlowableEmitter<T> emitter) throws Exception {
            try {
                emitter.onNext(t);
                emitter.onComplete();
            } catch (Exception failure) {
                // Report any exception raised while emitting through the error channel.
                emitter.onError(failure);
            }
        }
    };
    return Flowable.create(singleValueSource, BackpressureStrategy.BUFFER);
}


问题


面经


文章

微信
公众号

扫码关注公众号