def funcWrapper(*args) -> bool:
    """Resolve a workflow step's pending inputs, then launch its tool.

    Intended to be passed to an executor. The workflow step state is taken
    from ``args[0]``. Every input port whose evaluation is still a ``Future``
    is waited on, and the resulting value is stored in the port before the
    launcher is started; non-future evaluations are stored unchanged.

    Args:
        *args: Exactly one element, a ``states.WorkflowStepState``.

    Returns:
        ``True`` once ``launcher.launchTool`` has returned.

    Raises:
        AssertionError: If the number or type of arguments is wrong. These
            checks are asserts and are therefore skipped under ``python -O``.

    Any exception raised by ``Future.result()`` or by the launcher is not
    caught here and propagates to the caller.
    """
    assert len(args) == 1, (
        f"funcWrapper expects exactly one argument, got {len(args)}"
    )
    stepState = args[0]
    assert isinstance(stepState, states.WorkflowStepState), (
        f"expected states.WorkflowStepState, got {type(stepState).__name__}"
    )

    # Wait for the results of prior futures. Only the port states are needed,
    # so iterate over the values rather than the (key, value) pairs.
    for portState in stepState.inputPortStates.values():
        currentValue = portState.getEvaluation()  # may be a Future
        if isinstance(currentValue, Future):
            # Quick and dirty: blocks until port states are written by
            # launchTool.
            # Later: this won't work in multi-CPU / multi-node environments
            # because port states might not be in sync!
            resolved = currentValue.result()  # fetch once, reuse for logging
            portState.setValue(resolved)
            logging.getLogger('system').debug(resolved)
        else:
            portState.setValue(currentValue)

    launcher.launchTool(stepState)
    return True
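# NOTE(review): web-page residue from the source this snippet was copied from
# ("Comment list" / "Article table of contents"); not part of the program.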