def __discover_node(self, msg, task=None):
    """Locate the 'dispycos_node' task at `msg.location` and register that node.

    Generator meant to be driven by pycos. It makes up to 10 attempts to
    resolve the 'dispycos_node' task at `msg.location`; whenever the lookup
    does not produce a `Task`, it yields `task.sleep(0.1)` and tries again.
    If every attempt fails the generator simply finishes.

    On the first successful lookup:

    * any entry for the node's address is removed from `self._disabled_nodes`
      and `self._nodes`;
    * if an entry was present in `self._nodes`, a warning is logged and, when
      the current computation has a `status_task`, that task is sent a
      `TaskAbandoned` monitor exception for every remote task, a
      `ServerAbandoned` status for every server and finally a `NodeAbandoned`
      status for the node;
    * a node record is stored in `self._disabled_nodes` under the address,
      its `task` attribute is set to the located task, and the generator
      yields `self.__get_node_info(node, task=task)` before terminating.

    Args:
        msg: discovery message; `msg.location` (with an `addr` attribute) and
            `msg.name` are read.
        task: used for `task.sleep` and forwarded to `__get_node_info`
            (presumably the pycos task running this generator -- confirm).
    """
    for _attempt in range(10):
        located = yield Task.locate('dispycos_node', location=msg.location,
                                    timeout=MsgTimeout)
        if not isinstance(located, Task):
            # Lookup did not yield a task; pause briefly before the next attempt.
            yield task.sleep(0.1)
            continue

        # Nothing yields between here and the final __get_node_info call, so
        # the address is read once and reused.
        addr = msg.location.addr

        # Forget whatever was previously recorded for this address.
        self._disabled_nodes.pop(addr, None)
        stale = self._nodes.pop(addr, None)
        if stale:
            logger.warning('Rediscovered dispycosnode at %s; discarding previous incarnation!',
                           addr)
            self._disabled_nodes.pop(stale.addr, None)

            computation = self._cur_computation
            status_task = computation.status_task if computation else None
            if status_task:
                # Report everything that belonged to the old incarnation as
                # abandoned: remote tasks first, then their server, then the node.
                for server in stale.servers.itervalues():
                    for rtask, _job in server.rtasks.itervalues():
                        status_task.send(
                            pycos.MonitorException(rtask, (Scheduler.TaskAbandoned, None)))
                    status_task.send(DispycosStatus(Scheduler.ServerAbandoned,
                                                    server.task.location))
                abandoned_info = DispycosNodeInfo(stale.name, stale.addr, stale.cpus,
                                                  stale.platform, stale.avail_info)
                status_task.send(DispycosStatus(Scheduler.NodeAbandoned, abandoned_info))

        # NOTE(review): the entry for `addr` was popped above and nothing in
        # this method re-adds it, so this lookup appears to always miss and a
        # fresh node is created -- confirm that send() cannot re-enter here.
        node = self._disabled_nodes.get(addr)
        if not node:
            node = self._disabled_nodes[addr] = Scheduler._Node(msg.name, addr)
        node.task = located
        yield self.__get_node_info(node, task=task)

        # Finish after the first successful discovery.
        # NOTE(review): raising StopIteration inside a generator is only valid
        # before PEP 479 (Python 3.7+ turns it into RuntimeError); the use of
        # itervalues() suggests this file targets Python 2.
        raise StopIteration
评论列表
文章目录