def _drop_gracefully(self):
"""Drop worker task gracefully.
Set current shard_state to failed. Controller logic will take care of
other shards and the entire MR.
"""
shard_id = self.request.headers[util._MR_SHARD_ID_TASK_HEADER]
mr_id = self.request.headers[util._MR_ID_TASK_HEADER]
shard_state, mr_state = db.get([model.ShardState.get_key_by_shard_id(shard_id), model.MapreduceState.get_key_by_job_id(mr_id)])
if shard_state and shard_state.active:
logging.error('Would normally mark this shard for failure...and kill the entire mapreduce!')
logging.error('But we ignore that and let this shard continue to run (and fail) instead.')
# shard_state.set_for_failure()
# config = util.create_datastore_write_config(mr_state.mapreduce_spec)
# shard_state.put(config=config)
raise Exception('Worker cannot run due to attempt to drop gracefully.')
评论列表
文章目录