mapreduce_pipeline.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码
def run(self,
          job_name,
          mapper_spec,
          reducer_spec,
          input_reader_spec,
          output_writer_spec=None,
          mapper_params=None,
          reducer_params=None,
          shards=None,
          combiner_spec=None):


    if mapper_params.get("bucket_name") is None:
      try:
        mapper_params["bucket_name"] = (
            app_identity.get_default_gcs_bucket_name())
      except Exception, e:
        raise errors.Error("Unable to get the GCS default bucket name. "
                           "Check to see that GCS is properly activated. "
                           + str(e))
    if mapper_params["bucket_name"] is None:
      raise errors.Error("There is no GCS default bucket name. "
                         "Check to see that GCS is properly activated.")


    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, mapper_params, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        mapper_params["bucket_name"],
        shuffler_pipeline,
        combiner_spec=combiner_spec)
    with pipeline.After(reducer_pipeline):
      all_temp_files = yield pipeline_common.Extend(
          map_pipeline, shuffler_pipeline)
      yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号