public static <T, Resource> Observable<T> sort(Observable<T> source,
final Comparator<T> comparator,
final Func2<Observable<T>, Resource, Observable<Resource>> writer,
final Func1<Resource, Observable<T>> reader, final Func0<Resource> resourceFactory,
final Action1<Resource> resourceDisposer, int maxToSortInMemoryPerThread,
final int maxTempResources, Scheduler scheduler) {
Preconditions.checkArgument(maxToSortInMemoryPerThread > 0,
"maxToSortInMemoryPerThread must be greater than 0");
Preconditions.checkArgument(maxTempResources >= 2, "maxTempResources must be at least 2");
return source
// buffer into groups small enough to sort in memory
.buffer(maxToSortInMemoryPerThread)
// sort each buffer to a resource
.flatMap(sortInMemoryAndWriteToAResource(comparator, writer, resourceFactory,
scheduler))
// reduce by merging groups of resources to a single resource
// once the resource count is maxTempResources
.lift(new OperatorResourceMerger<Resource, T>(comparator, writer, reader,
resourceFactory, resourceDisposer, maxTempResources))
// help out backpressure because ResourceMerger doesn't support
// yet
.onBackpressureBuffer()
// emit the contents of the last file in the reduction process
.flatMap(reader);
}
BigSort.java 文件源码
java
阅读 18
收藏 0
点赞 0
评论 0
项目:bigsort
作者:
评论列表
文章目录