java类rx.functions.FuncN的实例源码

GlobalPresenter.java 文件源码 项目:Firebase-Chat-Demo 阅读 17 收藏 0 点赞 0 评论 0
private Func1<Chat, Observable<Users>> getUsers() {
    return new Func1<Chat, Observable<Users>>() {
        @Override
        public Observable<Users> call(Chat chat) {
            firstKey = chat.getFirstKey();
            List<Observable<User>> list = new ArrayList<>();
            for (Message m : chat.getMessages())
                list.add(userService.getUser(m.getUid()));

            return Observable.zip(list, new FuncN<Users>() {
                @Override
                public Users call(Object... args) {
                    ArrayList<User> users = new ArrayList<>();
                    for (Object o: args) users.add((User)o);
                    return new Users(users);
                }
            });
        }
    };
}
MainListWithExample_Observable_withLatestFrom.java 文件源码 项目:RxJavaDemo 阅读 20 收藏 0 点赞 0 评论 0
private Observable example3() {
    return Observable.interval(500, TimeUnit.MILLISECONDS).take(10)
            .withLatestFrom(
                    mObservableList,
                    new FuncN<String>() {
                        @Override
                        public String call(Object... args) {
                            String s = "[";
                            for (Object object : args) {
                                s += object + ",";
                            }
                            s += "]";
                            return s;
                        }
                    });
}
MainListWithExample_Observable_withLatestFrom.java 文件源码 项目:RxJavaDemo 阅读 23 收藏 0 点赞 0 评论 0
private Observable example4() {
    return Observable.interval(500, TimeUnit.MILLISECONDS).take(10)
            .withLatestFrom(
                    mObservables,
                    new FuncN<String>() {
                        @Override
                        public String call(Object... args) {
                            String s = "[";
                            for (Object object : args) {
                                s += object + ",";
                            }
                            s += "]";
                            return s;
                        }
                    });
}
TaskWithRx.java 文件源码 项目:trabajando-en-diferido 阅读 21 收藏 0 点赞 0 评论 0
@Override public void executeTask(final Ui ui, int totalTask) {
  List<Observable<ApiResponse>> calls = new ArrayList<>();
  for (int i = 0; i < totalTask; i++) {
    Observable<ApiResponse> apiResponseObservable = apiCall.callObservable(i + 1);
    Observable<ApiResponse> observableOnNewThread =
        apiResponseObservable.subscribeOn(Schedulers.newThread());
    calls.add(observableOnNewThread);
  }

  Observable.zip(calls, new FuncN<Long>() {
    @Override public Long call(Object... args) {
      return System.currentTimeMillis();
    }
  })
      .subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Action1<Long>() {
        @Override public void call(Long time) {
          ui.showTime(time);
        }
      }, new Action1<Throwable>() {
        @Override public void call(Throwable throwable) {
          ui.showError("error " + throwable);
        }
      });
}
ZipTests.java 文件源码 项目:RxJavaFlow 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead
 * of immediately invoking zip with 0 argument.
 * 
 * We now expect an NoSuchElementException since last() requires at least one value and nothing will be emitted.
 */
@Test(expected = NoSuchElementException.class)
public void nonBlockingObservable() {

    final Object invoked = new Object();

    Collection<Observable<Object>> observables = Collections.emptyList();

    Observable<Object> result = Observable.zip(observables, new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            System.out.println("received: " + args);
            assertEquals("No argument should have been passed", 0, args.length);
            return invoked;
        }
    });

    assertSame(invoked, result.toBlocking().last());
}
OnSubscribeCombineLatestTest.java 文件源码 项目:RxJavaFlow 阅读 29 收藏 0 点赞 0 评论 0
@Test
public void testZeroSources() {
    Observable<Object> result = Observable.combineLatest(Collections.<Observable<Object>> emptyList(), new FuncN<Object>() {

        @Override
        public Object call(Object... args) {
            return args;
        }

    });

    @SuppressWarnings("unchecked")
    Observer<Object> o = mock(Observer.class);

    result.subscribe(o);

    verify(o).onComplete();
    verify(o, never()).onNext(any());
    verify(o, never()).onError(any(Throwable.class));

}
OperatorZipTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
@Test
public void testCollectionSizeDifferentThanFunction() {
    FuncN<String> zipr = Functions.fromFunc(getConcatStringIntegerIntArrayZipr());
    //Func3<String, Integer, int[], String>

    /* define a Observer to receive aggregated events */
    Observer<String> observer = mock(Observer.class);

    @SuppressWarnings("rawtypes")
    Collection ws = java.util.Collections.singleton(Observable.just("one", "two"));
    Observable<String> w = Observable.zip(ws, zipr);
    w.subscribe(observer);

    verify(observer, times(1)).onError(any(Throwable.class));
    verify(observer, never()).onComplete();
    verify(observer, never()).onNext(any(String.class));
}
OperatorZipTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
@Test
public void testStartEmptyList() {

    final Object invoked = new Object();
    Collection<Observable<Object>> observables = Collections.emptyList();

    Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            assertEquals("No argument should have been passed", 0, args.length);
            return invoked;
        }
    });

    TestSubscriber<Object> ts = new TestSubscriber<Object>();
    o.subscribe(ts);
    ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
    ts.assertReceivedOnNext(Collections.emptyList());
}
OperatorZipTest.java 文件源码 项目:RxJavaFlow 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Expect NoSuchElementException instead of blocking forever as zip should emit onComplete() and no onNext
 * and last() expects at least a single response.
 */
@Test(expected = NoSuchElementException.class)
public void testStartEmptyListBlocking() {

    final Object invoked = new Object();
    Collection<Observable<Object>> observables = Collections.emptyList();

    Observable<Object> o = Observable.zip(observables, new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            assertEquals("No argument should have been passed", 0, args.length);
            return invoked;
        }
    });

    o.toBlocking().last();
}
RxUtil.java 文件源码 项目:ocelli 阅读 23 收藏 0 点赞 0 评论 0
/**
 * Given a list of observables that emit a boolean condition AND all conditions whenever
 * any condition changes and emit the resulting condition when the final condition changes.
 * @param sources
 * @return
 */
public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) {
    return Observable.combineLatest(sources, new FuncN<Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Object... args) {
            return Observable.from(args).cast(Boolean.class).firstOrDefault(true, new Func1<Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean status) {
                    return !status;
                }
            });
        }
    })
    .flatMap(new Func1<Observable<Boolean>, Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Observable<Boolean> t1) {
            return t1;
        }
    })
    .distinctUntilChanged();
}
ZipTests.java 文件源码 项目:lakeside-java 阅读 22 收藏 0 点赞 0 评论 0
/**
 * Occasionally zip may be invoked with 0 observables. Test that we don't block indefinitely instead
 * of immediately invoking zip with 0 argument.
 * 
 * We now expect an IllegalArgumentException since last() requires at least one value and nothing will be emitted.
 */
@Test(expected = IllegalArgumentException.class)
public void nonBlockingObservable() {

    final Object invoked = new Object();

    Collection<Observable<Object>> observables = Collections.emptyList();

    Observable<Object> result = Observable.zip(observables, new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            System.out.println("received: " + args);
            assertEquals("No argument should have been passed", 0, args.length);
            return invoked;
        }
    });

    assertSame(invoked, result.toBlockingObservable().last());
}
Observable.java 文件源码 项目:letv 阅读 25 收藏 0 点赞 0 评论 0
public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
    List<Observable<?>> os = new ArrayList();
    for (Observable<?> o : ws) {
        os.add(o);
    }
    return just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction));
}
OnSubscribeCombineLatest.java 文件源码 项目:boohee_v5.6 阅读 23 收藏 0 点赞 0 评论 0
public MultiSourceProducer(Subscriber<? super R> child, List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.sources = sources;
    this.child = child;
    this.combinator = combinator;
    int n = sources.size();
    this.subscribers = new MultiSourceRequestableSubscriber[n];
    this.collectedValues = new Object[n];
    this.haveValues = new BitSet(n);
    this.completion = new BitSet(n);
}
OnSubscribeCombineLatest.java 文件源码 项目:boohee_v5.6 阅读 25 收藏 0 点赞 0 评论 0
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.sources = sources;
    this.combinator = combinator;
    if (sources.size() > RxRingBuffer.SIZE) {
        throw new IllegalArgumentException("More than RxRingBuffer.SIZE sources to combineLatest is not supported.");
    }
}
Observable.java 文件源码 项目:boohee_v5.6 阅读 21 收藏 0 点赞 0 评论 0
public static final <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
    List<Observable<?>> os = new ArrayList();
    for (Observable<?> o : ws) {
        os.add(o);
    }
    return just(os.toArray(new Observable[os.size()])).lift(new OperatorZip(zipFunction));
}
MainListWithExample_Observable_zip.java 文件源码 项目:RxJavaDemo 阅读 20 收藏 0 点赞 0 评论 0
private Observable example1() {
    return Observable.zip(mObservables, new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    });
}
MainListWithExample_Observable_zip.java 文件源码 项目:RxJavaDemo 阅读 21 收藏 0 点赞 0 评论 0
private Observable example2() {
    return Observable.zip(mObservableList, new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    });
}
MainListWithExample_Observable_zip.java 文件源码 项目:RxJavaDemo 阅读 23 收藏 0 点赞 0 评论 0
private Observable example3() {
    return Observable.zip(Observable.just(Observable.range(1, 10), Observable.range(15, 20)), new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    });
}
OnSubscribeCombineLatest.java 文件源码 项目:RxJavaFlow 阅读 16 收藏 0 点赞 0 评论 0
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.sources = sources;
    this.combinator = combinator;
    if (sources.size() > 128) {
        // For design simplicity this is limited to 128. If more are really needed we'll need to adjust 
        // the design of how RxRingBuffer is used in the implementation below.
        throw new IllegalArgumentException("More than 128 sources to combineLatest is not supported.");
    }
}
OnSubscribeCombineLatest.java 文件源码 项目:RxJavaFlow 阅读 18 收藏 0 点赞 0 评论 0
@SuppressWarnings("unchecked")
public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.sources = sources;
    this.child = child;
    this.combinator = combinator;

    int n = sources.size();
    this.subscribers = new MultiSourceRequestableSubscriber[n];
    this.collectedValues = new Object[n];
    this.haveValues = new BitSet(n);
    this.completion = new BitSet(n);
}
OnSubscribeCombineLatestTest.java 文件源码 项目:RxJavaFlow 阅读 24 收藏 0 点赞 0 评论 0
@Test
public void test1ToNSources() {
    int n = 30;
    FuncN<List<Object>> func = new FuncN<List<Object>>() {

        @Override
        public List<Object> call(Object... args) {
            return Arrays.asList(args);
        }
    };
    for (int i = 1; i <= n; i++) {
        System.out.println("test1ToNSources: " + i + " sources");
        List<Observable<Integer>> sources = new ArrayList<Observable<Integer>>();
        List<Object> values = new ArrayList<Object>();
        for (int j = 0; j < i; j++) {
            sources.add(Observable.just(j));
            values.add(j);
        }

        Observable<List<Object>> result = Observable.combineLatest(sources, func);

        @SuppressWarnings("unchecked")
        Observer<List<Object>> o = mock(Observer.class);

        result.subscribe(o);

        verify(o).onNext(values);
        verify(o).onComplete();
        verify(o, never()).onError(any(Throwable.class));
    }
}
OnSubscribeCombineLatestTest.java 文件源码 项目:RxJavaFlow 阅读 25 收藏 0 点赞 0 评论 0
@Test(timeout=10000)
public void testCombineLatestRequestOverflow() throws InterruptedException {
    List<Observable<Integer>> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8)));
    Observable<Integer> o = Observable.combineLatest(sources,new FuncN<Integer>() {
        @Override
        public Integer call(Object... args) {
           return (Integer) args[0];
        }});
    //should get at least 4
    final CountDownLatch latch = new CountDownLatch(4);
    o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {

        @Override
        public void onStart() {
            request(2);
        }

        @Override
        public void onComplete() {
            //ignore
        }

        @Override
        public void onError(Throwable e) {
            throw new RuntimeException(e);
        }

        @Override
        public void onNext(Integer t) {
            latch.countDown();
            request(Long.MAX_VALUE-1);
        }});
    assertTrue(latch.await(10, TimeUnit.SECONDS));
}
RxTextInputLayout.java 文件源码 项目:Rx_java2_soussidev 阅读 31 收藏 0 点赞 0 评论 0
/**
 * @author Soussi
 *
 * @param button
 * Checks for validity of the Validate Button
 */

public void RxValidateButton(final Button button)
{
    Observable<CharSequence> signInFieldsSubscription = (Observable<CharSequence>) Observable.combineLatest((List<? extends Observable<?>>) customChangeObservable, new FuncN<Boolean>() {
        @Override
        public Boolean call(Object... args) {
            for(int i = 0; i < args.length; i++){
                if(!args[i].toString().isEmpty()) {
                    return false;
                }
            }
            return true;


        }
    }).observeOn(AndroidSchedulers.mainThread())
            .subscribe((Observer<? super Boolean>) new Observer<Boolean>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(Boolean aBoolean) {
                    if(aBoolean)
                    {
                        button.setEnabled(true);
                    }
                    else
                    {
                        button.setEnabled(false);
                    }

                }


            });
    compositeSubscription.add((Subscription) signInFieldsSubscription);
}
Observable.java 文件源码 项目:letv 阅读 22 收藏 0 点赞 0 评论 0
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    return create(new OnSubscribeCombineLatest(sources, combineFunction));
}
Observable.java 文件源码 项目:letv 阅读 27 收藏 0 点赞 0 评论 0
public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    return create(new OnSubscribeCombineLatest(sources, combineFunction));
}
Observable.java 文件源码 项目:letv 阅读 22 收藏 0 点赞 0 评论 0
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    return create(new OnSubscribeCombineLatest(null, sources, combineFunction, RxRingBuffer.SIZE, true));
}
Observable.java 文件源码 项目:letv 阅读 23 收藏 0 点赞 0 评论 0
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
    return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip(zipFunction));
}
VaultImpl.java 文件源码 项目:azure-libraries-for-java 阅读 28 收藏 0 点赞 0 评论 0
private Observable<List<AccessPolicy>> populateAccessPolicies() {
    List<Observable<?>>observables = new ArrayList<>();
    for (final AccessPolicyImpl accessPolicy : accessPolicies) {
        if (accessPolicy.objectId() == null) {
            if (accessPolicy.userPrincipalName() != null) {
                observables.add(graphRbacManager.users().getByNameAsync(accessPolicy.userPrincipalName())
                        .subscribeOn(SdkContext.getRxScheduler())
                        .doOnNext(new Action1<ActiveDirectoryUser>() {
                            @Override
                            public void call(ActiveDirectoryUser user) {
                                if (user == null) {
                                    throw new CloudException(String.format("User principal name %s is not found in tenant %s",
                                            accessPolicy.userPrincipalName(), graphRbacManager.tenantId()), null);
                                }
                                accessPolicy.forObjectId(user.id());
                            }
                        }));
            } else if (accessPolicy.servicePrincipalName() != null) {
                observables.add(graphRbacManager.servicePrincipals().getByNameAsync(accessPolicy.servicePrincipalName())
                        .subscribeOn(SdkContext.getRxScheduler())
                        .doOnNext(new Action1<ServicePrincipal>() {
                            @Override
                            public void call(ServicePrincipal sp) {
                                if (sp == null) {
                                    throw new CloudException(String.format("User principal name %s is not found in tenant %s",
                                            accessPolicy.userPrincipalName(), graphRbacManager.tenantId()), null);
                                }
                                accessPolicy.forObjectId(sp.id());
                            }
                        }));
            } else {
                throw new IllegalArgumentException("Access policy must specify object ID.");
            }
        }
    }
    if (observables.isEmpty()) {
        return Observable.just(accessPolicies());
    } else {
        return Observable.zip(observables, new FuncN<List<AccessPolicy>>() {
            @Override
            public List<AccessPolicy> call(Object... args) {
                return accessPolicies();
            }
        });
    }
}
OnSubscribeCombineLatest.java 文件源码 项目:boohee_v5.6 阅读 23 收藏 0 点赞 0 评论 0
public SingleSourceProducer(Subscriber<? super R> child, Observable<? extends T> source, FuncN<? extends R> combinator) {
    this.source = source;
    this.child = child;
    this.combinator = combinator;
    this.subscriber = new SingleSourceRequestableSubscriber(child, combinator);
}
OnSubscribeCombineLatest.java 文件源码 项目:boohee_v5.6 阅读 25 收藏 0 点赞 0 评论 0
SingleSourceRequestableSubscriber(Subscriber<? super R> child, FuncN<? extends R> combinator) {
    super(child);
    this.child = child;
    this.combinator = combinator;
}


问题


面经


文章

微信
公众号

扫码关注公众号