def run(self,
        job_name,
        mapper_spec,
        reducer_spec,
        input_reader_spec,
        output_writer_spec=None,
        mapper_params=None,
        reducer_params=None,
        shards=None,
        combiner_spec=None):
    """Yield the map, shuffle, reduce and cleanup child pipelines of a job.

    This is a generator: each stage is yielded in turn and the value sent
    back in for it is passed on as an input to the following stages.

    Args:
      job_name: Passed unchanged to the map, shuffle and reduce pipelines.
      mapper_spec: Passed to MapPipeline.
      reducer_spec: Passed to ReducePipeline.
      input_reader_spec: Passed to MapPipeline.
      output_writer_spec: Passed to ReducePipeline.
      mapper_params: Dict of mapper parameters. Its "bucket_name" entry is
        filled in with the application's default GCS bucket name when it is
        missing or None. A dict supplied by the caller is updated in place.
        None is treated as an empty dict.
      reducer_params: Passed to ReducePipeline.
      shards: Passed to MapPipeline as its `shards` keyword argument.
      combiner_spec: Passed to ReducePipeline as its `combiner_spec` keyword
        argument.

    Raises:
      errors.Error: If looking up the default GCS bucket name fails, or if
        no bucket name is available after the lookup.
    """
    # The signature allows mapper_params to be omitted, but everything below
    # needs a dict (for .get() and item assignment), so normalise None first.
    if mapper_params is None:
        mapper_params = {}

    # Fall back to the default GCS bucket when the caller did not name one.
    if mapper_params.get("bucket_name") is None:
        try:
            mapper_params["bucket_name"] = (
                app_identity.get_default_gcs_bucket_name())
        except Exception as e:
            raise errors.Error("Unable to get the GCS default bucket name. "
                               "Check to see that GCS is properly activated. "
                               + str(e))
    # The lookup itself may succeed and still hand back None.
    if mapper_params["bucket_name"] is None:
        raise errors.Error("There is no GCS default bucket name. "
                           "Check to see that GCS is properly activated.")

    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, mapper_params, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        mapper_params["bucket_name"],
        shuffler_pipeline,
        combiner_spec=combiner_spec)

    # Cleanup of the map and shuffle outputs is ordered after the reducer.
    with pipeline.After(reducer_pipeline):
        all_temp_files = yield pipeline_common.Extend(
            map_pipeline, shuffler_pipeline)
        yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline)
mapreduce_pipeline.py 文件源码
python
阅读 22
收藏 0
点赞 0
评论 0
评论列表
文章目录