def task_wrapper(run_id=None, task=None, step=None, adminq=None, use_process=True, logger_info=None):
'''
Args:
func: object with action method with the following signature:
action(self, action, unit, group, sequencer)
action: object with taskid, unit, group: id of the unit to pass
sqid: sequencer id to pass to action '''
global module_logger
if use_process:
module_logger=MpLogger.get_logger(logger_info=logger_info, name='') # name="%s.%s_%s" %(logger_info['name'], step.name, task.sequence))
task.pid=os.getpid()
os.environ['EVENTOR_STEP_SEQUENCE']=str(task.sequence)
os.environ['EVENTOR_STEP_RECOVERY']=str(task.recovery)
os.environ['EVENTOR_STEP_NAME']=str(step.name)
if setproctitle is not None and use_process:
run_id_s = "%s." % run_id if run_id else ''
setproctitle("eventor: %s%s.%s(%s)" % (run_id_s, step.name, task.id_, task.sequence))
# Update task with PID
update=TaskAdminMsg(msg_type=TaskAdminMsgType.update, value=task)
adminq.put( update )
module_logger.info('[ Step {}/{} ] Trying to run'.format(step.name, task.sequence))
try:
# todo: need to pass task resources.
result=step(seq_path=task.sequence, )
except Exception as e:
trace=inspect.trace()
trace=traces(trace) #[2:]
task.result=(e, pickle.dumps(trace))
task.status=TaskStatus.failure
else:
task.result=result
task.status=TaskStatus.success
result=TaskAdminMsg(msg_type=TaskAdminMsgType.result, value=task)
module_logger.info('[ Step {}/{} ] Completed, status: {}'.format(step.name, task.sequence, str(task.status), ))
adminq.put( result )
return True
评论列表
文章目录