def _drop_gracefully(self):
    """Fail the whole job when the controller task cannot be processed.

    Invoked when decoding the controller task payload failed. The
    MapreduceState and every still-active ShardState are marked as failed
    so that all tasks belonging to the job can stop.

    Per the original author's note, datastore writes are forced here
    (read-only mode is ignored): stopping the tasks is a priority, and a
    job with force_writes set to False would never have been started in
    the first place.
    """
    job_id = self.request.headers[util._MR_ID_TASK_HEADER]
    mr_state = model.MapreduceState.get_by_job_id(job_id)
    # Nothing to do for an unknown job or one that is already inactive.
    if not (mr_state and mr_state.active):
        return

    mr_state.active = False
    mr_state.result_status = model.MapreduceState.RESULT_FAILED
    write_cfg = util.create_datastore_write_config(mr_state.mapreduce_spec)

    pending = []
    for shard_state in model.ShardState.find_all_by_mapreduce_state(mr_state):
        if not shard_state.active:
            continue
        shard_state.set_for_failure()
        pending.append(shard_state)
        # Flush once the batch exceeds the limit so that only a bounded
        # number of shard states is held in memory at a time.
        if len(pending) > model.ShardState._MAX_STATES_IN_MEMORY:
            db.put(pending, config=write_cfg)
            pending = []
    # Write whatever is left from the last, partially filled batch.
    db.put(pending, config=write_cfg)

    # Finalize the job only after all shard states have been written.
    self._finalize_job(mr_state.mapreduce_spec, mr_state)
评论列表
文章目录