def execute_job(self, full_id, resources_assigned):
self.logger.info("Execute job %s ..." % (full_id, ))
self.executed_jobs += 1
with open(os.devnull, 'r+b', 0) as DEVNULL:
my_env = os.environ.copy()
my_env['AETROS_ATTY'] = '1'
if self.ssh_key_private is not None:
my_env['AETROS_SSH_KEY_BASE64'] = self.ssh_key_private
args = [sys.executable, '-m', 'aetros', 'start']
if resources_assigned['gpus']:
for gpu_id in resources_assigned['gpus']:
args += ['--gpu-device', gpu_id]
args += [full_id]
self.logger.info('$ ' + ' '.join(args))
self.server.send_message({'type': 'job-executed', 'id': full_id})
# Since JobBackend sends SIGINT to its current process group, wit sends also to its parents when same pg.
# We need to change the process group of the process, so this won't happen.
# If we don't this, the process of ServerCommand receives the SIGINT as well.
kwargs = {}
if os.name == 'nt':
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
else:
kwargs['preexec_fn'] = os.setsid
process = subprocess.Popen(args, bufsize=1, env=my_env, stdin=DEVNULL,
stderr=subprocess.PIPE, stdout=subprocess.PIPE, **kwargs)
if self.show_stdout:
self.general_logger_stdout.attach(process.stdout, read_line=True)
self.general_logger_stderr.attach(process.stderr, read_line=True)
self.job_processes[full_id] = process
评论列表
文章目录