Python Airflow-从PythonOperator返回结果

发布于 2021-01-29 17:49:33

我已经用多个PythonOperator编写了DAG

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment',
                          provide_context=True,
                          python_callable=Task1, dag=dag1)

def Task1(**kwargs):
    return(kwargs['dag_run'].conf.get('file'))

我从PythonOperator调用“ Task1”方法。该方法正在返回一个值,该值我需要传递给下一个PythonOperator。如何从“
task1”变量中获取值,或者如何从Task1方法中返回该值?

更新 :

    def Task1(**kwargs):
          file_name = kwargs['dag_run'].conf.get[file]
          task_instance = kwargs['task_instance']
          task_instance.xcom_push(key='file', value=file_name) 
          return file_name

  t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag)

  t2 =   BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ',
      dag=dag,
    )

t2.set_upstream(t1)
关注者
0
被浏览
189
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    您可能想查看Airflow的XCOM:https :
    //airflow.apache.org/concepts.html#xcoms

    如果从函数返回值,则此值存储在xcom中。就您而言,您可以像从其他Python代码一样访问它:

    task_instance = kwargs['task_instance']
    task_instance.xcom_pull(task_ids='Task1')
    

    或像这样的模板中:

    {{ task_instance.xcom_pull(task_ids='Task1') }}
    

    如果要指定键,可以将其推入XCOM(在任务内):

    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='the_key', value=my_str)
    

    然后,您可以像下面这样访问它:

    task_instance.xcom_pull(task_ids='my_task', key='the_key')
    

    编辑1

    后续问题: 我不能在其他函数中使用该值,而是将其传递给另一个PythonOperator,例如-“ t2 =” BashOperator(task_id
    =’Moving_bucket’,bash_command =’python /home/raw.py“%s” ‘%file_name,dag =
    dag)“-我想访问“ Task1”返回的file_name。如何实现?

    首先,在我看来,该值实际上 不是 传递给另一个,PythonOperator而是传递给BashOperator

    其次,这已经在我上面的回答中涵盖了。该字段bash_command是模板化的(请参阅template_fields源代码:https :
    //github.com/apache/incubator-
    airflow/blob/master/airflow/operators/bash_operator.py)。因此,我们可以使用模板版本:

    BashOperator(
      task_id='Moving_bucket', 
      bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ',
      dag=dag,
    )
    

    编辑2

    说明:Airflow的工作方式如下:它将执行Task1,然后填充xcom,然后执行下一个任务。因此,为使您的示例正常工作,您需要先执行Task1,然后在Task1的下游执行Moving_bucket。

    由于您使用的是返回函数,因此您也可以省略key='file'fromxcom_pull而不是在函数中手动设置它。



知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看