气流默认on_failure_callback
在我的DAG文件中,我定义了一个on_failure_callback()函数以在失败时发布Slack。
如果我为DAG中的每个运算符都指定,它将很好地工作:on_failure_callback = on_failure_callback()
有没有一种方法可以自动化(例如通过default_args或通过我的DAG对象)向所有操作员的调度?
-
我终于找到了一种方法。
您可以将on_failure_callback作为default_args传递
class Foo: @staticmethod def get_default_args(): """ Return default args :return: default_args """ default_args = { 'on_failure_callback': Foo.on_failure_callback } return default_args @staticmethod def on_failure_callback(context): """ Define the callback to post on Slack if a failure is detected in the Workflow :return: operator.execute """ operator = SlackAPIPostOperator( task_id='failure', text=str(context['task_instance']), token=Variable.get("slack_access_token"), channel=Variable.get("slack_channel") ) return operator.execute(context=context)