def _finalize_job(cls, mapreduce_spec, mapreduce_state):
    """Wrap up a mapreduce job: persist state, fire the callback, clean up.

    The final state is written and the optional "done" callback task is
    enqueued inside a single datastore transaction. Afterwards the final
    result status is logged and ``cls._clean_up_mr`` is invoked.

    Repeated invocations are tolerated: when the freshly loaded state is
    no longer active, the write and the callback enqueue are both skipped
    and only a warning is logged (the final log line and the cleanup call
    still run).

    Args:
      cls: class providing ``_clean_up_mr``.
      mapreduce_spec: an instance of MapreduceSpec
      mapreduce_state: an instance of MapreduceState
    """
    write_config = util.create_datastore_write_config(mapreduce_spec)
    params = mapreduce_spec.params
    job_id = mapreduce_spec.mapreduce_id
    spec_cls = model.MapreduceSpec

    callback_queue = util.get_queue_name(
        params.get(spec_cls.PARAM_DONE_CALLBACK_QUEUE))
    callback_url = params.get(spec_cls.PARAM_DONE_CALLBACK)
    callback_target = params.get(spec_cls.PARAM_DONE_CALLBACK_TARGET)

    # The callback task is only built when a callback URL is configured.
    callback_task = None
    if callback_url:
        callback_task = taskqueue.Task(
            url=callback_url,
            target=callback_target,
            headers=util._get_task_headers(
                job_id,
                util.CALLBACK_MR_ID_TASK_HEADER,
                # The host header is set only when no explicit target
                # was configured for the callback.
                set_host_header=(callback_target is None)),
            method=params.get("done_callback_method", "POST"))

    @db.transactional(retries=5)
    def _store_final_state():
        """Write the final state and enqueue the callback transactionally."""
        stored_state = model.MapreduceState.get_by_job_id(job_id)
        if stored_state.active:
            mapreduce_state.put(config=write_config)
            if callback_task:
                hook_result = _run_task_hook(
                    mapreduce_spec.get_hooks(),
                    "enqueue_done_task",
                    callback_task,
                    callback_queue,
                    transactional=True)
                # Enqueue directly only when the hook call returned a
                # falsy value.
                if not hook_result:
                    callback_task.add(callback_queue, transactional=True)
        else:
            logging.warning(
                "Job %s is not active. Looks like spurious task execution. "
                "Dropping task.", job_id)

    _store_final_state()
    logging.info("Final result for job '%s' is '%s'",
                 job_id, mapreduce_state.result_status)
    cls._clean_up_mr(mapreduce_spec)
评论列表
文章目录