/**
 * Keeps one subscription alive for every element currently contained in
 * an observable set.
 * <p>Every element present in {@code elems} at the time of the call is
 * subscribed to immediately. Afterwards, each element added to the set is
 * subscribed to via {@code f}, and the subscription belonging to each
 * element removed from the set is unsubscribed.
 *
 * @param <T> type of the elements of the set
 * @param elems observable set whose elements are subscribed to
 * @param f function that subscribes to a single element and returns the
 * resulting subscription
 * @return an aggregate subscription. Unsubscribing it first stops
 * observing changes of {@code elems} (so no further elementary
 * subscriptions are created) and then unsubscribes every elementary
 * subscription that is still tracked.
 */
static <T> Subscription dynamic(
        ObservableSet<T> elems,
        Function<? super T, ? extends Subscription> f) {
    // Elementary subscriptions, keyed by the element they belong to.
    Map<T, Subscription> subsByElem = new HashMap<>();

    // Subscribe to everything that is already in the set.
    elems.forEach(elem -> {
        Subscription sub = f.apply(elem);
        subsByElem.put(elem, sub);
    });

    Subscription changeSub = EventStreams.changesOf(elems).subscribe(change -> {
        // The removal part of a change is processed before the addition part.
        if (change.wasRemoved()) {
            T removed = change.getElementRemoved();
            Subscription obsolete = subsByElem.remove(removed);
            assert obsolete != null;
            obsolete.unsubscribe();
        }
        if (change.wasAdded()) {
            T added = change.getElementAdded();
            assert !subsByElem.containsKey(added);
            Subscription fresh = f.apply(added);
            subsByElem.put(added, fresh);
        }
    });

    Subscription aggregate = () -> {
        // Stop listening to the set first, then release the tracked elements.
        changeSub.unsubscribe();
        subsByElem.values().forEach(Subscription::unsubscribe);
    };
    return aggregate;
}
Subscription.java 文件源码
java
阅读 37
收藏 0
点赞 0
评论 0
项目:ReactFX
作者:
评论列表
文章目录