def _lostWorkCheckLoop(self):
"""
While the service is running, keep checking for any overdue / lost work
items and re-submit them to the cluster for processing. Space out
those checks in time based on the size of the cluster.
"""
self._lostWorkCheckCall = None
if not self.running:
return
@passthru(
self._periodicLostWorkCheck().addErrback(log.err).addCallback
)
def scheduleNext(result):
self._currentWorkDeferred = None
if not self.running:
return
index = self.nodeIndex()
now = self.reactor.seconds()
interval = self.queueDelayedProcessInterval
count = self.totalNumberOfNodes()
when = (now - (now % interval)) + (interval * (count + index))
delay = when - now
self._lostWorkCheckCall = self.reactor.callLater(
delay, self._lostWorkCheckLoop
)
self._currentWorkDeferred = scheduleNext
评论列表
文章目录