public O2MSyncDataLoader getPendingDataLoader() {
O2MSyncDataLoader loader = null;
Document document = syncEventDoc.findOneAndUpdate(
Filters.and(Filters.eq(SyncAttrs.STATUS, SyncStatus.PENDING),
Filters.eq(SyncAttrs.EVENT_TYPE, String.valueOf(EventType.System))),
Updates.set(SyncAttrs.STATUS, SyncStatus.IN_PROGRESS),
new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER)
.projection(Projections.include(SyncAttrs.SOURCE_DB_NAME, SyncAttrs.SOURCE_USER_NAME)));
if (document != null && !document.isEmpty()) {
Object interval = document.get(SyncAttrs.INTERVAL);
String appName = document.getString(SyncAttrs.APPLICATION_NAME);
if(interval!=null && interval instanceof Long){
loader = new O2MSyncDataLoader((Long)interval, appName);
}else{
loader = new O2MSyncDataLoader(120000, appName);
}
loader.setEventId(document.getObjectId(SyncAttrs.ID));
loader.setDbName(document.getString(SyncAttrs.SOURCE_DB_NAME));
loader.setDbUserName(document.getString(SyncAttrs.SOURCE_USER_NAME));
loader.setStatus(document.getString(SyncAttrs.STATUS));
}
return loader;
}
java类com.mongodb.client.model.Projections的实例源码
SyncEventDao.java 文件源码
项目:mongodb-rdbms-sync
阅读 46
收藏 0
点赞 0
评论 0
ServerResource.java 文件源码
项目:sam
阅读 33
收藏 0
点赞 0
评论 0
@GET
@Path("services/server/{environment}/{hostname}/deployment/{applicationId}")
@ApiOperation(value = "Get a deployed application on the server", response = Deployment.class)
public Deployment getServerDeployment(
@ApiParam("Server hostname") @PathParam("hostname") String hostname,
@ApiParam("Environment") @PathParam("environment") String environment,
@ApiParam("Application id") @PathParam("applicationId") String applicationId
) {
final Document document = database.getCollection(Collections.SERVERS)
.find(Filters.and(
Filters.eq("environment", environment),
Filters.eq("hostname", hostname)
)).projection(Projections.elemMatch("deployments",
Filters.eq("applicationId", applicationId)
)).first();
if (document == null) {
throw new WebApplicationException(Status.NOT_FOUND);
}
final Deployment deployment = Mapper.mapObject(document, "deployments", Deployment::fromDeploymentBson);
if (deployment == null) {
throw new WebApplicationException(Status.NOT_FOUND);
}
return deployment;
}
VerseDaoImpl.java 文件源码
项目:verbum-domini
阅读 94
收藏 0
点赞 0
评论 0
@Override
public Pagination<VerseBean> list(VerseBean filter, Integer firstResult, Integer maxResults) {
MongoCollection<VerseBeanImpl> coll = this.persistenceManager.fetchCollection(VerseBeanImpl.class);
Pagination<VerseBean> pagination = new Pagination<VerseBean>();
pagination.setSize(this.countQueryResult(coll, this.createBasicFilter(filter)));
FindIterable<VerseBeanImpl> iterable = coll.find(this.createBasicFilter(filter))
.skip(firstResult - 1)
.limit(maxResults)
.projection(Projections.include("_id", "number", "chapter_id"));
Iterator<VerseBeanImpl> iterator = iterable.iterator();
List<VerseBean> verses = new ArrayList<>();
while (iterator.hasNext()) {
verses.add(iterator.next());
}
pagination.setElements(verses);
return pagination;
}
BibleDaoImpl.java 文件源码
项目:verbum-domini
阅读 24
收藏 0
点赞 0
评论 0
@Override
public Pagination<BibleBean> list(BibleBean filter, Integer firstResult, Integer maxResults) {
MongoCollection<BibleBeanImpl> coll = this.persistenceManager.fetchCollection(BibleBeanImpl.class);
Pagination<BibleBean> pagination = new Pagination<BibleBean>();
pagination.setSize(this.countQueryResult(coll, this.createBasicFilter(filter)));
FindIterable<BibleBeanImpl> iterable = coll.find(this.createBasicFilter(filter))
.skip(firstResult - 1)
.limit(maxResults)
.projection(Projections.include("_id", "name", "language"));
Iterator<BibleBeanImpl> iterator = iterable.iterator();
List<BibleBean> bibles = new ArrayList<>();
while (iterator.hasNext()) {
bibles.add(iterator.next());
}
pagination.setElements(bibles);
return pagination;
}
CharacterPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 22
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
CharacterPojo pojo = insert(db);
MongoCollection<CharacterPojo> coll = db.getCollection(COLL_NAME,
CharacterPojo.class);
CharacterPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
CharacterPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo((char) 0);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
StringPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 26
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
StringPojo pojo = insert(db);
MongoCollection<StringPojo> coll = db.getCollection(COLL_NAME, StringPojo.class);
StringPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
StringPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
ShortPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 24
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
ShortPojo pojo = insert(db);
MongoCollection<ShortPojo> coll = db.getCollection(COLL_NAME, ShortPojo.class);
ShortPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
ShortPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo((short) 0);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
LongPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 30
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
LongPojo pojo = insert(db);
MongoCollection<LongPojo> coll = db.getCollection(COLL_NAME, LongPojo.class);
LongPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
LongPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo(0L);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
MonthPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 29
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
MonthPojo pojo = insert(db);
MongoCollection<MonthPojo> coll = db.getCollection(COLL_NAME, MonthPojo.class);
MonthPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
MonthPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
DatePojoTest.java 文件源码
项目:bsoncodec-apt
阅读 24
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
DatePojo pojo = insert(db);
MongoCollection<DatePojo> coll = db.getCollection(COLL_NAME, DatePojo.class);
DatePojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
DatePojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
BooleanPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 25
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
BooleanPojo pojo = insert(db);
MongoCollection<BooleanPojo> coll = db.getCollection(COLL_NAME,
BooleanPojo.class);
BooleanPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
BooleanPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo(false);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
Decimal128PojoTest.java 文件源码
项目:bsoncodec-apt
阅读 27
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
Decimal128Pojo pojo = insert(db);
MongoCollection<Decimal128Pojo> coll = db.getCollection(COLL_NAME,
Decimal128Pojo.class);
Decimal128Pojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
Decimal128Pojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
YearPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 30
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
YearPojo pojo = insert(db);
MongoCollection<YearPojo> coll = db.getCollection(COLL_NAME, YearPojo.class);
YearPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
YearPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
DoublePojoTest.java 文件源码
项目:bsoncodec-apt
阅读 35
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
DoublePojo pojo = insert(db);
MongoCollection<DoublePojo> coll = db.getCollection(COLL_NAME, DoublePojo.class);
DoublePojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
DoublePojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo(0.0f);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
FloatPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 28
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
FloatPojo pojo = insert(db);
MongoCollection<FloatPojo> coll = db.getCollection(COLL_NAME, FloatPojo.class);
FloatPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
FloatPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo(0.0f);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
EnumPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 39
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
EnumPojo pojo = insert(db);
MongoCollection<EnumPojo> coll = db.getCollection(COLL_NAME, EnumPojo.class);
EnumPojo read = coll.find().first();
assertThat(read).isEqualToIgnoringGivenFields(pojo, "enumSet3");
assertThat(read.getEnumSet3()).isNull();
EnumPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getEnumSet1()).isNull();
assertThat(empty.getEnumSet2()).isNull();
assertThat(empty.getEnumSet3()).isNull();
assertThat(empty.getMap()).isNull();
assertThat(empty.getEnumMap()).isNull();
}
BytePojoTest.java 文件源码
项目:bsoncodec-apt
阅读 29
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
BytePojo pojo = insert(db);
MongoCollection<BytePojo> coll = db.getCollection(COLL_NAME, BytePojo.class);
BytePojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
BytePojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo((byte) 0);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
InstantPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 33
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
InstantPojo pojo = insert(db);
MongoCollection<InstantPojo> coll = db.getCollection(COLL_NAME,
InstantPojo.class);
InstantPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
InstantPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
IntegerPojoTest.java 文件源码
项目:bsoncodec-apt
阅读 27
收藏 0
点赞 0
评论 0
@Test
public void testInsertAndFind() {
MongoDatabase db = connect();
IntegerPojo pojo = insert(db);
MongoCollection<IntegerPojo> coll = db.getCollection(COLL_NAME,
IntegerPojo.class);
IntegerPojo read = coll.find().first();
assertThat(read).isEqualToComparingFieldByField(pojo);
IntegerPojo empty = coll.find().projection(Projections.include("id")).first();
assertThat(empty.getScalarPrimitive()).isEqualTo(0);
assertThat(empty.getScalar()).isNull();
assertThat(empty.getArray()).isNull();
assertThat(empty.getArrayPrimitive()).isNull();
assertThat(empty.getArray2()).isNull();
assertThat(empty.getArray2Primitive()).isNull();
assertThat(empty.getList()).isNull();
assertThat(empty.getSet()).isNull();
assertThat(empty.getMap()).isNull();
}
MongoMetadataStoreTest.java 文件源码
项目:mandrel
阅读 24
收藏 0
点赞 0
评论 0
@Test
public void test() {
Job job = new Job().setId(30);
TaskContext taskContext = new TaskContext();
taskContext.setDefinition(job);
MongoMetadataStoreDefinition definition = new MongoMetadataStoreDefinition();
MongoMetadataStore store = definition.build(taskContext);
System.err.println(Sets.newHashSet(store.getCollection().find().projection(Projections.include("_id")).map(doc -> Uri.create(doc.getString("_id")))
.iterator()));
Uri uri = new Uri("http", null, "test", 80, "/pouet", null);
store.delete(uri);
store.addMetadata(uri, new BlobMetadata());
Set<Uri> deduplicate = store.deduplicate(Sets.newHashSet(uri));
Assertions.assertThat(deduplicate).isEmpty();
}
TDFinder.java 文件源码
项目:repositoryminer
阅读 114
收藏 0
点赞 0
评论 0
@SuppressWarnings("unchecked")
private void findBugs(String commit) {
FindBugsDAO dao = new FindBugsDAO();
List<Document> analysisDoc = dao.findByCommit(commit, Projections.include("filename", "bugs.category"));
for (Document fileDoc : analysisDoc) {
TDItem tdItem = searchFile(fileDoc.getString("filename"));
List<Document> bugs = (List<Document>) fileDoc.get("bugs");
int specificBugs = 0;
for (Document bug : bugs) {
String category = bug.getString("category");
if (category.equals("MT_CORRECTNESS")) {
addTDIndicator(tdItem, TDIndicator.MULTITHREAD_CORRECTNESS, 1);
specificBugs++;
} else if (category.equals("PERFORMANCE")) {
addTDIndicator(tdItem, TDIndicator.SLOW_ALGORITHM, 1);
specificBugs++;
}
}
if ((bugs.size() - specificBugs) > 0) {
addTDIndicator(tdItem, TDIndicator.AUTOMATIC_STATIC_ANALYSIS_ISSUES, bugs.size() - specificBugs);
}
}
}
MongoMapStore.java 文件源码
项目:testcontainers-hazelcast
阅读 30
收藏 0
点赞 0
评论 0
@Override
public Iterable<String> loadAllKeys() {
log.info("LoadAllKeys");
List<String> keys = new LinkedList<String>();
FindIterable<Document> ids = collection.find().projection(Projections.include("_id"));
for (Document document : ids) {
keys.add(document.get("_id").toString());
}
return keys;
}
SyncEventDao.java 文件源码
项目:mongodb-rdbms-sync
阅读 29
收藏 0
点赞 0
评论 0
public SyncMarker getEventStats(ObjectId eventId) {
Document group = new Document("$group",
new Document(SyncAttrs.ID, null).append(SyncAttrs.TOTAL_ROWS, new Document("$sum", "$marker.totalRows"))
.append(SyncAttrs.ROWS_READ, new Document("$sum", "$marker.rowsRead"))
.append(SyncAttrs.ROWS_DUMPED, new Document("$sum", "$marker.rowsDumped"))
.append(SyncAttrs.START_TIME, new Document("$min", "$marker.startTime"))
.append(SyncAttrs.END_TIME, new Document("$max", "$marker.endTime")));
return syncEvents.aggregate(Arrays.asList(Aggregates.match(Filters.eq(SyncAttrs.PARENT_EVENT_ID, eventId)),
Aggregates.project(Projections.include(SyncAttrs.MARKER)), group), SyncMarker.class).first();
}
SyncEventDao.java 文件源码
项目:mongodb-rdbms-sync
阅读 38
收藏 0
点赞 0
评论 0
public List<SyncError> getEventErrors(ObjectId eventId) {
Document group = new Document("$group",
new Document(SyncAttrs.ID, null).append(SyncAttrs.ERRORS, new Document("$addToSet", "$errors")));
return syncEvents.aggregate(
Arrays.asList(Aggregates.match(Filters.eq(SyncAttrs.PARENT_EVENT_ID, eventId)),
Aggregates.unwind("$errors"),
Aggregates
.project(Projections.include(SyncAttrs.ERRORS)),
group, Aggregates.unwind("$errors"),
Aggregates.project(new Document(SyncAttrs.ERROR_MESSAGE, "$errors.errorMessage")
.append(SyncAttrs.TRACE, "$errors.trace")
.append(SyncAttrs.THREAD_NAME, "$errors.threadName"))),
SyncError.class).allowDiskUse(true).into(new ArrayList<SyncError>());
}
SyncEventDao.java 文件源码
项目:mongodb-rdbms-sync
阅读 33
收藏 0
点赞 0
评论 0
public List<ObjectId> checkCancelledEvents(final Set<ObjectId> activeEventList) {
final List<ObjectId> cancelledEvents = new ArrayList<ObjectId>();
syncEvents
.find(Filters.and(Filters.in(SyncAttrs.ID, activeEventList),
Filters.eq(SyncAttrs.STATUS, SyncStatus.CANCELLED)), Document.class)
.projection(Projections.include(SyncAttrs.ID)).forEach(new Block<Document>() {
@Override
public void apply(Document arg0) {
cancelledEvents.add(arg0.getObjectId(SyncAttrs.ID));
}
});
return cancelledEvents;
}
ApplicationResource.java 文件源码
项目:sam
阅读 40
收藏 0
点赞 0
评论 0
private Iterable<ServerDeployment> findApplicationDeployments(String applicationId) {
return database.getCollection(Collections.SERVERS)
.find(
Filters.eq("deployments.applicationId", applicationId)
).projection(Projections.fields(
Projections.include("hostname", "environment"),
Projections.elemMatch("deployments")
)).map(ServerDeployment::fromBson);
}
SearchResource.java 文件源码
项目:sam
阅读 40
收藏 0
点赞 0
评论 0
private <T> PaginatedCollection<T> textSearch(String query, String collection, Function<Document,T> mapper) {
return RestHelper.paginatedList(
database.getCollection(collection)
.find(Filters.text(query))
.projection(Projections.metaTextScore("score"))
.sort(Sorts.metaTextScore("score"))
.map(mapper)
);
}
GroupResource.java 文件源码
项目:sam
阅读 39
收藏 0
点赞 0
评论 0
private PaginatedCollection<String> getIds() {
return RestHelper.paginatedList(database
.getCollection(Collections.GROUPS)
.find()
.projection(Projections.include("id"))
.map(t->t.getString("id"))
);
}
DatabaseReader.java 文件源码
项目:kafka-connect-mongodb
阅读 36
收藏 0
点赞 0
评论 0
private FindIterable<Document> find(int page){
final FindIterable<Document> documents = oplog
.find(query)
.sort(new Document("$natural", 1))
.skip(page * batchSize)
.limit(batchSize)
.projection(Projections.include("ts", "op", "ns", "o"))
.cursorType(CursorType.TailableAwait);
return documents;
}
MongoNotificationHandler.java 文件源码
项目:Rapture
阅读 23
收藏 0
点赞 0
评论 0
@Override
public NotificationResult findNotificationsAfterEpoch(final CallingContext context, Long lastEpochSeen) {
// Find all records where the recordtype is NOTIFICATION and
// the epoch is gt lastEpochSeen, returning just the ids
// Return the current epoch and these ids.
final Document queryObject = new Document();
queryObject.put(RECORDTYPE, NOTIFICATION);
final Document greaterEpochObject = new Document();
greaterEpochObject.put("$gt", lastEpochSeen);
queryObject.put("epoch", greaterEpochObject);
MongoRetryWrapper<NotificationResult> wrapper = new MongoRetryWrapper<NotificationResult>() {
public FindIterable<Document> makeCursor() {
return getNotificationCollection().find(queryObject).projection(Projections.include("id", "kernelId"));
}
public NotificationResult action(FindIterable<Document> cursor) {
NotificationResult res = new NotificationResult(getLatestNotificationEpoch());
for (Document dobj : cursor) {
String kernelId = (String) dobj.get("kernelId");
// ignore any notifications that came from this same kernel,
// which can cause race conditions
if (kernelId != null && kernelId.equals(context.getContext())) {
continue;
}
res.addId((String) dobj.get("id"));
}
return res;
}
};
return wrapper.doAction();
}