private Iterator<EventInIdentifiedStream> getIteratorWhoseHeadIsNext() {
if (iteratorWhoseHeadIsNext != null) {
return iteratorWhoseHeadIsNext;
}
Iterator<PeekingIterator<EventInIdentifiedStream>> streams = underlying.iterator();
while (streams.hasNext()) {
PeekingIterator<EventInIdentifiedStream> eventStream = streams.next();
Instant potentialCutoffTime = clock.instant();
if (eventStream.hasNext()) {
if (cutOffTime != null && eventStream.peek().event.effectiveTimestamp().isAfter(cutOffTime)) {
streams.remove();
} else if (iteratorWhoseHeadIsNext == null || order.compare(eventStream.peek(), iteratorWhoseHeadIsNext.peek()) < 0) {
iteratorWhoseHeadIsNext = eventStream;
}
} else {
streams.remove();
if (this.cutOffTime == null) {
this.cutOffTime = potentialCutoffTime.minus(delay).toEpochMilli();
}
}
}
if (iteratorWhoseHeadIsNext != null) {
long cutoff = this.cutOffTime == null ? clock.instant().minus(delay).toEpochMilli() : this.cutOffTime;
if (iteratorWhoseHeadIsNext.peek().event.effectiveTimestamp().isAfter(cutoff)) {
underlying.clear();
return null;
}
}
return iteratorWhoseHeadIsNext;
}
EventStoreMergingIterator.java 文件源码
java
阅读 20
收藏 0
点赞 0
评论 0
项目:tg-eventstore
作者:
评论列表
文章目录