def update_worker(self):
"""
Look for executors where the connection has broken and tasks need to be re-submitted.
"""
if not self.worker:
self.worker = Worker.objects.create(
name=self.name,
start_timestamp=functions.Now(),
last_heartbeat=functions.Now()
)
else:
self.worker.refresh_from_db()
if self.worker.status == Worker.LOST:
# someone else marked us as lost, terminate all tasks and exit
logger.warning('Marked as lost, committing harakiri')
self.state = State.terminate
self.executor.mark_terminated(self.executor.get_running_ids())
return
# update our timestamp so no one marks us as lost
self.worker.last_heartbeat = functions.Now()
self.worker.save()
# look for lost workers and re-queue their tasks
Worker.find_lost(self.timeout)
评论列表
文章目录