@Override
public void putEntries(Observable<Entry> entries) {
final String insert =
"INSERT OR REPLACE INTO TILES(zoom_level, tile_column, tile_row, tile_data)"
+ " values (?, ?, ?, ?);";
final Observable<Object> params = entries.concatMap(entry -> {
byte[] compressedMvt;
try {
compressedMvt = CompressUtil.getCompressedAsGzip(entry.getVector());
} catch (final IOException ex) {
throw Exceptions.propagate(ex);
}
return Observable.<Object>just(entry.getZoomLevel(), entry.getColumn(),
flipY(entry.getRow(), entry.getZoomLevel()), compressedMvt);
})
// source: https://github.com/davidmoten/rxjava-jdbc/pull/46/files
.toList()
.flattenAsObservable(objects -> objects);
// TODO update when upstream is enhanced
dataSource.update(insert)
.parameterStream(params.toFlowable(BackpressureStrategy.BUFFER))
.counts()
.test() // TODO remove hack
.awaitDone(5, TimeUnit.SECONDS)
.assertComplete();
}
StorageImpl.java 文件源码
java
阅读 37
收藏 0
点赞 0
评论 0
项目:vt-support
作者:
评论列表
文章目录