def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                       instance, task_info):
    """Run a task in a child process and return the result it reports.

    A process is created from the 'spawn' multiprocessing context with
    ``_task_process`` as its target. Two queues are handed to the child:
    one for its result and one for log events. After the child is started,
    the conductor is told which host/PID runs the task, log events are
    handled via ``self._handle_mp_log_events``, and the result is read
    from the result queue.

    :param ctxt: request context, forwarded to the child and the conductor.
    :param task_id: task identifier (logged and sent to the conductor).
    :param task_type: forwarded unchanged to ``_task_process``.
    :param origin: forwarded unchanged to ``_task_process``.
    :param destination: forwarded unchanged to ``_task_process``.
    :param instance: forwarded unchanged to ``_task_process``.
    :param task_info: forwarded unchanged to ``_task_process``.
    :returns: the object the child put on the result queue.
    :raises exception.CoriolisException: the child exited without putting
        anything on the result queue ("Task canceled").
    :raises exception.TaskProcessException: the child put a ``str`` on the
        result queue (presumably an error message -- confirm against
        ``_task_process``).
    """
    # Local import: only this function needs queue.Empty, and the module's
    # import block is not guaranteed to provide it.
    import queue

    mp_ctx = multiprocessing.get_context('spawn')
    mp_q = mp_ctx.Queue()
    mp_log_q = mp_ctx.Queue()
    p = mp_ctx.Process(
        target=_task_process,
        args=(ctxt, task_id, task_type, origin, destination, instance,
              task_info, mp_q, mp_log_q))

    p.start()
    LOG.info("Task process started: %s", task_id)
    self._rpc_conductor_client.set_task_host(
        ctxt, task_id, self._server, p.pid)

    self._handle_mp_log_events(p, mp_log_q)

    # The result must be read BEFORE joining: a process that has put data
    # on a multiprocessing queue does not exit until that data has been
    # flushed to the underlying pipe, so joining first can deadlock when
    # the result is large. A private sentinel distinguishes "nothing was
    # reported" from a legitimate None result.
    no_result = object()
    result = no_result
    while result is no_result:
        try:
            result = mp_q.get(timeout=1)
        except queue.Empty:
            if not p.is_alive():
                # The child may have put its result and exited between
                # the timed-out get() and the liveness check; try once
                # more without blocking before giving up.
                try:
                    result = mp_q.get(False)
                except queue.Empty:
                    pass
                break

    p.join()

    if result is no_result:
        raise exception.CoriolisException("Task canceled")
    if isinstance(result, str):
        raise exception.TaskProcessException(result)
    return result
评论列表
文章目录