@NonNull public static Observable<DataSnapshot> singleValue(final Query query) {
return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {
@Override public void call(final Subscriber<? super DataSnapshot> subscriber) {
final ValueEventListener valueEvent = query.addValueEventListener(new ValueEventListener() {
@Override public void onDataChange(DataSnapshot dataSnapshot) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(dataSnapshot);
}
}
@Override public void onCancelled(DatabaseError databaseError) {
subscriber.onError(databaseError.toException());
}
});
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
query.removeEventListener(valueEvent);
}
}));
}
});
}
RxFirebase.java 文件源码
java
阅读 14
收藏 0
点赞 0
评论 0
项目:RxFBase
作者:
评论列表
文章目录