气流默认on_failure_callback

发布于 2021-01-29 18:09:14

在我的DAG文件中,我定义了一个on_failure_callback()函数以在失败时发布Slack。

如果我为DAG中的每个运算符都指定,它将很好地工作:on_failure_callback = on_failure_callback()

有没有一种方法可以自动化(例如通过default_args或通过我的DAG对象)向所有操作员的调度?

关注者
0
被浏览
45
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    我终于找到了一种方法。

    您可以将on_failure_callback作为default_args传递

    class Foo:
      @staticmethod
      def get_default_args():
          """
          Return default args
          :return: default_args
          """
    
          default_args = {
              'on_failure_callback': Foo.on_failure_callback
          }
    
          return default_args
    
      @staticmethod
      def on_failure_callback(context):
         """
         Define the callback to post on Slack if a failure is detected in the Workflow
         :return: operator.execute
         """
    
         operator = SlackAPIPostOperator(
             task_id='failure',
             text=str(context['task_instance']),
             token=Variable.get("slack_access_token"),
             channel=Variable.get("slack_channel")
         )
    
         return operator.execute(context=context)
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看