handlers.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码
def _finalize_job(cls, mapreduce_spec, mapreduce_state):
    """Finalize job execution.

    Invokes done callback and save mapreduce state in a transaction,
    and schedule necessary clean ups. This method is idempotent.

    Args:
      mapreduce_spec: an instance of MapreduceSpec
      mapreduce_state: an instance of MapreduceState
    """
    config = util.create_datastore_write_config(mapreduce_spec)
    queue_name = util.get_queue_name(mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK_QUEUE))
    done_callback = mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK)
    done_callback_target = mapreduce_spec.params.get(
        model.MapreduceSpec.PARAM_DONE_CALLBACK_TARGET)

    done_task = None
    if done_callback:


      headers = util._get_task_headers(
          mapreduce_spec.mapreduce_id,
          util.CALLBACK_MR_ID_TASK_HEADER,
          set_host_header=(done_callback_target is None))
      done_task = taskqueue.Task(
          url=done_callback,
          target=done_callback_target,
          headers=headers,
          method=mapreduce_spec.params.get("done_callback_method", "POST"))

    @db.transactional(retries=5)
    def _put_state():
      """Helper to store state."""
      fresh_state = model.MapreduceState.get_by_job_id(
          mapreduce_spec.mapreduce_id)
      if not fresh_state.active:
        logging.warning(
            "Job %s is not active. Looks like spurious task execution. "
            "Dropping task.", mapreduce_spec.mapreduce_id)
        return
      mapreduce_state.put(config=config)

      if done_task and not _run_task_hook(
          mapreduce_spec.get_hooks(),
          "enqueue_done_task",
          done_task,
          queue_name,
          transactional=True):
        done_task.add(queue_name, transactional=True)

    _put_state()
    logging.info("Final result for job '%s' is '%s'",
                 mapreduce_spec.mapreduce_id, mapreduce_state.result_status)
    cls._clean_up_mr(mapreduce_spec)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号