def run(self, job, computation, node):
    """Send a 'run' request for `job` and forward the reply to the job's client.

    This is a generator: it starts the nested `_run` helper as a `SysTask`,
    waits for that helper to finish, and sends the helper's result to
    `job.client`.

    Args:
        job: Job to run. Its `args` and `kwargs` are set to None after the
            receive call returns; when the reply is a `Task`, `job.done` is
            set to a new `pycos.Event`.
        computation: Object whose `_auth` is included in the request.
        node: Node whose `cpus_used`, `load` and `avail` are updated when
            the reply is not a `Task` and `job.cpu` is truthy.
    """

    def _run(self, task=None):
        # Ask self.task to run the job; the reply is addressed to this
        # helper task (passed as 'client').
        self.task.send({'req': 'run', 'auth': computation._auth, 'job': job, 'client': task})
        rtask = yield task.receive(timeout=MsgTimeout)
        # currently fault-tolerancy is not supported, so clear job's
        # args to save space
        job.args = job.kwargs = None
        if isinstance(rtask, Task):
            # TODO: keep func too for fault-tolerance
            job.done = pycos.Event()
            self.rtasks[rtask] = (rtask, job)
            if self.askew_results:
                # A message for this rtask may already have been recorded
                # in askew_results (presumably because it arrived before
                # the rtask was registered here - confirm); forward it now.
                # NOTE(review): if this method is defined inside a class
                # body, `__status_task` is name-mangled with that class's
                # name - keep the spelling as is.
                msg = self.askew_results.pop(rtask, None)
                if msg:
                    Scheduler.__status_task.send(msg)
        else:
            logger.debug('failed to create rtask: %s', rtask)
            if job.cpu:
                # Undo the CPU accounting for this job (presumably done by
                # the caller before scheduling - confirm) and mark the
                # server and node as available again.
                self.avail.set()
                node.cpus_used -= 1
                node.load = float(node.cpus_used) / len(node.servers)
                self.scheduler._avail_nodes.add(node)
                self.scheduler._nodes_avail.set()
                node.avail.set()
        # Hands rtask back as the helper's result (picked up by `finish()`
        # below). NOTE(review): on Python 3.7+ PEP 479 turns StopIteration
        # raised inside a generator into RuntimeError; if this file targets
        # Python 3 only, `return rtask` is the equivalent form - confirm the
        # supported interpreter before changing.
        raise StopIteration(rtask)

    rtask = yield SysTask(_run, self).finish()
    job.client.send(rtask)
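# [web page residue, not part of the code] Comment list
# [web page residue, not part of the code] Article table of contents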