Google Cloud Dataflow Python,检索作业ID

发布于 2021-01-29 15:04:01

我目前正在使用Python处理数据流 模板 ,我想访问作业ID并将其保存到特定的Firestore文档。

是否可以访问作业ID?

我在文档中找不到与此有关的任何内容。

关注者
0
被浏览
70
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    您可以通过dataflow.projects().locations().jobs().list在管道中进行调用来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称来调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前缀,如果包含名称前缀,则返回该作业ID。如果有多个,它将​​仅返回最新的一个(当前正在运行的一个)。

    在定义PROJECTBUCKET变量之后,使用以下命令暂存该模板:

    python script.py \
        --runner DataflowRunner \
        --project $PROJECT \
        --staging_location gs://$BUCKET/staging \
        --temp_location gs://$BUCKET/temp \
        --template_location gs://$BUCKET/templates/retrieve_job_id
    

    然后,myjobprefix在执行模板化作业时指定所需的作业名称(在我的情况下):

    gcloud dataflow jobs run myjobprefix \
       --gcs-location gs://$BUCKET/templates/retrieve_job_id
    

    retrieve_job_id函数将从作业中返回作业ID,将更job_prefix改为与给定名称匹配。

    import argparse, logging, re
    from googleapiclient.discovery import build
    from oauth2client.client import GoogleCredentials
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    
    
    def retrieve_job_id(element):
      project = 'PROJECT_ID'
      job_prefix = "myjobprefix"
      location = 'us-central1'
    
      logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location))
    
      try:
        credentials = GoogleCredentials.get_application_default()
        dataflow = build('dataflow', 'v1b3', credentials=credentials)
    
        result = dataflow.projects().locations().jobs().list(
          projectId=project,
          location=location,
        ).execute()
    
        job_id = "none"
    
        for job in result['jobs']:
          if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
            job_id = job['id']
            break
    
        logging.info("Job ID: {}".format(job_id))
        return job_id
    
      except Exception as e:
        logging.info("Error retrieving Job ID")
        raise KeyError(e)
    
    
    def run(argv=None):
      parser = argparse.ArgumentParser()
      known_args, pipeline_args = parser.parse_known_args(argv)
    
      pipeline_options = PipelineOptions(pipeline_args)
      pipeline_options.view_as(SetupOptions).save_main_session = True
    
      p = beam.Pipeline(options=pipeline_options)
    
      init_data = (p
                   | 'Start' >> beam.Create(["Init pipeline"])
                   | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id))
    
      p.run()
    
    
    if __name__ == '__main__':
      run()
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看