def _get_runtime_args(config, workflow_module, workflow_options, current_env):
    """Build the runtime args: workflow-level args overridden by env-level args.

    ``workflow_module.args`` (optional attribute) and ``current_env.args``
    may each be ``None``, a ready-made mapping, or a plain function that is
    called with ``workflow_options`` and returns a mapping or ``None``.

    Args:
        config: Not used by this function; kept so the signature callers
            rely on stays unchanged.
        workflow_module: Object whose optional ``args`` attribute supplies
            the base arguments.
        workflow_options: Value passed to ``args`` when it is a function.
        current_env: Object whose ``args`` attribute supplies the
            environment-specific overrides.

    Returns:
        A new dict containing the workflow args updated with the
        environment args (environment values win on key collisions).
    """
    # Workflow-level args; the attribute may be missing entirely.
    workflow_args = _resolve_args(
        getattr(workflow_module, "args", {}), workflow_options
    )
    # Args configured on the current environment.
    env_args = _resolve_args(current_env.args, workflow_options)

    # Merge into a fresh dict: updating ``workflow_module.args`` in place
    # would leak this environment's overrides into the module-level dict
    # and into every later call.
    merged = dict(workflow_args)
    merged.update(env_args)
    return merged


def _resolve_args(raw_args, workflow_options):
    """Return *raw_args* as a mapping, calling it first if it is a function."""
    # Only plain functions/lambdas are invoked (types.FunctionType check);
    # any other value is used as-is.
    if isinstance(raw_args, types.FunctionType):
        raw_args = raw_args(workflow_options)
    return {} if raw_args is None else raw_args
# (Web-page residue, not part of the code: "Comment list" / "Table of contents".)