java类rx.subscriptions.Subscriptions的实例源码

BaseObservable.java 文件源码 项目:GitHub 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Creates a Google API client for the subscriber, tries to connect it, and registers an
 * unsubscribe action that disconnects the client again.
 *
 * <p>A failure thrown by {@code connect()} is forwarded to the subscriber via
 * {@code onError}; the unsubscribe action is registered regardless.
 *
 * @param subscriber the subscriber the client is created for
 */
@Override
public void call(Subscriber<? super T> subscriber) {
    final GoogleApiClient client = createApiClient(subscriber);

    try {
        client.connect();
    } catch (Throwable connectFailure) {
        subscriber.onError(connectFailure);
    }

    final Action0 disconnectOnUnsubscribe = new Action0() {
        @Override
        public void call() {
            // Only tear down a client that is connected or still connecting.
            final boolean active = client.isConnected() || client.isConnecting();
            if (!active) {
                return;
            }
            onUnsubscribed(client);
            client.disconnect();
        }
    };
    subscriber.add(Subscriptions.create(disconnectOnUnsubscribe));
}
PendingResultObservable.java 文件源码 项目:GitHub 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Registers a result callback that forwards the result to the subscriber and then completes
 * it, and registers an unsubscribe action that cancels the pending result if no result has
 * been received yet.
 *
 * @param subscriber receiver of the result
 */
@Override
public void call(final Subscriber<? super T> subscriber) {
    result.setResultCallback(new ResultCallback<T>() {
        @Override
        public void onResult(T t) {
            // Mark completion BEFORE emitting: if the downstream unsubscribes from inside
            // onNext, the unsubscribe action below must not try to cancel a result that
            // has already been delivered.
            complete = true;
            subscriber.onNext(t);
            subscriber.onCompleted();
        }
    });
    subscriber.add(Subscriptions.create(new Action0() {
        @Override
        public void call() {
            // Cancel only while the result is still outstanding.
            if (!complete) {
                result.cancel();
            }
        }
    }));
}
BonjourFinder.java 文件源码 项目:mobile-store 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Returns an observable that, per subscription, creates a {@link BonjourFinder} bound to the
 * subscriber, registers {@code cancel()} as the unsubscribe action, and then calls
 * {@code scan()}.
 *
 * @param context context handed to the finder
 * @return observable of peers
 */
public static Observable<Peer> createBonjourObservable(final Context context) {
    final Observable.OnSubscribe<Peer> scanOnSubscribe = new Observable.OnSubscribe<Peer>() {
        @Override
        public void call(Subscriber<? super Peer> subscriber) {
            final BonjourFinder bonjour = new BonjourFinder(context, subscriber);

            // Register the cancel hook before scanning starts.
            final Action0 stopScan = new Action0() {
                @Override
                public void call() {
                    bonjour.cancel();
                }
            };
            subscriber.add(Subscriptions.create(stopScan));

            bonjour.scan();
        }
    };
    return Observable.create(scanOnSubscribe);
}
BluetoothFinder.java 文件源码 项目:mobile-store 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Returns an observable that, per subscription, creates a {@link BluetoothFinder} bound to
 * the subscriber, registers {@code cancel()} as the unsubscribe action, and then calls
 * {@code scan()}.
 *
 * @param context context handed to the finder
 * @return observable of peers
 */
public static Observable<Peer> createBluetoothObservable(final Context context) {
    final Observable.OnSubscribe<Peer> scanOnSubscribe = new Observable.OnSubscribe<Peer>() {
        @Override
        public void call(Subscriber<? super Peer> subscriber) {
            final BluetoothFinder bluetooth = new BluetoothFinder(context, subscriber);

            // Register the cancel hook before scanning starts.
            final Action0 stopScan = new Action0() {
                @Override
                public void call() {
                    bluetooth.cancel();
                }
            };
            subscriber.add(Subscriptions.create(stopScan));

            bluetooth.scan();
        }
    };
    return Observable.create(scanOnSubscribe);
}
XhrResourceBuilder.java 文件源码 项目:autorest-streaming-example 阅读 100 收藏 0 点赞 0 评论 0
/**
 * Opens an {@code EventSource} on {@code uri()} and connects it to the subscriber through a
 * {@code QueuedProducer}: "message" events are parsed and emitted, "open" events are logged,
 * and "error" events are logged and turned into {@code onError} once the source reports the
 * CLOSED state. Unsubscribing closes the source.
 *
 * <p>Any throwable raised while wiring this up is logged and reported to the subscriber.
 *
 * @param s the subscriber to feed
 */
private <T> void eventSourceSubscription(Subscriber<T> s) {
    final EventSource source = new EventSource(uri());
    final QueuedProducer<T> producer = new QueuedProducer<>(s);
    try {
        s.add(subscribeEventListener(source, "message",
                evt -> producer.onNext(parse(Js.<MessageEvent<String>>cast(evt).data))));
        s.add(subscribeEventListener(source, "open",
                evt -> log.fine("Connection opened: " + uri())));
        s.add(subscribeEventListener(source, "error", evt -> {
            log.log(Level.SEVERE, "Error: " + evt);
            if (source.readyState != source.CLOSED) {
                return;
            }
            producer.onError(new RuntimeException("Event source error"));
        }));
        s.setProducer(producer);
        // hack because elemental API EventSource.close is missing
        s.add(Subscriptions.create(() -> Js.<MessagePort>uncheckedCast(source).close()));
    } catch (Throwable e) {
        log.log(Level.FINE, "Received http error for: " + uri(), e);
        s.onError(new RuntimeException("Event source error", e));
    }
}
RxFirebase.java 文件源码 项目:RxFBase 阅读 14 收藏 0 点赞 0 评论 0
/**
 * Returns an observable that attaches a {@link ValueEventListener} to {@code query} on
 * subscription and removes it again on unsubscription.
 *
 * <p>Snapshots are emitted via {@code onNext}; a cancelled listener is reported via
 * {@code onError}. Neither signal is delivered once the subscriber has unsubscribed.
 *
 * <p>NOTE(review): despite the name, this registers a continuous listener through
 * {@code addValueEventListener} and never calls {@code onCompleted} - confirm whether
 * callers are expected to apply {@code take(1)}/{@code first()} themselves.
 *
 * @param query the Firebase query to observe
 * @return observable of data snapshots for the query
 */
@NonNull public static Observable<DataSnapshot> singleValue(final Query query) {
    return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
        @Override public void call(final Subscriber<? super DataSnapshot> subscriber) {
            final ValueEventListener valueEvent = query.addValueEventListener(new ValueEventListener() {
                @Override public void onDataChange(DataSnapshot dataSnapshot) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(dataSnapshot);
                    }
                }
                @Override public void onCancelled(DatabaseError databaseError) {
                    // Same guard as onDataChange: never signal an unsubscribed subscriber.
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(databaseError.toException());
                    }
                }
            });
            // Detach the listener when the subscriber goes away.
            subscriber.add(Subscriptions.create(new Action0() {
                @Override public void call() {
                    query.removeEventListener(valueEvent);
                }
            }));
        }
    });
}
HandlerThreadScheduler.java 文件源码 项目:MoeSampleApp 阅读 25 收藏 0 点赞 0 评论 0
/**
 * Wraps {@code action} in a {@code ScheduledAction} and hands it to the shared scheduled
 * executor: submitted directly when {@code delayTime <= 0}, otherwise scheduled with the
 * given delay. The resulting future and the worker's subscription are attached to the
 * scheduled action, which is returned.
 *
 * @param action    the work to run
 * @param delayTime delay before running; non-positive values submit without delay
 * @param unit      unit of {@code delayTime}
 * @return the scheduled action, or an empty subscription if this worker is unsubscribed
 */
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        return Subscriptions.empty();
    }

    final ScheduledAction task = new ScheduledAction(action, operationQueue);
    final ScheduledExecutorService pool = IOSScheduledExecutorPool.getInstance();

    final boolean runWithoutDelay = delayTime <= 0;
    final Future<?> pending = runWithoutDelay
            ? pool.submit(task)
            : pool.schedule(task, delayTime, unit);

    task.add(Subscriptions.from(pending));
    task.addParent(innerSubscription);
    return task;
}
DatabaseManager.java 文件源码 项目:navigator 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Subscribes to an observable that obtains the default Realm instance, emits it once
 * immediately and again from a change listener registered on it, and on unsubscription
 * removes the listener and closes the Realm. Subscription and unsubscription both run on
 * the scheduler supplied by {@code looperScheduler}; the resulting subscription is stored
 * in {@code subscription}.
 */
public void openDatabase() {
    final Observable.OnSubscribe<Realm> realmEmitter = new Observable.OnSubscribe<Realm>() {
        @Override
        public void call(final Subscriber<? super Realm> subscriber) {
            final Realm observableRealm = Realm.getDefaultInstance();

            final RealmChangeListener<Realm> onChange = changed -> {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                subscriber.onNext(observableRealm);
            };
            observableRealm.addChangeListener(onChange);

            // Release the listener and the Realm when the subscriber goes away.
            subscriber.add(Subscriptions.create(() -> {
                observableRealm.removeChangeListener(onChange);
                observableRealm.close();
            }));

            subscriber.onNext(observableRealm);
        }
    };

    subscription = Observable.create(realmEmitter)
            .subscribeOn(looperScheduler.getScheduler())
            .unsubscribeOn(looperScheduler.getScheduler())
            .subscribe();
}
TrampolineScheduler.java 文件源码 项目:boohee_v5.6 阅读 21 收藏 0 点赞 0 评论 0
/**
 * Adds {@code action} to this worker's queue and, if no drain is currently in progress,
 * runs queued actions on the calling thread until the work-in-progress counter drops to zero.
 *
 * @param action   the work to enqueue
 * @param execTime value stored in the queued {@code TimedAction}
 *                 (presumably the intended execution time - confirm against TimedAction)
 * @return an already-unsubscribed subscription when the worker is unsubscribed or when this
 *         call performed the drain itself; otherwise a subscription that removes the queued
 *         action from the queue
 */
private Subscription enqueue(Action0 action, long execTime) {
    // Nothing is queued once the worker's own subscription has been unsubscribed.
    if (this.innerSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }
    // counter.incrementAndGet() hands every queued action a distinct, increasing number.
    final TimedAction timedAction = new TimedAction(action, Long.valueOf(execTime), this.counter.incrementAndGet());
    this.queue.add(timedAction);
    // wip was already non-zero before this increment, so this call does not drain;
    // the caller only gets a handle that removes the pending action from the queue.
    if (this.wip.getAndIncrement() != 0) {
        return Subscriptions.create(new Action0() {
            public void call() {
                InnerCurrentThreadScheduler.this.queue.remove(timedAction);
            }
        });
    }
    // This call moved wip away from zero, so it drains: one poll per counted enqueue,
    // repeated until decrementAndGet() no longer returns a positive value.
    do {
        TimedAction polled = (TimedAction) this.queue.poll();
        // poll() may return null (e.g. the action was removed via its subscription).
        if (polled != null) {
            polled.action.call();
        }
    } while (this.wip.decrementAndGet() > 0);
    // The drain loop has finished on this thread; there is nothing left to cancel here.
    return Subscriptions.unsubscribed();
}
ExecutorScheduler.java 文件源码 项目:boohee_v5.6 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Wraps {@code action} in a {@code ScheduledAction}, records it in {@code tasks} and
 * {@code queue}, and - when the work-in-progress counter was zero - asks the executor to run
 * this worker.
 *
 * <p>If the executor rejects the worker, the task is removed from {@code tasks}, the counter
 * is decremented, the rejection is reported to the plugin error handler and then rethrown.
 *
 * @param action the work to schedule
 * @return the scheduled task, or an unsubscribed subscription if this worker is unsubscribed
 */
public Subscription schedule(Action0 action) {
    if (isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    Subscription task = new ScheduledAction(action, this.tasks);
    this.tasks.add(task);
    this.queue.offer(task);

    // Only the caller that moves wip away from zero starts the worker on the executor.
    if (this.wip.getAndIncrement() == 0) {
        try {
            this.executor.execute(this);
        } catch (RejectedExecutionException rejected) {
            this.tasks.remove(task);
            this.wip.decrementAndGet();
            RxJavaPlugins.getInstance().getErrorHandler().handleError(rejected);
            throw rejected;
        }
    }
    return task;
}
OperatorTimeoutWithSelector.java 文件源码 项目:boohee_v5.6 阅读 18 收藏 0 点赞 0 评论 0
/**
 * Subscribes to the observable produced by the first-timeout selector. Either an item or
 * completion from that observable triggers {@code onTimeout(seqId)} on the timeout
 * subscriber; an error is forwarded to it.
 *
 * @param timeoutSubscriber subscriber to notify
 * @param seqId             sequence id passed to {@code onTimeout}
 * @param inner             worker (not used by this method)
 * @return the subscription to the timeout observable, or an unsubscribed subscription when
 *         there is no selector or the selector/subscription threw
 */
public Subscription call(final TimeoutSubscriber<T> timeoutSubscriber, final Long seqId, Worker inner) {
    if (this.val$firstTimeoutSelector == null) {
        return Subscriptions.unsubscribed();
    }
    try {
        Observable firstTimeout = (Observable) this.val$firstTimeoutSelector.call();
        Subscriber<U> timeoutTrigger = new Subscriber<U>() {
            public void onNext(U u) {
                timeoutSubscriber.onTimeout(seqId.longValue());
            }

            public void onError(Throwable e) {
                timeoutSubscriber.onError(e);
            }

            public void onCompleted() {
                timeoutSubscriber.onTimeout(seqId.longValue());
            }
        };
        return firstTimeout.unsafeSubscribe(timeoutTrigger);
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, timeoutSubscriber);
        return Subscriptions.unsubscribed();
    }
}
OperatorTimeoutWithSelector.java 文件源码 项目:boohee_v5.6 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Subscribes to the observable the timeout selector produces for {@code value}. Either an
 * item or completion from that observable triggers {@code onTimeout(seqId)} on the timeout
 * subscriber; an error is forwarded to it.
 *
 * @param timeoutSubscriber subscriber to notify
 * @param seqId             sequence id passed to {@code onTimeout}
 * @param value             the item handed to the timeout selector
 * @param inner             worker (not used by this method)
 * @return the subscription to the timeout observable, or an unsubscribed subscription when
 *         the selector/subscription threw
 */
public Subscription call(final TimeoutSubscriber<T> timeoutSubscriber, final Long seqId, T value, Worker inner) {
    try {
        Observable itemTimeout = (Observable) this.val$timeoutSelector.call(value);
        Subscriber<V> timeoutTrigger = new Subscriber<V>() {
            public void onNext(V v) {
                timeoutSubscriber.onTimeout(seqId.longValue());
            }

            public void onError(Throwable e) {
                timeoutSubscriber.onError(e);
            }

            public void onCompleted() {
                timeoutSubscriber.onTimeout(seqId.longValue());
            }
        };
        return itemTimeout.unsafeSubscribe(timeoutTrigger);
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, timeoutSubscriber);
        return Subscriptions.unsubscribed();
    }
}
OnSubscribeToObservableFuture.java 文件源码 项目:boohee_v5.6 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Registers an unsubscribe action that cancels the wrapped future (with interruption), then
 * - unless already unsubscribed - waits for the future's value, emits it and completes.
 * The timed {@code get} is used when a time unit was supplied.
 *
 * <p>Throwables are passed to {@code Exceptions.throwOrReport} only while the subscriber is
 * still subscribed.
 *
 * @param subscriber receiver of the future's value
 */
public void call(Subscriber<? super T> subscriber) {
    final Action0 cancelFuture = new Action0() {
        public void call() {
            ToObservableFuture.this.that.cancel(true);
        }
    };
    subscriber.add(Subscriptions.create(cancelFuture));

    try {
        if (subscriber.isUnsubscribed()) {
            return;
        }
        if (this.unit != null) {
            subscriber.onNext(this.that.get(this.time, this.unit));
        } else {
            subscriber.onNext(this.that.get());
        }
        subscriber.onCompleted();
    } catch (Throwable e) {
        if (subscriber.isUnsubscribed()) {
            return;
        }
        Exceptions.throwOrReport(e, subscriber);
    }
}
OnSubscribeRefCount.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Creates the subscription handed to a downstream subscriber. When it is unsubscribed, and
 * {@code current} is still the active base subscription, the subscriber count is
 * decremented; when the count reaches zero the base subscription is unsubscribed and
 * replaced by a fresh {@link CompositeSubscription}.
 *
 * <p>The lock is released in a {@code finally} block, so it is unlocked exactly once and any
 * throwable raised while disconnecting propagates to the caller instead of being swallowed.
 *
 * @param current the base subscription that was active when the subscriber connected
 * @return subscription performing the ref-count bookkeeping on unsubscribe
 */
private Subscription disconnect(final CompositeSubscription current) {
    return Subscriptions.create(new Action0() {
        public void call() {
            OnSubscribeRefCount.this.lock.lock();
            try {
                // Ignore stale handles: only the currently active connection is counted down.
                if (OnSubscribeRefCount.this.baseSubscription == current && OnSubscribeRefCount.this.subscriptionCount.decrementAndGet() == 0) {
                    OnSubscribeRefCount.this.baseSubscription.unsubscribe();
                    OnSubscribeRefCount.this.baseSubscription = new CompositeSubscription();
                }
            } finally {
                OnSubscribeRefCount.this.lock.unlock();
            }
        }
    });
}
OperatorWindowWithSize.java 文件源码 项目:boohee_v5.6 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Wires this subscriber to its child: registers an unsubscribe action that unsubscribes
 * this subscriber when {@code noWindow} is set, and installs a producer that multiplies
 * each positive child request by the operator's {@code size} (capped at
 * {@code Long.MAX_VALUE} on overflow) before calling {@code requestMore}.
 */
void init() {
    final Action0 unsubscribeIfNoWindow = new Action0() {
        public void call() {
            if (ExactSubscriber.this.noWindow) {
                ExactSubscriber.this.unsubscribe();
            }
        }
    };
    this.child.add(Subscriptions.create(unsubscribeIfNoWindow));

    final Producer scaledRequests = new Producer() {
        public void request(long n) {
            if (n <= 0) {
                return;
            }
            final long windowSize = (long) OperatorWindowWithSize.this.size;
            long total = n * windowSize;
            // Products that fit in 31 bits cannot have overflowed; otherwise verify by division.
            final boolean needsOverflowCheck = (total >>> 31) != 0;
            if (needsOverflowCheck && total / n != windowSize) {
                total = Long.MAX_VALUE;
            }
            ExactSubscriber.this.requestMore(total);
        }
    };
    this.child.setProducer(scaledRequests);
}
OperatorWindowWithSize.java 文件源码 项目:boohee_v5.6 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Wires this subscriber to its child: registers an unsubscribe action that unsubscribes
 * this subscriber when {@code noWindow} is set, and installs a producer that multiplies
 * each positive child request by the operator's {@code size} (capped at
 * {@code Long.MAX_VALUE} on overflow) before calling {@code requestMore}.
 */
void init() {
    final Action0 unsubscribeIfNoWindow = new Action0() {
        public void call() {
            if (InexactSubscriber.this.noWindow) {
                InexactSubscriber.this.unsubscribe();
            }
        }
    };
    this.child.add(Subscriptions.create(unsubscribeIfNoWindow));

    final Producer scaledRequests = new Producer() {
        public void request(long n) {
            if (n <= 0) {
                return;
            }
            final long windowSize = (long) OperatorWindowWithSize.this.size;
            long total = n * windowSize;
            // Products that fit in 31 bits cannot have overflowed; otherwise verify by division.
            final boolean needsOverflowCheck = (total >>> 31) != 0;
            if (needsOverflowCheck && total / n != windowSize) {
                total = Long.MAX_VALUE;
            }
            InexactSubscriber.this.requestMore(total);
        }
    };
    this.child.setProducer(scaledRequests);
}
HotObservable.java 文件源码 项目:MarbleTest4J 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Records a new subscription log entry stamped with the scheduler's current time, adds the
 * subscriber to the subscriber list, and registers an unsubscribe action that replaces the
 * log entry with one that also carries the unsubscription time and removes the subscriber.
 *
 * @param subscriber the subscriber being attached
 */
public void call(final Subscriber<? super T> subscriber) {
    final SubscriptionLog openLog = new SubscriptionLog(scheduler.now());
    observable.subscriptions.add(openLog);
    // Position of the entry just added, so it can be replaced on unsubscribe.
    final int logIndex = observable.getSubscriptions().size() - 1;

    subscribers.add(subscriber);

    final Action0 onUnsubscribe = new Action0() {
        @Override
        public void call() {
            final SubscriptionLog closedLog = new SubscriptionLog(openLog.subscribe, scheduler.now());
            observable.subscriptions.set(logIndex, closedLog);
            subscribers.remove(subscriber);
        }
    };
    subscriber.add(Subscriptions.create(onUnsubscribe));
}
HandlerScheduler.java 文件源码 项目:Gank-Meizi 阅读 20 收藏 0 点赞 0 评论 0
/**
 * Posts {@code action} (after passing it through the RxAndroid schedulers hook) to the
 * handler with the given delay and returns a subscription that removes the posted callback
 * when unsubscribed.
 *
 * @param action    the work to run
 * @param delayTime delay before the action is run
 * @param unit      unit of {@code delayTime}
 * @return the scheduled action, or an unsubscribed subscription if this worker is unsubscribed
 */
@Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
  if (compositeSubscription.isUnsubscribed()) {
    return Subscriptions.unsubscribed();
  }

  final Action0 hookedAction = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

  final ScheduledAction task = new ScheduledAction(hookedAction);
  task.addParent(compositeSubscription);
  compositeSubscription.add(task);

  final long delayMillis = unit.toMillis(delayTime);
  handler.postDelayed(task, delayMillis);

  // Unsubscribing removes the callback from the handler.
  final Action0 removePendingCallback = new Action0() {
    @Override public void call() {
      handler.removeCallbacks(task);
    }
  };
  task.add(Subscriptions.create(removePendingCallback));

  return task;
}
OnSubscribeBroadcastRegister.java 文件源码 项目:Rx.ContentObservable 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Registers a broadcast receiver that forwards every received intent to the subscriber, and
 * unregisters it when the subscriber unsubscribes.
 *
 * @param subscriber receiver of the broadcast intents
 */
@Override
public void call(final Subscriber<? super Intent> subscriber)
{
    final BroadcastReceiver receiver = new BroadcastReceiver()
    {
        @Override
        public void onReceive(Context context, Intent intent)
        {
            subscriber.onNext(intent);
        }
    };

    // The unregister hook is attached before the receiver is registered.
    subscriber.add(Subscriptions.create(new Action0()
    {
        @Override
        public void call()
        {
            mContext.unregisterReceiver(receiver);
        }
    }));

    mContext.registerReceiver(receiver, mIntentFilter, mBroadcastPermission, mSchedulerHandler);
}
OnSubscribeSharedPreferenceChange.java 文件源码 项目:Rx.ContentObservable 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Registers a shared-preference change listener that emits the key of every changed
 * preference to the subscriber, and unregisters it when the subscriber unsubscribes.
 *
 * @param subscriber receiver of the changed preference keys
 */
@Override
public void call(final Subscriber<? super String> subscriber)
{
    final SharedPreferences.OnSharedPreferenceChangeListener keyForwarder = new SharedPreferences.OnSharedPreferenceChangeListener()
    {
        @Override
        public void onSharedPreferenceChanged(SharedPreferences sharedPreferences, String key)
        {
            subscriber.onNext(key);
        }
    };

    // The unregister hook is attached before the listener is registered.
    final Action0 unregister = new Action0()
    {
        @Override
        public void call()
        {
            mSharedPreferences.unregisterOnSharedPreferenceChangeListener(keyForwarder);
        }
    };
    subscriber.add(Subscriptions.create(unregister));

    mSharedPreferences.registerOnSharedPreferenceChangeListener(keyForwarder);
}
OnSubscribeLocalBroadcastRegister.java 文件源码 项目:Rx.ContentObservable 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Registers a receiver with the {@link LocalBroadcastManager} that forwards every received
 * intent to the subscriber, and unregisters it when the subscriber unsubscribes.
 *
 * @param subscriber receiver of the local broadcast intents
 */
@Override
public void call(final Subscriber<? super Intent> subscriber)
{
    final LocalBroadcastManager broadcasts = LocalBroadcastManager.getInstance(mContext);
    final BroadcastReceiver receiver = new BroadcastReceiver()
    {
        @Override
        public void onReceive(Context context, Intent intent)
        {
            subscriber.onNext(intent);
        }
    };

    // The unregister hook is attached before the receiver is registered.
    subscriber.add(Subscriptions.create(new Action0()
    {
        @Override
        public void call()
        {
            broadcasts.unregisterReceiver(receiver);
        }
    }));

    broadcasts.registerReceiver(receiver, mIntentFilter);
}
FirebaseEntityStore.java 文件源码 项目:buddysearch 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Wraps a Firebase query in an observable. Each data change is handed, together with the
 * subscriber, to {@code onNextAction}; a cancelled listener is reported as a
 * {@code FirebaseException} carrying the database error message. The listener is removed
 * from the query on unsubscription.
 *
 * @param query                   the query to listen to
 * @param onNextAction            callback invoked with the subscriber and each snapshot
 * @param subscribeForSingleEvent {@code true} to register with
 *                                {@code addListenerForSingleValueEvent}, {@code false} to
 *                                register with {@code addValueEventListener}
 * @return observable driven by the query
 */
private <T> Observable<T> getQuery(Query query, Action2<Subscriber<? super T>, DataSnapshot> onNextAction, boolean subscribeForSingleEvent) {
    return Observable.create(downstream -> {
        ValueEventListener snapshotListener = new ValueEventListener() {

            @Override
            public void onDataChange(DataSnapshot dataSnapshot) {
                onNextAction.call(downstream, dataSnapshot);
            }

            @Override
            public void onCancelled(DatabaseError databaseError) {
                downstream.onError(new FirebaseException(databaseError.getMessage()));
            }
        };

        if (!subscribeForSingleEvent) {
            query.addValueEventListener(snapshotListener);
        } else {
            query.addListenerForSingleValueEvent(snapshotListener);
        }

        downstream.add(Subscriptions.create(() -> {
            query.removeEventListener(snapshotListener);
        }));
    });
}
PlenProgramAdapter.java 文件源码 项目:plen-Scenography_Android 阅读 16 收藏 0 点赞 0 评论 0
/**
 * Binds the code unit at {@code position} to {@code view}.
 *
 * <p>Existing map entries for the view and the position are removed first. A blank row is
 * bound as such and nothing is stored. Otherwise the view is bound to the unit's motion and
 * loop count, changes of the view's program unit are written back via {@code setUnit}, and
 * the combined subscription is stored under both the view and the position.
 *
 * @param position adapter position of the unit
 * @param view     the row view to bind
 */
private void bindView(int position, @NonNull PlenCodeUnitView view) {
    mSubscriptionMap.remove(view);
    mSubscriptionMap.remove(position);

    final PlenCodeUnit unit = getItem(position);
    if (isBlankRow(unit)) {
        view.bindBlankRow();
        return;
    }

    final Subscription viewBinding = view.bind(
            mMotionMap.asObservable().map(m -> m.get(unit.getMotionId())),
            Observable.just(unit.getLoopCount()));

    // Skip emissions until the view reports the unit it was bound to, then write back changes.
    final Subscription unitUpdates = view.programUnit().asObservable()
            .skipWhile(u -> !Objects.equals(u, unit))
            .subscribe(u -> setUnit(position, u));

    final Subscription rowSubscription = Subscriptions.from(viewBinding, unitUpdates);
    mSubscriptionMap.put(view, rowSubscription);
    mSubscriptionMap.put(position, rowSubscription);
}
PlenMotionListPagerAdapter.java 文件源码 项目:plen-Scenography_Android 阅读 17 收藏 0 点赞 0 评论 0
/**
 * Inflates a motion-list page for {@code position}, gives it a fresh adapter bound to that
 * page's motions, stores the adapter together with its binding in the subscription map under
 * the position, and adds the page to the container.
 *
 * @param container the pager's view group
 * @param position  page index
 * @return the inflated page view
 */
@NonNull
@Override
public Object instantiateItem(@NonNull ViewGroup container, int position) {
    final PlenMotionListView page =
            (PlenMotionListView) mLayoutInflater.inflate(R.layout.page_plen_motion_list_pager, null);

    final PlenMotionListAdapter motionAdapter = PlenMotionListAdapter_.getInstance_(mContext);
    motionAdapter.setDraggable(mDraggable);
    page.setAdapter(motionAdapter);

    mSubscriptionMap.put(
            position,
            Subscriptions.from(
                    motionAdapter,
                    motionAdapter.bind(Observable.just(mItems.get(position).getMotions()))));

    container.addView(page);
    return page;
}
HandlerThreadScheduler.java 文件源码 项目:rxmoe 阅读 24 收藏 0 点赞 0 评论 0
/**
 * Wraps {@code action} in a {@code ScheduledAction} and hands it to the shared scheduled
 * executor: scheduled with the given delay when {@code delayTime} is positive, otherwise
 * submitted directly. The resulting future and the worker's subscription are attached to the
 * scheduled action, which is returned.
 *
 * @param action    the work to run
 * @param delayTime delay before running; non-positive values submit without delay
 * @param unit      unit of {@code delayTime}
 * @return the scheduled action, or an empty subscription if this worker is unsubscribed
 */
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
    if (innerSubscription.isUnsubscribed()) {
        return Subscriptions.empty();
    }

    final ScheduledAction task = new ScheduledAction(action, operationQueue);
    final ScheduledExecutorService pool = IOSScheduledExecutorPool.getInstance();

    final Future<?> pending;
    if (delayTime > 0) {
        pending = pool.schedule(task, delayTime, unit);
    } else {
        pending = pool.submit(task);
    }

    task.add(Subscriptions.from(pending));
    task.addParent(innerSubscription);

    return task;
}
ViewMembers.java 文件源码 项目:DataSyncDemo 阅读 15 收藏 0 点赞 0 评论 0
/**
 * Starts a fresh subscription group for the resumed state, refreshes the member list when it
 * has no total yet, and subscribes to member updates so the refresh bar is shown on the UI
 * scheduler.
 *
 * <p>When a refresh is triggered, updates are observed from {@code -1}; otherwise they are
 * observed from the later of the paused and saved elapsed-realtime stamps.
 */
@Override
protected void onResume() {
    super.onResume();
    onPauseSubs = Subscriptions.from();

    final long updatesFromTime;
    if (members.hasTotal()) {
        updatesFromTime = Math.max(pausedElapsedRealtime, savedElapsedRealtime);
    } else {
        // refreshing anyway, so there is no need to observe past events
        refresh(true);
        updatesFromTime = -1;
    }

    // Listen to event broadcasts; debounce so a burst of updates shows the bar only once.
    onPauseSubs.add(
            memberUpdates.observable(updatesFromTime)
                    .debounce(100, TimeUnit.MILLISECONDS)
                    .observeOn(uiScheduler)
                    .subscribe(update -> showRefreshBar()));
}
MockActiveTappiesImpl.java 文件源码 项目:TappyBLE 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Stores the scheduler and builds {@code tappyObs}: on subscription a listener forwarding
 * every updated Tappy set is registered, its removal is tied to unsubscription, and the
 * current {@code tappyList} is emitted. Subscription happens on {@code ioScheduler}.
 *
 * @param ioScheduler scheduler used for {@code subscribeOn}
 */
public MockActiveTappiesImpl(Scheduler ioScheduler) {
    this.ioScheduler = ioScheduler;

    final Observable.OnSubscribe<Set<TappyBleDeviceDefinition>> listenForTappies =
            new Observable.OnSubscribe<Set<TappyBleDeviceDefinition>>() {
        @Override
        public void call(final Subscriber<? super Set<TappyBleDeviceDefinition>> subscriber) {
            final ActiveTappiesListener forwarder = new ActiveTappiesListener() {
                @Override
                public void onUpdatedList(Set<TappyBleDeviceDefinition> tappies) {
                    subscriber.onNext(tappies);
                }
            };
            MockActiveTappiesImpl.this.registerListener(forwarder);

            final Action0 unregister = new Action0() {
                @Override
                public void call() {
                    MockActiveTappiesImpl.this.unregisterListener(forwarder);
                }
            };
            subscriber.add(Subscriptions.create(unregister));

            // Emit the current list right after wiring up the listener.
            subscriber.onNext(tappyList);
        }
    };

    tappyObs = Observable.create(listenForTappies).subscribeOn(ioScheduler);
}
TappyStatusServiceImpl.java 文件源码 项目:TappyBLE 阅读 13 收藏 0 点赞 0 评论 0
/**
 * Stores the scheduler and builds {@code tappyStatusObservable}: on subscription a listener
 * forwarding every updated status map is registered, its removal is tied to unsubscription,
 * and the result of {@code getCurrentStatusMapSync()} is emitted. Subscription happens on
 * {@code ioScheduler}.
 *
 * @param ioScheduler scheduler used for {@code subscribeOn}
 */
public TappyStatusServiceImpl(Scheduler ioScheduler) {
    this.ioScheduler = ioScheduler;

    final Observable.OnSubscribe<Map<String,Integer>> listenForStatus =
            new Observable.OnSubscribe<Map<String,Integer>>() {
        @Override
        public void call(final Subscriber<? super Map<String,Integer>> subscriber) {
            final TappyStatusListener forwarder = new TappyStatusListener() {
                public void onUpdatedTappyStatusMap(Map<String,Integer> statusMap) {
                    subscriber.onNext(statusMap);
                }
            };
            TappyStatusServiceImpl.this.registerListener(forwarder);

            final Action0 unregister = new Action0() {
                @Override
                public void call() {
                    TappyStatusServiceImpl.this.unregisterListener(forwarder);
                }
            };
            subscriber.add(Subscriptions.create(unregister));

            // Emit the current status map right after wiring up the listener.
            subscriber.onNext(getCurrentStatusMapSync());
        }
    };

    tappyStatusObservable = Observable.create(listenForStatus).subscribeOn(ioScheduler);
}
RxFirebaseDatabase.java 文件源码 项目:Attendance 阅读 30 收藏 0 点赞 0 评论 0
/**
 * Observes a Firebase query through a {@link ValueEventListener} and exposes the received
 * {@link DataSnapshot}s as an Observable. The listener is removed from the query when the
 * subscription is cancelled.
 *
 * @param firebaseRef {@link Query} the Firebase query to observe
 * @return an {@link rx.Observable} of data snapshots, composed with {@code applyScheduler()}
 */
public Observable<DataSnapshot> observeValueEvent(final Query firebaseRef) {
  final Observable.OnSubscribe<DataSnapshot> attachListener =
      new Observable.OnSubscribe<DataSnapshot>() {
        @Override public void call(final Subscriber<? super DataSnapshot> subscriber) {
          final ValueEventListener listener =
              firebaseRef.addValueEventListener(new ValueEventListener() {
                @Override public void onDataChange(DataSnapshot dataSnapshot) {
                  subscriber.onNext(dataSnapshot);
                }

                @Override public void onCancelled(DatabaseError error) {
                  FirebaseDatabaseErrorFactory.buildError(subscriber, error);
                }
              });

          // Detach the listener once the subscriber unsubscribes.
          final Action0 detach = new Action0() {
            @Override public void call() {
              firebaseRef.removeEventListener(listener);
            }
          };
          subscriber.add(Subscriptions.create(detach));
        }
      };

  return Observable.create(attachListener)
      .compose(this.<DataSnapshot>applyScheduler());
}
RxFirebaseDatabase.java 文件源码 项目:Attendance 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Observes a Firebase query through a {@link ValueEventListener} and exposes the received
 * {@link DataSnapshot}s as an Observable whose emissions are delayed by 500 ms and observed
 * on the Android main thread. The listener is removed from the query when the subscription
 * is cancelled.
 *
 * @param firebaseRef {@link Query} the Firebase query to observe
 * @return an {@link rx.Observable} of data snapshots, composed with {@code applyScheduler()}
 */
public Observable<DataSnapshot> observeValueEventDelayed(final Query firebaseRef) {
  final Observable.OnSubscribe<DataSnapshot> attachListener =
      new Observable.OnSubscribe<DataSnapshot>() {
        @Override public void call(final Subscriber<? super DataSnapshot> subscriber) {
          final ValueEventListener listener =
              firebaseRef.addValueEventListener(new ValueEventListener() {
                @Override public void onDataChange(DataSnapshot dataSnapshot) {
                  subscriber.onNext(dataSnapshot);
                }

                @Override public void onCancelled(DatabaseError error) {
                  FirebaseDatabaseErrorFactory.buildError(subscriber, error);
                }
              });

          // Detach the listener once the subscriber unsubscribes.
          final Action0 detach = new Action0() {
            @Override public void call() {
              firebaseRef.removeEventListener(listener);
            }
          };
          subscriber.add(Subscriptions.create(detach));
        }
      };

  return Observable.create(attachListener)
      .delay(500, TimeUnit.MILLISECONDS)
      .observeOn(AndroidSchedulers.mainThread())
      .compose(this.<DataSnapshot>applyScheduler());
}


问题


面经


文章

微信
公众号

扫码关注公众号