def _periodicLostWorkCheck(self):
"""
Periodically, every node controller has to check to make sure that work
hasn't been dropped on the floor by someone. In order to do that it
queries each work-item table.
"""
@inlineCallbacks
def workCheck(txn):
if self.thisProcess:
nodes = [(node.hostname, node.port) for node in
(yield self.activeNodes(txn))]
nodes.sort()
self._lastSeenTotalNodes = len(nodes)
self._lastSeenNodeIndex = nodes.index(
(self.thisProcess.hostname, self.thisProcess.port)
)
for itemType in self.allWorkItemTypes():
tooLate = datetime.utcfromtimestamp(
self.reactor.seconds() - self.queueProcessTimeout
)
overdueItems = (yield itemType.query(
txn, (itemType.notBefore < tooLate))
)
for overdueItem in overdueItems:
peer = self.choosePerformer()
yield peer.performWork(overdueItem.table,
overdueItem.workID)
if not self.running:
return succeed(None)
return inTransaction(self.transactionFactory, workCheck)
评论列表
文章目录