def add_to_queue(self, url=None, target_state='queued', countdown=0):
    """
    Add this job to the task queue, then store its new state and save it.

    Does nothing (apart from logging a warning) if the current state is not
    'scheduled'.

    Args:
        url: Handler URL for the task; falls back to ``self.queue_url`` when
            None.
        target_state: State written to the job after the task has been added
            (defaults to 'queued').
        countdown: Passed through unchanged as the task's ``countdown``.

    NOTE(review): the task is added with ``transactional=True``, so this
    presumably must be called inside a datastore transaction -- confirm
    against the callers / any decorator on this method.
    """
    if self.state != 'scheduled':
        # logging.warn is a deprecated alias (removed in Python 3.13);
        # logging.warning is the supported spelling on Python 2 and 3.
        logging.warning('tried to add job {} with state {}, to queue, '
                        'doing nothing'.format(self.key, self.state))
        return
    if url is None:
        url = self.queue_url
    logging.debug(u'scheduling job {} for {}'.format(self.key,
                                                      self.user_email))
    taskqueue.add(url=url,
                  payload=json.dumps({'key': self.key.urlsafe()}),
                  queue_name=self.queue_name,
                  countdown=countdown,
                  transactional=True)
    self.state = target_state
    self.put()
# Example source code using Python's transactional()
def intask(self, nameprefix, f, *args, **kwargs):
    """
    Invoke ``f(*args, **kwargs)`` through a ``task``-decorated wrapper.

    Args:
        nameprefix: When truthy, the task is named
            ``"<nameprefix>-<self.key.id()>"``. When falsy, any truthy
            ``"name"`` entry in the task kwargs is removed instead.
        f: Callable to run inside the wrapper.
        *args, **kwargs: Forwarded to ``f``.

    ``taskqueue.TombstonedTaskError`` and ``taskqueue.TaskAlreadyExistsError``
    raised while launching the wrapper are logged and swallowed.
    """
    taskkwargs = self.get_taskkwargs()

    # Bind ``name`` up front: the except handlers below format it, and it
    # was previously unbound (UnboundLocalError) when nameprefix was falsy.
    name = None
    if nameprefix:
        name = "%s-%s" % (nameprefix, self.key.id())
        taskkwargs["name"] = name
    elif taskkwargs.get("name"):
        del taskkwargs["name"]
    taskkwargs["transactional"] = False

    @task(**taskkwargs)
    def dof():
        f(*args, **kwargs)

    try:
        # Run the wrapper task, and if it fails due to a name clash just
        # skip it (it was already kicked off by an earlier attempt to
        # construct this future).
        dof()
    except taskqueue.TombstonedTaskError:
        logdebug("skip adding task %s (already been run)" % name)
    except taskqueue.TaskAlreadyExistsError:
        logdebug("skip adding task %s (already running)" % name)
def _calculate_progress(self, localprogress):
    """
    Return ``localprogress`` plus the calculated progress of every child.

    The children are fetched with ``get_children(self.key)`` inside an ndb
    transaction; each one contributes ``child.get_calculatedprogress()``.
    """
    def _fetch_children():
        return get_children(self.key)

    total = localprogress
    # A falsy result (no children) contributes nothing.
    for kid in ndb.transactional()(_fetch_children)() or ():
        total += kid.get_calculatedprogress()
    return total
# def update_result(self):
# if self.readyforresult:
# updateresultf = UpdateResultF #pickle.loads(self.updateresultfser) if self.updateresultfser else DefaultUpdateResultF
# updateresultf(self)
#
# # note that updateresultf can change the status
#
# if self.status == "failure":
# self._callOnFailure()
# elif self.status == "success":
# self._callOnSuccess()
def set_success(self, result):
    """
    Record ``result`` as this future's successful outcome.

    Inside an ndb transaction the entity is re-read by key. It is updated
    and saved only if it is ready for a result and has no status yet.
    When that update happened, the local success progress is set and the
    success callbacks are invoked on the freshly loaded entity.
    """
    future_key = self.key

    @ndb.transactional
    def _mark_success():
        fut = future_key.get()
        if not fut.readyforresult or fut.status:
            # Not ready yet, or a status was already recorded: leave as is.
            return fut, False
        fut.status = "success"
        fut.initialised = True
        fut.readyforresult = True
        fut.resultser = cloudpickle.dumps(result)
        fut.runtimesec = fut.get_runtime().total_seconds()
        fut.put()
        return fut, True

    # Continue with the entity as loaded inside the transaction.
    self, changed = _mark_success()
    if not changed:
        return
    self._set_local_progress_for_success()
    self._callOnSuccess()
def add_note():
    """
    Create a note from the submitted form, unless it already exists.

    Reads ``page_name`` from the query string (default 'default') and
    ``note_title`` / ``note_text`` from the form. One of two insertion
    strategies is picked at random. Returns an "Already there" HTML snippet
    when the note was not inserted, otherwise a redirect to the main page.
    """
    page_name = flask.request.args.get('page_name', 'default')
    note_title = flask.request.form['note_title']
    note_text = flask.request.form['note_text']
    parent = parent_key(page_name)

    if random.randint(0, 1) == 0:
        # Use transactional function
        # [START calling]
        note_key = ndb.Key(Note, note_title, parent=parent)
        note = Note(key=note_key, content=note_text)
        # [END calling]
        already_there = pick_random_insert(note_key, note) is False
    else:
        # Use get_or_insert, which is transactional
        note = Note.get_or_insert(note_title, parent=parent, content=note_text)
        # Different content means an earlier note with this title was found.
        already_there = note.content != note_text

    main_url = flask.url_for('main_page', page_name=page_name)
    if already_there:
        return 'Already there<br><a href="%s">Return</a>' % main_url
    return flask.redirect(main_url)
def get_id():
    """Reserve a globally unique ID.

    Draws a random number between MIN_ID and MAX_ID and asks
    _reserve_candidate to reserve it. If the reservation succeeds the number
    is returned; if it is refused, or fails with TransactionFailedError, a
    new number is drawn and the attempt is repeated.
    """
    while True:
        candidate = random.randint(MIN_ID, MAX_ID)
        # Per the original author's note: _check_and_create_record creates
        # the record without using a transaction, then _reserve_candidate
        # flips the reserved flag within a transaction. Unfortunately,
        # transactional guarantees don't appear to extend to two threads
        # creating the same entity at the same time.
        try:
            reserved = _reserve_candidate(candidate)
        except TransactionFailedError:
            continue
        if reserved:
            return candidate
def GetChildren(self):
    """Return ``get_children(self.key)``, evaluated inside an ndb transaction."""
    def _load_children():
        return get_children(self.key)

    return ndb.transactional()(_load_children)()
def set_failure(self, exception):
    """
    Record ``exception`` as this future's failure.

    Inside an ndb transaction the entity is re-read by key. It is updated
    and saved only if it has no status yet. When that update happened, the
    failure callbacks are invoked. If the future has no parent key, a task
    is also launched that recursively marks every descendant as failed with
    the same exception.
    """
    future_key = self.key

    @ndb.transactional
    def _mark_failed():
        fut = future_key.get()
        if fut.status:
            # A status was already recorded: leave the entity untouched.
            return fut, False
        fut.status = "failure"
        fut.initialised = True
        fut.readyforresult = True
        fut.exceptionser = cloudpickle.dumps(exception)
        fut.runtimesec = fut.get_runtime().total_seconds()
        fut.put()
        return fut, True

    # Continue with the entity as loaded inside the transaction.
    self, changed = _mark_failed()
    if not changed:
        return

    self._callOnFailure()
    if self.parentkey:
        return

    # top level. Fail everything below
    cascade_kwargs = self.get_taskkwargs()

    @task(**cascade_kwargs)
    def failchildren(futurekey):
        for child in get_children(futurekey) or ():
            child.set_failure(exception)
            failchildren(child.key)

    failchildren(self.key)
def set_readyforesult(self):
    """
    Flag this future as ready for a result.

    Inside an ndb transaction the entity is re-read by key. If it is not
    yet ready for a result, both ``initialised`` and ``readyforresult`` are
    set and the entity is saved.
    """
    future_key = self.key

    @ndb.transactional
    def _mark_ready():
        fut = future_key.get()
        if fut.readyforresult:
            return fut, False
        fut.initialised = True
        fut.readyforresult = True
        fut.put()
        return fut, True

    _mark_ready()
def set_initialised(self):
    """
    Flag this future as initialised.

    Inside an ndb transaction the entity is re-read by key. If it is not
    yet initialised, ``initialised`` is set and the entity is saved.
    """
    future_key = self.key

    @ndb.transactional
    def _mark_initialised():
        fut = future_key.get()
        if fut.initialised:
            return fut, False
        fut.initialised = True
        fut.put()
        return fut, True

    _mark_initialised()
def post(self):
    """
    Add the request's integer ``amount`` to the shared counter.

    The counter entity is fetched (or created with ``count=0``) under
    COUNTER_KEY, incremented and saved inside an ndb transaction.
    """
    delta = int(self.request.get('amount'))

    # This task should run at most once per second because of the datastore
    # transaction write throughput.
    def _bump_counter():
        entity = Counter.get_or_insert(COUNTER_KEY, count=0)
        entity.count += delta
        entity.put()

    ndb.transactional(_bump_counter)()
def insert_if_absent_taskq(note_key, note):
    """
    Enqueue the worker task, then store ``note`` if ``note_key`` is unused.

    Returns True when the note was written, False when an entity already
    exists under ``note_key``.
    """
    taskqueue.add(url=flask.url_for('taskq_worker'), transactional=True)
    # do insert
    # [END taskq]
    if note_key.get() is not None:
        return False
    note.put()
    return True
def GenerateOnAllChildSuccess(parentkey, initialvalue, combineresultf, failonerror=True):
    """
    Build a callback that combines the children's results into the parent.

    Args:
        parentkey: Key of the parent future; when falsy the callback only
            logs and does nothing else.
        initialvalue: Starting value for the combined result.
        combineresultf: Called as ``combineresultf(result, childresult)`` for
            each child that has a result; its return value becomes the new
            running result.
        failonerror: When True (default) a child error is recorded on the
            parent via ``set_failure``; when False the error is re-raised.

    Returns:
        The zero-argument ``OnAllChildSuccess`` callable.
    """
    def OnAllChildSuccess():
        logdebug("Enter GenerateOnAllChildSuccess: %s" % parentkey)
        parentfuture = parentkey.get() if parentkey else None
        if parentfuture and not parentfuture.has_result():
            if not parentfuture.initialised or not parentfuture.readyforresult:
                # Raising makes the caller see a failure; the message says
                # the intent is for the invocation to be retried.
                raise Exception("Parent not initialised, retry")

            @ndb.transactional()
            def get_children_trans():
                return get_children(parentfuture.key)
            children = get_children_trans()

            logdebug("children: %s" % [child.key for child in children])
            if children:
                result = initialvalue
                error = None
                finished = True
                for childfuture in children:
                    logdebug("childfuture: %s" % childfuture.key)
                    if childfuture.has_result():
                        # "except ... as" is valid on Python 2.6+ and 3; the
                        # old "except Exception, ex" form is a SyntaxError
                        # on Python 3.
                        try:
                            childresult = childfuture.get_result()
                            logdebug("childresult(%s): %s" % (childfuture.status, childresult))
                            result = combineresultf(result, childresult)
                            logdebug("hasresult:%s" % result)
                        except Exception as ex:
                            logdebug("haserror:%s" % repr(ex))
                            error = ex
                            # The first error decides the outcome.
                            break
                    else:
                        logdebug("noresult")
                        finished = False

                if error:
                    logwarning("Internal error, child has error in OnAllChildSuccess: %s" % error)
                    if failonerror:
                        parentfuture.set_failure(error)
                    else:
                        raise error
                elif finished:
                    logdebug("result: %s" % result)
                    parentfuture.set_success(result)
                else:
                    # At least one child has no result yet; nothing is
                    # recorded on the parent in this invocation.
                    logdebug("child not finished in OnAllChildSuccess, skipping")
            else:
                logwarning("Internal error, parent has no children in OnAllChildSuccess")
                parentfuture.set_failure(Exception("no children found"))
    return OnAllChildSuccess