def _save_states(self, state, serialized_readers_entity):
"""Run transaction to save state.
Args:
state: a model.MapreduceState entity.
serialized_readers_entity: a model._HugeTaskPayload entity containing
json serialized input readers.
Returns:
False if a fatal error is encountered and this task should be dropped
immediately. True if transaction is successful. None if a previous
attempt of this same transaction has already succeeded.
"""
mr_id = state.key().id_or_name()
fresh_state = model.MapreduceState.get_by_job_id(mr_id)
if not self._check_mr_state(fresh_state, mr_id):
return False
if fresh_state.active_shards != 0:
logging.warning(
"Mapreduce %s already has active shards. Looks like spurious task "
"execution.", mr_id)
return None
config = util.create_datastore_write_config(state.mapreduce_spec)
db.put([state, serialized_readers_entity], config=config)
return True
评论列表
文章目录