def _run_until_rpc():
    """Eagerly evaluate tasklets until they are blocked on some RPC.

    Usually the ndb event loop is not run until some code calls
    future.get_result().

    When an async tasklet is called, the tasklet wrapper evaluates the
    tasklet code into a generator, enqueues a _help_tasklet_along callback
    onto the event loop's ``current`` queue, and returns a future.

    When the event loop calls _help_tasklet_along, it gets one yielded value
    from the generator. If the value is another future, a callback
    _on_future_complete is set up to invoke _help_tasklet_along when the
    dependent future is fulfilled. If the value is an RPC, a callback
    _on_rpc_complete is set up to invoke _help_tasklet_along when the RPC
    is fulfilled. Thus _help_tasklet_along drills down the chain of futures
    until some future is blocked by an RPC. The event loop runs all
    callbacks and constantly checks pending RPC status.

    This function keeps calling run0() on the event loop for as long as its
    ``current`` queue is non-empty.
    """
    loop = eventloop.get_event_loop()
    while loop.current:
        loop.run0()
# Example source code for the Python tasklet() class.
def _check_etag(self, etag):
"""Check if etag is the same across requests to GCS.
If self._etag is None, set it. If etag is set, check that the new
etag equals the old one.
In the __init__ method, we fire one HEAD and one GET request using
ndb tasklet. One of them would return first and set the first value.
Args:
etag: etag from a GCS HTTP response. None if etag is not part of the
response header. It could be None for example in the case of GCS
composite file.
Raises:
ValueError: if two etags are not equal.
"""
if etag is None:
return
elif self._etag is None:
self._etag = etag
elif self._etag != etag:
raise ValueError('File on GCS has changed while reading.')
def _run_until_rpc():
    """Eagerly evaluate tasklets until they are blocked on some RPC.

    Usually the ndb event loop is not run until some code calls
    future.get_result().

    When an async tasklet is called, the tasklet wrapper evaluates the
    tasklet code into a generator, enqueues a _help_tasklet_along callback
    onto the event loop's ``current`` queue, and returns a future.

    When the event loop calls _help_tasklet_along, it gets one yielded value
    from the generator. If the value is another future, a callback
    _on_future_complete is set up to invoke _help_tasklet_along when the
    dependent future is fulfilled. If the value is an RPC, a callback
    _on_rpc_complete is set up to invoke _help_tasklet_along when the RPC
    is fulfilled. Thus _help_tasklet_along drills down the chain of futures
    until some future is blocked by an RPC. The event loop runs all
    callbacks and constantly checks pending RPC status.

    This function keeps calling run0() on the event loop for as long as its
    ``current`` queue is non-empty.
    """
    loop = eventloop.get_event_loop()
    while loop.current:
        loop.run0()
def _check_etag(self, etag):
"""Check if etag is the same across requests to GCS.
If self._etag is None, set it. If etag is set, check that the new
etag equals the old one.
In the __init__ method, we fire one HEAD and one GET request using
ndb tasklet. One of them would return first and set the first value.
Args:
etag: etag from a GCS HTTP response. None if etag is not part of the
response header. It could be None for example in the case of GCS
composite file.
Raises:
ValueError: if two etags are not equal.
"""
if etag is None:
return
elif self._etag is None:
self._etag = etag
elif self._etag != etag:
raise ValueError('File on GCS has changed while reading.')
def __init__(self, retry_params,
             retriable_exceptions=_RETRIABLE_EXCEPTIONS,
             should_retry=lambda r: False):
    """Record the retry configuration on the instance.

    Args:
        retry_params: a RetryParams instance.
        retriable_exceptions: a list of exception classes that are
            retriable.
        should_retry: a function that takes a result from the tasklet and
            returns a boolean; True if the result should be retried. The
            default never asks for a retry.
    """
    # Plain attribute storage; nothing is validated or copied here.
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry
def _eager_tasklet(tasklet):
    """Decorator that makes a tasklet run eagerly.

    The returned wrapper calls the tasklet, then calls _run_until_rpc()
    before handing the tasklet's future back to the caller.

    Args:
        tasklet: the tasklet callable to wrap.

    Returns:
        The wrapper, passed through utils.wrapping(tasklet).
    """
    def eager_wrapper(*args, **kwds):
        future = tasklet(*args, **kwds)
        _run_until_rpc()
        return future

    return utils.wrapping(tasklet)(eager_wrapper)
def define_get_google():
    """Build and return a tasklet that fetches http://www.google.com/.

    The returned tasklet yields on the context's urlfetch call. It returns
    the response content when the status code is 200 and returns None for
    any other status code.
    """
    @ndb.tasklet
    def get_google():
        ctx = ndb.get_context()
        response = yield ctx.urlfetch("http://www.google.com/")
        if response.status_code != 200:
            # Ending the generator without ndb.Return yields a None result.
            return
        raise ndb.Return(response.content)

    return get_google
def get_messages_async(self):
    """Write up to 20 messages, ordered by descending ``when``, to the response.

    Each message is formatted by a tasklet callback that fetches the
    message's author asynchronously, and is then written wrapped in a
    ``<p>`` element.
    """
    @ndb.tasklet
    def callback(msg):
        # Fetch the author entity without blocking other callbacks.
        author = yield msg.author.get_async()
        line = 'On {}, {} wrote:\n{}'.format(
            msg.when, author.nick(), msg.text)
        raise ndb.Return(line)

    newest_first = Message.query().order(-Message.when)
    # NOTE(review): the formatted text is written into HTML as-is; if
    # msg.text or the nick can contain user-supplied markup it presumably
    # needs escaping -- confirm against the callers/templates.
    for rendered in newest_first.map(callback, limit=20):
        self.response.out.write('<p>{}</p>'.format(rendered))
def __init__(self, retry_params,
             retriable_exceptions=_RETRIABLE_EXCEPTIONS,
             should_retry=lambda r: False):
    """Record the retry configuration on the instance.

    Args:
        retry_params: a RetryParams instance.
        retriable_exceptions: a list of exception classes that are
            retriable.
        should_retry: a function that takes a result from the tasklet and
            returns a boolean; True if the result should be retried. The
            default never asks for a retry.
    """
    # Plain attribute storage; nothing is validated or copied here.
    self.retry_params = retry_params
    self.retriable_exceptions = retriable_exceptions
    self.should_retry = should_retry
def _eager_tasklet(tasklet):
    """Decorator that makes a tasklet run eagerly.

    The returned wrapper calls the tasklet, then calls _run_until_rpc()
    before handing the tasklet's future back to the caller.

    Args:
        tasklet: the tasklet callable to wrap.

    Returns:
        The wrapper, passed through utils.wrapping(tasklet).
    """
    def eager_wrapper(*args, **kwds):
        future = tasklet(*args, **kwds)
        _run_until_rpc()
        return future

    return utils.wrapping(tasklet)(eager_wrapper)
def urlfetch_async(self, url, method='GET', headers=None,
payload=None, deadline=None, callback=None,
follow_redirects=False):
"""Make an async urlfetch() call.
This is an async wrapper around urlfetch(). It adds an authentication
header.
Args:
url: the url to fetch.
method: the method in which to fetch.
headers: the http headers.
payload: the data to submit in the fetch.
deadline: the deadline in which to make the call.
callback: the call to make once completed.
follow_redirects: whether or not to follow redirects.
Yields:
This returns a Future despite not being decorated with @ndb.tasklet!
"""
headers = {} if headers is None else dict(headers)
headers.update(self.user_agent)
try:
self.token = yield self.get_token_async()
except app_identity.InternalError, e:
if os.environ.get('DATACENTER', '').endswith('sandman'):
self.token = None
logging.warning('Could not fetch an authentication token in sandman '
'based Appengine devel setup; proceeding without one.')
else:
raise e
if self.token:
headers['authorization'] = 'OAuth ' + self.token
deadline = deadline or self.retry_params.urlfetch_timeout
ctx = ndb.get_context()
resp = yield ctx.urlfetch(
url, payload=payload, method=method,
headers=headers, follow_redirects=follow_redirects,
deadline=deadline, callback=callback)
raise ndb.Return(resp)
def urlfetch_async(self, url, method='GET', headers=None,
payload=None, deadline=None, callback=None,
follow_redirects=False):
"""Make an async urlfetch() call.
This is an async wrapper around urlfetch(). It adds an authentication
header.
Args:
url: the url to fetch.
method: the method in which to fetch.
headers: the http headers.
payload: the data to submit in the fetch.
deadline: the deadline in which to make the call.
callback: the call to make once completed.
follow_redirects: whether or not to follow redirects.
Yields:
This returns a Future despite not being decorated with @ndb.tasklet!
"""
headers = {} if headers is None else dict(headers)
headers.update(self.user_agent)
try:
self.token = yield self.get_token_async()
except app_identity.InternalError, e:
if os.environ.get('DATACENTER', '').endswith('sandman'):
self.token = None
logging.warning('Could not fetch an authentication token in sandman '
'based Appengine devel setup; proceeding without one.')
else:
raise e
if self.token:
headers['authorization'] = 'OAuth ' + self.token
deadline = deadline or self.retry_params.urlfetch_timeout
ctx = ndb.get_context()
resp = yield ctx.urlfetch(
url, payload=payload, method=method,
headers=headers, follow_redirects=follow_redirects,
deadline=deadline, callback=callback)
raise ndb.Return(resp)