/**
* 接受消息
*
* @param tag 标志
* @param callBack 回调
*/
public <T> DisposableObserver registerNoThread(@NonNull final Object tag,
@NonNull final RxBusCallBack<T> callBack) {
RxBusEvent rxBusEvent = rxBusEventArrayMap.get(tag);
if (RxUtils.isEmpty(rxBusEvent)) {
rxBusEvent = new RxBusEvent();
rxBusEvent.subject = PublishSubject.create().toSerialized();
rxBusEvent.disposable =
rxBusEvent.subject
.ofType(callBack.busOfType())
.subscribeWith(new RxBusObserver<T>() {
@Override
public void onError(@io.reactivex.annotations.NonNull Throwable e) {
super.onError(e);
callBack.onBusError(e);
}
@Override
public void onNext(@io.reactivex.annotations.NonNull T t) {
super.onNext(t);
}
});
rxBusEventArrayMap.put(tag, rxBusEvent);
}
return rxBusEvent.disposable;
}
RxBus.java 文件源码
java
阅读 34
收藏 0
点赞 0
评论 0
项目:RxNetWork
作者:
评论列表
文章目录