private void prepareCursor(int maxBatchSize, String offsetField, String lastSourceOffset) {
String stringOffset = "";
ObjectId objectIdOffset = null;
if (null == cursor) {
if (null == lastSourceOffset || lastSourceOffset.isEmpty()) {
objectIdOffset = initialObjectId;
stringOffset = initialId;
} else {
if (configBean.offsetType == OffsetFieldType.STRING)
stringOffset = lastSourceOffset;
else
objectIdOffset = new ObjectId(lastSourceOffset);
}
LOG.debug("Getting new cursor with params: {} {} {}",
maxBatchSize,
offsetField,
configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset);
if (configBean.isCapped) {
cursor = mongoCollection
.find()
.filter(Filters.gt(
offsetField,
configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset
))
.cursorType(CursorType.TailableAwait)
.batchSize(maxBatchSize)
.iterator();
} else {
cursor = mongoCollection
.find()
.filter(Filters.gt(
offsetField,
configBean.offsetType == OffsetFieldType.STRING ? stringOffset : objectIdOffset
))
.sort(Sorts.ascending(offsetField))
.cursorType(CursorType.NonTailable)
.batchSize(maxBatchSize)
.iterator();
}
}
}
MongoDBSource.java 文件源码
java
阅读 25
收藏 0
点赞 0
评论 0
项目:datacollector
作者:
评论列表
文章目录