/**
 * Builds the function that maps a {@link Chat} to an observable of the users
 * referenced by its messages.
 *
 * <p>Each invocation of the returned function also stores
 * {@code chat.getFirstKey()} in the {@code firstKey} field.
 *
 * @return a function that requests one user per message through
 *         {@code userService} and zips the results into a single {@link Users}
 */
private Func1<Chat, Observable<Users>> getUsers() {
    // Collects the zipped values, in argument order, into a Users wrapper.
    final FuncN<Users> toUsers = new FuncN<Users>() {
        @Override
        public Users call(Object... args) {
            ArrayList<User> collected = new ArrayList<>();
            for (int i = 0; i < args.length; i++) {
                collected.add((User) args[i]);
            }
            return new Users(collected);
        }
    };

    return new Func1<Chat, Observable<Users>>() {
        @Override
        public Observable<Users> call(Chat chat) {
            firstKey = chat.getFirstKey();

            // One user lookup per message, in message order.
            List<Observable<User>> lookups = new ArrayList<>();
            for (Message message : chat.getMessages()) {
                lookups.add(userService.getUser(message.getUid()));
            }
            return Observable.zip(lookups, toUsers);
        }
    };
}
java类rx.functions.FuncN的实例源码
GlobalPresenter.java 文件源码
项目:Firebase-Chat-Demo
阅读 17
收藏 0
点赞 0
评论 0
MainListWithExample_Observable_withLatestFrom.java 文件源码
项目:RxJavaDemo
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Takes the first ten ticks of a 500 ms interval and combines each one with
 * {@code mObservableList} through {@code withLatestFrom}.
 *
 * @return an observable of strings of the form {@code "[a,b,]"}: every value
 *         handed to the combiner followed by a comma, wrapped in brackets
 */
private Observable example3() {
    FuncN<String> bracketed = new FuncN<String>() {
        @Override
        public String call(Object... args) {
            StringBuilder text = new StringBuilder("[");
            for (int i = 0; i < args.length; i++) {
                // A trailing comma after the last value is intentional (matches the existing output).
                text.append(args[i]).append(",");
            }
            return text.append("]").toString();
        }
    };
    return Observable.interval(500, TimeUnit.MILLISECONDS)
            .take(10)
            .withLatestFrom(mObservableList, bracketed);
}
MainListWithExample_Observable_withLatestFrom.java 文件源码
项目:RxJavaDemo
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Takes the first ten ticks of a 500 ms interval and combines each one with
 * {@code mObservables} through {@code withLatestFrom}.
 *
 * @return an observable of strings of the form {@code "[a,b,]"}: every value
 *         handed to the combiner followed by a comma, wrapped in brackets
 */
private Observable example4() {
    FuncN<String> renderArgs = new FuncN<String>() {
        @Override
        public String call(Object... args) {
            StringBuilder rendered = new StringBuilder();
            rendered.append("[");
            for (Object value : args) {
                // Each value is followed by a comma, including the last one.
                rendered.append(value);
                rendered.append(",");
            }
            rendered.append("]");
            return rendered.toString();
        }
    };
    return Observable.interval(500, TimeUnit.MILLISECONDS)
            .take(10)
            .withLatestFrom(mObservables, renderArgs);
}
TaskWithRx.java 文件源码
项目:trabajando-en-diferido
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Starts {@code totalTask} API calls, each subscribed on its own new thread,
 * zips them, and reports to the UI on the Android main thread.
 *
 * <p>The zipped value is {@code System.currentTimeMillis()} taken when the zip
 * function runs; it is passed to {@code ui.showTime}. Any error is passed to
 * {@code ui.showError} as {@code "error " + throwable}.
 *
 * @param ui        receiver of the time or the error message
 * @param totalTask number of API calls to start; calls are numbered from 1
 */
@Override public void executeTask(final Ui ui, int totalTask) {
    List<Observable<ApiResponse>> pendingCalls = new ArrayList<>();
    for (int index = 0; index < totalTask; index++) {
        Observable<ApiResponse> call = apiCall.callObservable(index + 1);
        pendingCalls.add(call.subscribeOn(Schedulers.newThread()));
    }

    // The responses themselves are ignored; only the moment of completion matters.
    FuncN<Long> timestamp = new FuncN<Long>() {
        @Override public Long call(Object... args) {
            return System.currentTimeMillis();
        }
    };
    Action1<Long> onTime = new Action1<Long>() {
        @Override public void call(Long time) {
            ui.showTime(time);
        }
    };
    Action1<Throwable> onFailure = new Action1<Throwable>() {
        @Override public void call(Throwable throwable) {
            ui.showError("error " + throwable);
        }
    };

    Observable.zip(pendingCalls, timestamp)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(onTime, onFailure);
}
ZipTests.java 文件源码
项目:RxJavaFlow
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Zip may occasionally be invoked with zero observables. Verifies that this
 * does not block indefinitely.
 *
 * <p>A {@link NoSuchElementException} is expected, because {@code last()}
 * requires at least one value and nothing will be emitted.
 */
@Test(expected = NoSuchElementException.class)
public void nonBlockingObservable() {
    final Object sentinel = new Object();
    Collection<Observable<Object>> noSources = Collections.emptyList();
    FuncN<Object> zipper = new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            System.out.println("received: " + args);
            assertEquals("No argument should have been passed", 0, args.length);
            return sentinel;
        }
    };
    Observable<Object> zipped = Observable.zip(noSources, zipper);

    Object last = zipped.toBlocking().last();
    assertSame(sentinel, last);
}
OnSubscribeCombineLatestTest.java 文件源码
项目:RxJavaFlow
阅读 29
收藏 0
点赞 0
评论 0
/**
 * Verifies that {@code combineLatest} over an empty source list completes
 * without emitting a value or an error.
 */
@Test
public void testZeroSources() {
    FuncN<Object> passThrough = new FuncN<Object>() {
        @Override
        public Object call(Object... args) {
            return args;
        }
    };
    Observable<Object> combined =
            Observable.combineLatest(Collections.<Observable<Object>> emptyList(), passThrough);

    @SuppressWarnings("unchecked")
    Observer<Object> observer = mock(Observer.class);
    combined.subscribe(observer);

    verify(observer).onComplete();
    verify(observer, never()).onNext(any());
    verify(observer, never()).onError(any(Throwable.class));
}
OperatorZipTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Zips a single-source collection with a function adapted from
 * {@code getConcatStringIntegerIntArrayZipr()} and verifies that the observer
 * receives exactly one error, no value, and no completion.
 */
@SuppressWarnings("unchecked")
@Test
public void testCollectionSizeDifferentThanFunction() {
    // Adapted from a Func3<String, Integer, int[], String>.
    FuncN<String> zipFunction = Functions.fromFunc(getConcatStringIntegerIntArrayZipr());

    // Observer that receives the aggregated events.
    Observer<String> sink = mock(Observer.class);

    @SuppressWarnings("rawtypes")
    Collection sources = java.util.Collections.singleton(Observable.just("one", "two"));
    Observable<String> zipped = Observable.zip(sources, zipFunction);
    zipped.subscribe(sink);

    verify(sink, times(1)).onError(any(Throwable.class));
    verify(sink, never()).onComplete();
    verify(sink, never()).onNext(any(String.class));
}
OperatorZipTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Zips an empty collection of observables and asserts that the subscriber
 * received no {@code onNext} values after waiting up to 200 ms for a terminal
 * event.
 */
@Test
public void testStartEmptyList() {
    final Object sentinel = new Object();
    Collection<Observable<Object>> noSources = Collections.emptyList();
    FuncN<Object> zipper = new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            assertEquals("No argument should have been passed", 0, args.length);
            return sentinel;
        }
    };
    Observable<Object> zipped = Observable.zip(noSources, zipper);

    TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
    zipped.subscribe(subscriber);
    subscriber.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
    subscriber.assertReceivedOnNext(Collections.emptyList());
}
OperatorZipTest.java 文件源码
项目:RxJavaFlow
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Expects a {@link NoSuchElementException} rather than blocking forever: zip
 * over an empty collection should signal completion without any
 * {@code onNext}, and {@code last()} needs at least one value.
 */
@Test(expected = NoSuchElementException.class)
public void testStartEmptyListBlocking() {
    final Object sentinel = new Object();
    Collection<Observable<Object>> noSources = Collections.emptyList();
    FuncN<Object> zipper = new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            assertEquals("No argument should have been passed", 0, args.length);
            return sentinel;
        }
    };
    Observable<Object> zipped = Observable.zip(noSources, zipper);
    zipped.toBlocking().last();
}
RxUtil.java 文件源码
项目:ocelli
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Combines a list of boolean-condition observables into their logical AND.
 *
 * <p>Whenever any condition changes, the latest values of all conditions are
 * scanned for the first {@code false}; if none is found the result defaults to
 * {@code true}. Consecutive duplicate results are suppressed by
 * {@code distinctUntilChanged()}.
 *
 * @param sources the observables that emit the individual conditions
 * @return an observable of the combined condition
 */
public static Observable<Boolean> conditionAnder(List<Observable<Boolean>> sources) {
    // Matches a condition that is false.
    final Func1<Boolean, Boolean> isFalse = new Func1<Boolean, Boolean>() {
        @Override
        public Boolean call(Boolean status) {
            return !status;
        }
    };
    // Emits the first false condition, or true when every condition holds.
    FuncN<Observable<Boolean>> andAll = new FuncN<Observable<Boolean>>() {
        @Override
        public Observable<Boolean> call(Object... args) {
            return Observable.from(args).cast(Boolean.class).firstOrDefault(true, isFalse);
        }
    };
    // Flattens the per-combination observable into its single value.
    Func1<Observable<Boolean>, Observable<Boolean>> unwrap =
            new Func1<Observable<Boolean>, Observable<Boolean>>() {
                @Override
                public Observable<Boolean> call(Observable<Boolean> inner) {
                    return inner;
                }
            };

    return Observable.combineLatest(sources, andAll)
            .flatMap(unwrap)
            .distinctUntilChanged();
}
ZipTests.java 文件源码
项目:lakeside-java
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Zip may occasionally be invoked with zero observables. Verifies that this
 * does not block indefinitely.
 *
 * <p>An {@link IllegalArgumentException} is expected, because {@code last()}
 * requires at least one value and nothing will be emitted.
 */
@Test(expected = IllegalArgumentException.class)
public void nonBlockingObservable() {
    final Object sentinel = new Object();
    Collection<Observable<Object>> noSources = Collections.emptyList();
    FuncN<Object> zipper = new FuncN<Object>() {
        @Override
        public Object call(final Object... args) {
            System.out.println("received: " + args);
            assertEquals("No argument should have been passed", 0, args.length);
            return sentinel;
        }
    };
    Observable<Object> zipped = Observable.zip(noSources, zipper);

    Object last = zipped.toBlockingObservable().last();
    assertSame(sentinel, last);
}
Observable.java 文件源码
项目:letv
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Zips every observable produced by iterating {@code ws} using
 * {@code zipFunction}.
 *
 * <p>The sources are copied into an array, emitted as a single item, and
 * lifted through an {@code OperatorZip} built from {@code zipFunction}.
 *
 * @param ws          the observables to zip
 * @param zipFunction the function applied by {@code OperatorZip}
 * @param <R>         the result type
 * @return the zipped observable
 */
public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
    List<Observable<?>> collected = new ArrayList<Observable<?>>();
    for (Observable<?> source : ws) {
        collected.add(source);
    }
    Observable[] sourceArray = collected.toArray(new Observable[collected.size()]);
    return just(sourceArray).lift(new OperatorZip(zipFunction));
}
OnSubscribeCombineLatest.java 文件源码
项目:boohee_v5.6
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Creates a producer for several sources and allocates its per-source state.
 *
 * <p>The subscriber array, the collected-values array and both bit sets are
 * sized to {@code sources.size()}.
 *
 * @param child      the downstream subscriber
 * @param sources    the source observables
 * @param combinator the function stored for combining values
 */
public MultiSourceProducer(Subscriber<? super R> child, List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.child = child;
    this.sources = sources;
    this.combinator = combinator;

    final int sourceCount = sources.size();
    this.collectedValues = new Object[sourceCount];
    this.subscribers = new MultiSourceRequestableSubscriber[sourceCount];
    this.completion = new BitSet(sourceCount);
    this.haveValues = new BitSet(sourceCount);
}
OnSubscribeCombineLatest.java 文件源码
项目:boohee_v5.6
阅读 25
收藏 0
点赞 0
评论 0
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
this.sources = sources;
this.combinator = combinator;
if (sources.size() > RxRingBuffer.SIZE) {
throw new IllegalArgumentException("More than RxRingBuffer.SIZE sources to combineLatest is not supported.");
}
}
Observable.java 文件源码
项目:boohee_v5.6
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Zips every observable produced by iterating {@code ws} using
 * {@code zipFunction}.
 *
 * <p>The sources are gathered into an array that is emitted as one item and
 * then lifted through an {@code OperatorZip} built from {@code zipFunction}.
 *
 * @param ws          the observables to zip
 * @param zipFunction the function applied by {@code OperatorZip}
 * @param <R>         the result type
 * @return the zipped observable
 */
public static final <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
    List<Observable<?>> gathered = new ArrayList<Observable<?>>();
    for (Observable<?> each : ws) {
        gathered.add(each);
    }
    Observable[] asArray = gathered.toArray(new Observable[gathered.size()]);
    return just(asArray).lift(new OperatorZip(zipFunction));
}
MainListWithExample_Observable_zip.java 文件源码
项目:RxJavaDemo
阅读 20
收藏 0
点赞 0
评论 0
/**
 * Zips {@code mObservables} and renders each combination with
 * {@code getResult}.
 *
 * @return the zipped observable of rendered strings
 */
private Observable example1() {
    FuncN<String> render = new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    };
    return Observable.zip(mObservables, render);
}
MainListWithExample_Observable_zip.java 文件源码
项目:RxJavaDemo
阅读 21
收藏 0
点赞 0
评论 0
/**
 * Zips {@code mObservableList} and renders each combination with
 * {@code getResult}.
 *
 * @return the zipped observable of rendered strings
 */
private Observable example2() {
    FuncN<String> render = new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    };
    return Observable.zip(mObservableList, render);
}
MainListWithExample_Observable_zip.java 文件源码
项目:RxJavaDemo
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Zips an observable that emits two range observables
 * ({@code range(1, 10)} and {@code range(15, 20)}) and renders each
 * combination with {@code getResult}.
 *
 * @return the zipped observable of rendered strings
 */
private Observable example3() {
    FuncN<String> render = new FuncN<String>() {
        @Override
        public String call(Object... args) {
            return getResult(args);
        }
    };
    Observable<Observable<Integer>> ranges =
            Observable.just(Observable.range(1, 10), Observable.range(15, 20));
    return Observable.zip(ranges, render);
}
OnSubscribeCombineLatest.java 文件源码
项目:RxJavaFlow
阅读 16
收藏 0
点赞 0
评论 0
public OnSubscribeCombineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
this.sources = sources;
this.combinator = combinator;
if (sources.size() > 128) {
// For design simplicity this is limited to 128. If more are really needed we'll need to adjust
// the design of how RxRingBuffer is used in the implementation below.
throw new IllegalArgumentException("More than 128 sources to combineLatest is not supported.");
}
}
OnSubscribeCombineLatest.java 文件源码
项目:RxJavaFlow
阅读 18
收藏 0
点赞 0
评论 0
/**
 * Creates a producer for several sources and allocates its per-source state.
 *
 * <p>The subscriber array, the collected-values array and both bit sets are
 * sized to {@code sources.size()}.
 *
 * @param child      the downstream subscriber
 * @param sources    the source observables
 * @param combinator the function stored for combining values
 */
@SuppressWarnings("unchecked")
public MultiSourceProducer(final Subscriber<? super R> child, final List<? extends Observable<? extends T>> sources, FuncN<? extends R> combinator) {
    this.child = child;
    this.sources = sources;
    this.combinator = combinator;

    final int sourceCount = sources.size();
    this.collectedValues = new Object[sourceCount];
    this.subscribers = new MultiSourceRequestableSubscriber[sourceCount];
    this.completion = new BitSet(sourceCount);
    this.haveValues = new BitSet(sourceCount);
}
OnSubscribeCombineLatestTest.java 文件源码
项目:RxJavaFlow
阅读 24
收藏 0
点赞 0
评论 0
/**
 * For every source count from 1 to 30, combines that many single-value
 * sources and verifies that the observer receives one list holding all values
 * in order, followed by completion and no error.
 */
@Test
public void test1ToNSources() {
    final int maxSources = 30;
    FuncN<List<Object>> toList = new FuncN<List<Object>>() {
        @Override
        public List<Object> call(Object... args) {
            return Arrays.asList(args);
        }
    };

    for (int count = 1; count <= maxSources; count++) {
        System.out.println("test1ToNSources: " + count + " sources");

        List<Observable<Integer>> sources = new ArrayList<Observable<Integer>>();
        List<Object> expected = new ArrayList<Object>();
        for (int value = 0; value < count; value++) {
            sources.add(Observable.just(value));
            expected.add(value);
        }

        Observable<List<Object>> combined = Observable.combineLatest(sources, toList);
        @SuppressWarnings("unchecked")
        Observer<List<Object>> observer = mock(Observer.class);
        combined.subscribe(observer);

        verify(observer).onNext(expected);
        verify(observer).onComplete();
        verify(observer, never()).onError(any(Throwable.class));
    }
}
OnSubscribeCombineLatestTest.java 文件源码
项目:RxJavaFlow
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Combines two four-element sources and subscribes with a subscriber that
 * first requests 2 items and then requests {@code Long.MAX_VALUE - 1} on every
 * {@code onNext}. The test passes when at least four values arrive within
 * ten seconds.
 */
@Test(timeout=10000)
public void testCombineLatestRequestOverflow() throws InterruptedException {
    Observable<Integer> left = Observable.from(Arrays.asList(1,2,3,4));
    Observable<Integer> right = Observable.from(Arrays.asList(5,6,7,8));
    List<Observable<Integer>> sources = Arrays.asList(left, right);

    FuncN<Integer> firstArgument = new FuncN<Integer>() {
        @Override
        public Integer call(Object... args) {
            return (Integer) args[0];
        }
    };
    Observable<Integer> combined = Observable.combineLatest(sources, firstArgument);

    // At least four values are expected.
    final CountDownLatch received = new CountDownLatch(4);
    Subscriber<Integer> requester = new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(2);
        }

        @Override
        public void onComplete() {
            // Completion is not relevant to this test.
        }

        @Override
        public void onError(Throwable e) {
            throw new RuntimeException(e);
        }

        @Override
        public void onNext(Integer t) {
            received.countDown();
            request(Long.MAX_VALUE-1);
        }
    };
    combined.subscribeOn(Schedulers.computation()).subscribe(requester);

    assertTrue(received.await(10, TimeUnit.SECONDS));
}
RxTextInputLayout.java 文件源码
项目:Rx_java2_soussidev
阅读 31
收藏 0
点赞 0
评论 0
/**
 * Drives the enabled state of the validate button from the combined latest
 * values of {@code customChangeObservable}.
 *
 * <p>The button is enabled when every combined value has an empty string
 * representation and disabled as soon as one is non-empty. The resulting
 * subscription is added to {@code compositeSubscription}.
 *
 * <p>NOTE(review): enabling the button only when all values are empty looks
 * inverted if the values are the field contents; it is correct if they are
 * error messages. Confirm against what {@code customChangeObservable} emits.
 *
 * @author Soussi
 *
 * @param button the button to enable or disable
 */
public void RxValidateButton(final Button button)
{
    // subscribe(...) returns a Subscription. The previous code cast that result to
    // Observable<CharSequence>, which fails with a ClassCastException at runtime.
    Subscription signInFieldsSubscription = Observable.combineLatest((List<? extends Observable<?>>) customChangeObservable, new FuncN<Boolean>() {
        @Override
        public Boolean call(Object... args) {
            for (int i = 0; i < args.length; i++) {
                if (!args[i].toString().isEmpty()) {
                    return false;
                }
            }
            return true;
        }
    }).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Boolean>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onNext(Boolean aBoolean) {
                    button.setEnabled(aBoolean);
                }
            });
    compositeSubscription.add(signInFieldsSubscription);
}
Observable.java 文件源码
项目:letv
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Creates an observable backed by an {@code OnSubscribeCombineLatest} built
 * from the given sources and combine function.
 *
 * @param sources         the source observables
 * @param combineFunction the function passed to {@code OnSubscribeCombineLatest}
 * @param <T>             the common base type of the source values
 * @param <R>             the result type
 * @return the combined observable
 */
public static <T, R> Observable<R> combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    OnSubscribeCombineLatest onSubscribe = new OnSubscribeCombineLatest(sources, combineFunction);
    return create(onSubscribe);
}
Observable.java 文件源码
项目:letv
阅读 27
收藏 0
点赞 0
评论 0
/**
 * Creates an observable backed by an {@code OnSubscribeCombineLatest} built
 * from the given iterable of sources and combine function.
 *
 * @param sources         the source observables
 * @param combineFunction the function passed to {@code OnSubscribeCombineLatest}
 * @param <T>             the common base type of the source values
 * @param <R>             the result type
 * @return the combined observable
 */
public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    OnSubscribeCombineLatest onSubscribe = new OnSubscribeCombineLatest(sources, combineFunction);
    return create(onSubscribe);
}
Observable.java 文件源码
项目:letv
阅读 22
收藏 0
点赞 0
评论 0
/**
 * Creates an observable backed by an {@code OnSubscribeCombineLatest} built
 * with a {@code null} first argument, the given sources and combine function,
 * {@code RxRingBuffer.SIZE}, and {@code true}.
 *
 * <p>NOTE(review): the trailing {@code true} is presumably the delay-error
 * flag and {@code RxRingBuffer.SIZE} the buffer size; confirm against the
 * {@code OnSubscribeCombineLatest} constructor.
 *
 * @param sources         the source observables
 * @param combineFunction the function passed to {@code OnSubscribeCombineLatest}
 * @param <T>             the common base type of the source values
 * @param <R>             the result type
 * @return the combined observable
 */
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
    OnSubscribeCombineLatest onSubscribe =
            new OnSubscribeCombineLatest(null, sources, combineFunction, RxRingBuffer.SIZE, true);
    return create(onSubscribe);
}
Observable.java 文件源码
项目:letv
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Zips the observables emitted by {@code ws} using {@code zipFunction}.
 *
 * <p>The emitted observables are collected with {@code toList()}, mapped
 * through {@code InternalObservableUtils.TO_ARRAY}, and lifted through an
 * {@code OperatorZip} built from {@code zipFunction}.
 *
 * <p>NOTE(review): {@code TO_ARRAY} presumably converts the list into an
 * {@code Observable[]}; confirm against {@code InternalObservableUtils}.
 *
 * @param ws          an observable that emits the observables to zip
 * @param zipFunction the function applied by {@code OperatorZip}
 * @param <R>         the result type
 * @return the zipped observable
 */
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction) {
return ws.toList().map(InternalObservableUtils.TO_ARRAY).lift(new OperatorZip(zipFunction));
}
VaultImpl.java 文件源码
项目:azure-libraries-for-java
阅读 28
收藏 0
点赞 0
评论 0
/**
 * Resolves the object ID of every access policy that does not have one yet.
 *
 * <p>For each such policy a lookup is started by user principal name or, if
 * that is absent, by service principal name. When a lookup emits a value, the
 * policy's object ID is set from it; a {@code null} result raises a
 * {@code CloudException}.
 *
 * @return an observable that emits {@code accessPolicies()}: directly when no
 *         lookup is needed, otherwise from the zip of all lookups
 * @throws IllegalArgumentException if a policy has no object ID, no user
 *         principal name and no service principal name
 */
private Observable<List<AccessPolicy>> populateAccessPolicies() {
    List<Observable<?>>observables = new ArrayList<>();
    for (final AccessPolicyImpl accessPolicy : accessPolicies) {
        if (accessPolicy.objectId() == null) {
            if (accessPolicy.userPrincipalName() != null) {
                observables.add(graphRbacManager.users().getByNameAsync(accessPolicy.userPrincipalName())
                        .subscribeOn(SdkContext.getRxScheduler())
                        .doOnNext(new Action1<ActiveDirectoryUser>() {
                            @Override
                            public void call(ActiveDirectoryUser user) {
                                if (user == null) {
                                    throw new CloudException(String.format("User principal name %s is not found in tenant %s",
                                            accessPolicy.userPrincipalName(), graphRbacManager.tenantId()), null);
                                }
                                accessPolicy.forObjectId(user.id());
                            }
                        }));
            } else if (accessPolicy.servicePrincipalName() != null) {
                observables.add(graphRbacManager.servicePrincipals().getByNameAsync(accessPolicy.servicePrincipalName())
                        .subscribeOn(SdkContext.getRxScheduler())
                        .doOnNext(new Action1<ServicePrincipal>() {
                            @Override
                            public void call(ServicePrincipal sp) {
                                if (sp == null) {
                                    // This branch is only reached when the user principal name is null,
                                    // so the message must report the service principal name.
                                    throw new CloudException(String.format("Service principal name %s is not found in tenant %s",
                                            accessPolicy.servicePrincipalName(), graphRbacManager.tenantId()), null);
                                }
                                accessPolicy.forObjectId(sp.id());
                            }
                        }));
            } else {
                throw new IllegalArgumentException("Access policy must specify object ID.");
            }
        }
    }
    if (observables.isEmpty()) {
        return Observable.just(accessPolicies());
    } else {
        // The zipped values are ignored; the lookups have already updated the policies in doOnNext.
        return Observable.zip(observables, new FuncN<List<AccessPolicy>>() {
            @Override
            public List<AccessPolicy> call(Object... args) {
                return accessPolicies();
            }
        });
    }
}
OnSubscribeCombineLatest.java 文件源码
项目:boohee_v5.6
阅读 23
收藏 0
点赞 0
评论 0
/**
 * Creates a producer for a single source and the subscriber it will use.
 *
 * @param child      the downstream subscriber
 * @param source     the single source observable
 * @param combinator the function stored for combining values
 */
public SingleSourceProducer(Subscriber<? super R> child, Observable<? extends T> source, FuncN<? extends R> combinator) {
    this.child = child;
    this.source = source;
    this.combinator = combinator;
    // The inner subscriber is given the same downstream child and combinator.
    this.subscriber = new SingleSourceRequestableSubscriber(child, combinator);
}
OnSubscribeCombineLatest.java 文件源码
项目:boohee_v5.6
阅读 25
收藏 0
点赞 0
评论 0
/**
 * Creates a subscriber that passes {@code child} to its superclass constructor
 * and keeps both the child and the combinator.
 *
 * @param child      the downstream subscriber
 * @param combinator the function stored for combining values
 */
SingleSourceRequestableSubscriber(Subscriber<? super R> child, FuncN<? extends R> combinator) {
    super(child);
    this.combinator = combinator;
    this.child = child;
}