def run(self):
    """Run the job scheduler service until ``self._complete`` is signalled.

    Startup, in order:
      1. Calls ``completed(True, True)`` on every Command that is not
         complete, and bulk-updates every Job whose state is not
         'complete' to ``state='complete', cancelled=True``.
      2. Creates the JobScheduler and the queue, RPC and progress
         service threads around it.
      3. Starts AgentRpc, then the three service threads, then sets
         ``self._children_started``.
      4. Creates and starts the MailAlerts thread.

    Shutdown: after ``self._complete.wait()`` returns, every Job whose
    state is not 'complete' is cancelled through the scheduler, in
    descending id order.
    """
    # NOTE(review): these imports are function-local in the original;
    # presumably to defer loading the scheduler modules -- confirm.
    from chroma_core.services.job_scheduler.job_scheduler import JobScheduler
    from chroma_core.services.job_scheduler.job_scheduler_client import JobSchedulerRpc
    from chroma_core.services.job_scheduler.agent_rpc import AgentRpc

    super(Service, self).run()

    # Cancel anything that's left behind from a previous run
    for stale_command in Command.objects.filter(complete=False):
        stale_command.completed(True, True)
    leftover_jobs = Job.objects.filter(~Q(state='complete'))
    leftover_jobs.update(state='complete', cancelled=True)

    # Build the scheduler and the threads that serve it.
    scheduler = JobScheduler()
    self._job_scheduler = scheduler
    self._queue_thread = ServiceThread(QueueHandler(scheduler))
    self._rpc_thread = ServiceThread(JobSchedulerRpc(scheduler))
    self._progress_thread = ServiceThread(scheduler.progress)

    # AgentRpc is started before any of the service threads.
    AgentRpc.start()
    for service_thread in (self._queue_thread,
                           self._rpc_thread,
                           self._progress_thread):
        service_thread.start()

    # Set only after the three service threads have been started; the
    # mail alerts thread below is started after this point.
    self._children_started.set()

    self._mail_alerts_thread = MailAlerts(
        settings.EMAIL_SENDER,
        settings.EMAIL_SUBJECT_PREFIX,
        settings.EMAIL_HOST,
    )
    self._mail_alerts_thread.start()

    # Block here until the service is told to finish.
    self._complete.wait()

    self.log.info("Cancelling outstanding jobs...")

    # Get a fresh view of the job table
    with transaction.commit_manually():
        transaction.commit()

    unfinished_jobs = Job.objects.filter(~Q(state='complete')).order_by('-id')
    for unfinished_job in unfinished_jobs:
        self._job_scheduler.cancel_job(unfinished_job.id)
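# Web-page residue (not part of the code): "Comment list"
# Web-page residue (not part of the code): "Table of contents"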