def run(self):
    """Run the Live Response job scheduler loop.

    Blocks on ``self.schedule_queue`` and dispatches each item by type:

    * ``WorkItem`` -- appended to ``self._unscheduled_jobs`` under the
      item's ``sensor_id``.
    * ``CompletionNotification`` -- the item's ``sensor_id`` is added to
      ``self._idle_workers``.
    * ``WorkerStatus`` -- ``"error"`` is logged; ``"exiting"`` joins the
      entry in ``self._job_workers``, deletes it and drops the sensor from
      ``self._idle_workers``; ``"ready"`` adds the sensor to
      ``self._idle_workers``; any other status is only logged.

    Any other item (the original comment names ``None``) makes the method
    return. ``self._schedule_jobs()`` is called after every handled item.
    """
    log.debug("Starting Live Response Job Scheduler")

    while True:
        log.debug("Waiting for item on Scheduler Queue")
        item = self.schedule_queue.get(block=True)
        log.debug("Got item: {0}".format(item))

        # Guard clause: anything that is not a known message type stops the loop.
        if not isinstance(item, (WorkItem, CompletionNotification, WorkerStatus)):
            log.debug("Received unknown item on the scheduler Queue, exiting")
            # exiting the scheduler if we get None
            # TODO: wait for all worker threads to exit
            return

        sensor_id = item.sensor_id

        if isinstance(item, WorkItem):
            # New work arrived for this sensor.
            self._unscheduled_jobs[sensor_id].append(item)
        elif isinstance(item, CompletionNotification):
            # A job finished, so this sensor's worker is idle again.
            self._idle_workers.add(sensor_id)
        else:
            # Only WorkerStatus can reach this point (see the guard above).
            status = item.status
            if status == "error":
                log.error(
                    "Error encountered by JobWorker[{0}]: {1}".format(sensor_id, item.exception)
                )
            elif status == "exiting":
                log.debug("JobWorker[{0}] has exited, waiting...".format(sensor_id))
                self._job_workers[sensor_id].join()
                log.debug("JobWorker[{0}] deleted".format(sensor_id))
                del self._job_workers[sensor_id]
                # The worker may not have been idle; a missing entry is fine.
                try:
                    self._idle_workers.remove(sensor_id)
                except KeyError:
                    pass
            elif status == "ready":
                log.debug(
                    "JobWorker[{0}] now ready to accept jobs, session established".format(
                        sensor_id
                    )
                )
                self._idle_workers.add(sensor_id)
            else:
                log.debug("Unknown status from JobWorker[{0}]: {1}".format(sensor_id, status))

        # Re-evaluate pending jobs after every handled item.
        self._schedule_jobs()
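# (web-page residue, not part of the code) Comment list
# (web-page residue, not part of the code) Table of contents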