def do_request_async(self, url, method='GET', headers=None, payload=None,
                     deadline=None, callback=None):
    """Inherit docs.

    This method translates urlfetch exceptions to more service specific ones.

    Before delegating to the parent implementation it defaults the
    'x-goog-api-version' header to '2' and always sets 'accept-encoding' to
    'gzip, *'. When the caller supplies a headers dict, that same dict is
    updated in place.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers; a new dict is created when None.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call.
      callback: the call to make once completed.

    Yields:
      Whatever the parent class's do_request_async returns.

    Raises:
      errors.TimeoutError: if the parent call raises urlfetch.DownloadError;
        the original exception is passed along as the second argument.
      ndb.Return: carrying the value produced by the parent call.
    """
    # NOTE(review): the body uses "raise ndb.Return", so it is presumably
    # meant to run as an ndb tasklet -- the decorator is not visible here.
    if headers is None:
        headers = {}
    if 'x-goog-api-version' not in headers:
        headers['x-goog-api-version'] = '2'
    headers['accept-encoding'] = 'gzip, *'
    try:
        resp_tuple = yield super(_StorageApi, self).do_request_async(
            url, method=method, headers=headers, payload=payload,
            deadline=deadline, callback=callback)
    except urlfetch.DownloadError as e:
        # "except X as e" parses on Python 2.6+ and Python 3 alike; the old
        # "except X, e" spelling is a syntax error on Python 3.
        raise errors.TimeoutError(
            'Request to Google Cloud Storage timed out.', e)
    raise ndb.Return(resp_tuple)
# Example source code for the Python class Return()
def do_request_async(self, url, method='GET', headers=None, payload=None,
                     deadline=None, callback=None):
    """Issue a request through the parent class with storage headers applied.

    A urlfetch.DownloadError raised by the parent call is re-raised as the
    service specific errors.TimeoutError.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers. A caller-supplied dict is modified in
        place; None is replaced by a fresh dict.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call.
      callback: the call to make once completed.

    Yields:
      Whatever the parent class's do_request_async returns.

    Raises:
      errors.TimeoutError: when the parent call raises
        urlfetch.DownloadError.
      ndb.Return: carrying the value produced by the parent call.
    """
    if headers is None:
        headers = {}
    # Only fill in the API version when the caller has not chosen one.
    headers.setdefault('x-goog-api-version', '2')
    # The accept-encoding header is always overwritten.
    headers['accept-encoding'] = 'gzip, *'

    parent = super(_StorageApi, self)
    try:
        response = yield parent.do_request_async(
            url,
            method=method,
            headers=headers,
            payload=payload,
            deadline=deadline,
            callback=callback)
    except urlfetch.DownloadError as download_error:
        raise errors.TimeoutError(
            'Request to Google Cloud Storage timed out.', download_error)
    raise ndb.Return(response)
def get_credentials_async(user_id):
    """Fetch the stored oauth credentials for user_id asynchronously.

    Args:
      user_id: id passed to Account.get_by_id_async.

    Yields:
      The future returned by Account.get_by_id_async.

    Raises:
      ndb.Return: carrying the account's `credentials` attribute, or None
        when no Account was found for user_id.
    """
    account = yield Account.get_by_id_async(user_id)
    raise ndb.Return(None if account is None else account.credentials)
def _get_segment(self, start, request_size, check_response=True):
    """Fetch one byte range of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Has to be within the
        range of the file.
      request_size: number of bytes to request. Has to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.
      check_response: True to validate the GCS response before the result
        is delivered. False to leave validation to the caller.

    Yields:
      If check_response is True, the segment [start, start + request_size)
      of the file.
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks the response. The caller
      should invoke the closure before consuming the file segment.

    Raises:
      ValueError: if the file has changed while reading.
    """
    # The Range header is inclusive on both ends, hence the "- 1".
    last_byte = start + request_size - 1
    headers = {'Range': 'bytes=%d-%d' % (start, last_byte)}
    status, resp_headers, content = yield self._api.get_object_async(
        self._path, headers=headers)

    def _checker():
        # Validate the HTTP status first, then the etag of the response.
        errors.check_status(status, [200, 206], self._path, headers,
                            resp_headers, body=content)
        self._check_etag(resp_headers.get('etag'))

    if not check_response:
        # Deferred validation: hand the checker back alongside the data.
        raise ndb.Return(content, _checker)
    _checker()
    raise ndb.Return(content)
def tell(self):
    """Report the current write offset.

    This is the total number of bytes passed to write() so far.
    (There is no seek() method.)

    Returns:
      The value of self._offset.
    """
    current_offset = self._offset
    return current_offset
def _make_token_async(scopes, service_account_id):
    """Request a fresh authentication token from app_identity.

    Args:
      scopes: A list of scopes.
      service_account_id: Internal-use only.

    Yields:
      The app_identity RPC carrying the get-access-token call.

    Raises:
      An ndb.Return with a tuple (token, expiration_time) where
      expiration_time is seconds since the epoch.
    """
    token_rpc = app_identity.create_rpc()
    app_identity.make_get_access_token_call(
        token_rpc, scopes, service_account_id)
    rpc_result = yield token_rpc
    # Unpack explicitly so a malformed result fails here, as a 2-tuple.
    token, expires_at = rpc_result
    raise ndb.Return((token, expires_at))
def do_request_async(self, url, method='GET', headers=None, payload=None,
                     deadline=None, callback=None):
    """Issue one HTTP request through the retry wrapper.

    It performs async retries using tasklets.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call.
      callback: the call to make once completed.

    Yields:
      The async fetch of the url.

    Raises:
      ndb.Return: carrying a (status_code, headers, content) tuple taken
        from the response.
    """
    runner = api_utils._RetryWrapper(
        self.retry_params,
        retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS,
        should_retry=api_utils._should_retry)
    # Redirects are never followed for these requests.
    fetch_kwargs = dict(
        url=url,
        method=method,
        headers=headers,
        payload=payload,
        deadline=deadline,
        callback=callback,
        follow_redirects=False)
    response = yield runner.run(self.urlfetch_async, **fetch_kwargs)
    result = (response.status_code, response.headers, response.content)
    raise ndb.Return(result)
def get_cart_tasklet(acct):
    """Fetch the cart items of an account and load their inventory entities.

    Args:
      acct: account entity; its `key` is matched against CartItem.account.

    Yields:
      The cart query future, then the multi-get future for the inventory
      keys referenced by the cart items.

    Raises:
      ndb.Return: carrying the list of cart items.
    """
    cart_query = CartItem.query(CartItem.account == acct.key)
    cart = yield cart_query.fetch_async()
    inventory_keys = [item.inventory for item in cart]
    # The fetched inventory entities are not used here; only the cart is
    # returned.
    yield ndb.get_multi_async(inventory_keys)
    raise ndb.Return(cart)
def get_offers_tasklet(acct):
    """Fetch up to 10 special offers and load their inventory entities.

    Args:
      acct: accepted for symmetry with the cart tasklet; not read here.

    Yields:
      The offers query future, then the multi-get future for the inventory
      keys referenced by the offers.

    Raises:
      ndb.Return: carrying the list of offers.
    """
    offers = yield SpecialOffer.query().fetch_async(10)
    inventory_keys = [offer.inventory for offer in offers]
    # The fetched inventory entities are not used here; only the offers are
    # returned.
    yield ndb.get_multi_async(inventory_keys)
    raise ndb.Return(offers)
def get_cart_plus_offers_tasklet(acct):
    """Fetch the cart and the special offers for an account together.

    Both tasklets are started before either result is awaited, and the two
    futures are yielded as one tuple.

    Args:
      acct: account entity forwarded to both tasklets.

    Yields:
      A (cart future, offers future) tuple.

    Raises:
      ndb.Return: carrying a (cart, offers) tuple.
    """
    cart_future = get_cart_tasklet(acct)
    offers_future = get_offers_tasklet(acct)
    cart, offers = yield cart_future, offers_future
    raise ndb.Return((cart, offers))
# [END cart_offers_tasklets]
def iterate_over_query_results_in_tasklet(Model, is_the_entity_i_want):
    """Scan a model's query results asynchronously for the first match.

    Args:
      Model: model class whose query() results are scanned.
      is_the_entity_i_want: predicate called with each entity.

    Yields:
      The has_next_async() future of the query iterator, once per step.

    Raises:
      ndb.Return: carrying the first entity the predicate accepts. When
        nothing matches, the generator simply finishes.
    """
    results = Model.query().iter()
    while True:
        # Wait for the next result through a future rather than blocking.
        has_more = yield results.has_next_async()
        if not has_more:
            break
        candidate = results.next()
        # Do something with entity
        if is_the_entity_i_want(candidate):
            raise ndb.Return(candidate)
def blocking_iteration_over_query_results(Model, is_the_entity_i_want):
    """Scan a model's query results with a plain for loop (counter-example).

    Args:
      Model: model class whose query() results are scanned.
      is_the_entity_i_want: predicate called with each entity.

    Raises:
      ndb.Return: carrying the first entity the predicate accepts. There is
        no yield in this body, so the exception propagates directly to the
        caller.
    """
    # DO NOT DO THIS IN A TASKLET
    for candidate in Model.query():
        # Do something with entity
        if is_the_entity_i_want(candidate):
            raise ndb.Return(candidate)
def get_messages_async(self):
    """Write up to 20 messages, newest first, to the response.

    Each message is rendered by a tasklet callback that fetches the author
    entity, and every rendered string is written wrapped in a <p> element.
    """
    @ndb.tasklet
    def format_message(msg):
        # Fetch the author entity through its async getter.
        author = yield msg.author.get_async()
        rendered = 'On {}, {} wrote:\n{}'.format(
            msg.when, author.nick(), msg.text)
        raise ndb.Return(rendered)

    newest_first = Message.query().order(-Message.when)
    rendered_messages = newest_first.map(format_message, limit=20)
    # NOTE(review): the text is interpolated into HTML without escaping
    # here -- confirm it is escaped or trusted upstream.
    for rendered in rendered_messages:
        self.response.out.write('<p>{}</p>'.format(rendered))
def _get_segment(self, start, request_size, check_response=True):
    """Get a segment of the file from Google Storage.

    Args:
      start: start offset of the segment. Inclusive. Has to be within the
        range of the file.
      request_size: number of bytes to request. Has to be small enough
        for a single urlfetch request. May go over the logical range of the
        file.
      check_response: True to check the validity of the GCS response
        automatically before the result is delivered. False otherwise. See
        the Yields section.

    Yields:
      If check_response is True, the segment [start, start + request_size)
      of the file.
      Otherwise, a tuple. The first element is the unverified file segment.
      The second element is a closure that checks the response. The caller
      should first invoke the closure before consuming the file segment.

    Raises:
      ValueError: if the file has changed while reading.
    """
    # Both ends of an HTTP byte range are inclusive.
    range_spec = '%d-%d' % (start, start + request_size - 1)
    headers = {'Range': 'bytes=' + range_spec}
    fetched = yield self._api.get_object_async(self._path, headers=headers)
    status, resp_headers, content = fetched

    def _checker():
        # Status check comes first; the etag check only runs if it passes.
        errors.check_status(status, [200, 206], self._path, headers,
                            resp_headers, body=content)
        self._check_etag(resp_headers.get('etag'))

    if check_response:
        _checker()
        outcome = ndb.Return(content)
    else:
        outcome = ndb.Return(content, _checker)
    raise outcome
def tell(self):
    """Return how many bytes have been handed to write() up to now.

    (There is no seek() method.)

    Returns:
      The value of self._offset.
    """
    bytes_written = self._offset
    return bytes_written
def _make_token_async(scopes, service_account_id):
    """Get a fresh authentication token.

    Args:
      scopes: A list of scopes.
      service_account_id: Internal-use only.

    Yields:
      The RPC created through app_identity.create_rpc().

    Raises:
      An ndb.Return with a tuple (token, expiration_time) where
      expiration_time is seconds since the epoch.
    """
    access_token_rpc = app_identity.create_rpc()
    app_identity.make_get_access_token_call(
        access_token_rpc, scopes, service_account_id)
    # The RPC result is unpacked as exactly two values.
    token, expiration_time = yield access_token_rpc
    token_info = (token, expiration_time)
    raise ndb.Return(token_info)
def do_request_async(self, url, method='GET', headers=None, payload=None,
                     deadline=None, callback=None):
    """Issue one HTTP request.

    It performs async retries using tasklets.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call.
      callback: the call to make once completed.

    Yields:
      The async fetch of the url.

    Raises:
      ndb.Return: carrying (status_code, headers, content) read from the
        response object.
    """
    wrapper = api_utils._RetryWrapper(
        self.retry_params,
        retriable_exceptions=api_utils._RETRIABLE_EXCEPTIONS,
        should_retry=api_utils._should_retry)
    # follow_redirects is always forced to False for this call.
    pending = wrapper.run(
        self.urlfetch_async,
        url=url,
        method=method,
        headers=headers,
        payload=payload,
        deadline=deadline,
        callback=callback,
        follow_redirects=False)
    fetched = yield pending
    raise ndb.Return((fetched.status_code, fetched.headers, fetched.content))
def urlfetch_async(self, url, method='GET', headers=None,
                   payload=None, deadline=None, callback=None,
                   follow_redirects=False):
    """Make an async urlfetch() call.

    This is an async wrapper around urlfetch(). It adds an authentication
    header.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call. A falsy value is
        replaced by self.retry_params.urlfetch_timeout.
      callback: the call to make once completed.
      follow_redirects: whether or not to follow redirects.

    Yields:
      This returns a Future despite not being decorated with @ndb.tasklet!

    Raises:
      app_identity.InternalError: if fetching the token fails and the
        DATACENTER environment variable does not end with 'sandman'.
      ndb.Return: carrying the response of the context's urlfetch call.
    """
    # Work on a private copy so the caller's dict is left untouched.
    headers = {} if headers is None else dict(headers)
    headers.update(self.user_agent)
    try:
        self.token = yield self.get_token_async()
    except app_identity.InternalError:
        if os.environ.get('DATACENTER', '').endswith('sandman'):
            self.token = None
            logging.warning('Could not fetch an authentication token in sandman '
                            'based Appengine devel setup; proceeding without one.')
        else:
            # A bare raise keeps the original traceback and avoids the
            # Python-2-only "except X, e" syntax.
            raise
    if self.token:
        headers['authorization'] = 'OAuth ' + self.token
    deadline = deadline or self.retry_params.urlfetch_timeout
    ctx = ndb.get_context()
    resp = yield ctx.urlfetch(
        url, payload=payload, method=method,
        headers=headers, follow_redirects=follow_redirects,
        deadline=deadline, callback=callback)
    raise ndb.Return(resp)
def urlfetch_async(self, url, method='GET', headers=None,
                   payload=None, deadline=None, callback=None,
                   follow_redirects=False):
    """Make an async urlfetch() call.

    This is an async wrapper around urlfetch(). It adds an authentication
    header.

    Args:
      url: the url to fetch.
      method: the method in which to fetch.
      headers: the http headers; copied before any header is added.
      payload: the data to submit in the fetch.
      deadline: the deadline in which to make the call. Falls back to
        self.retry_params.urlfetch_timeout when falsy.
      callback: the call to make once completed.
      follow_redirects: whether or not to follow redirects.

    Yields:
      This returns a Future despite not being decorated with @ndb.tasklet!

    Raises:
      app_identity.InternalError: re-raised when the token fetch fails
        and the DATACENTER environment variable does not end with
        'sandman'.
      ndb.Return: carrying the response of the context's urlfetch call.
    """
    headers = {} if headers is None else dict(headers)
    headers.update(self.user_agent)
    try:
        self.token = yield self.get_token_async()
    except app_identity.InternalError:
        # The exception is not bound to a name: the Python-2-only
        # "except X, e" form is a syntax error on Python 3.
        if os.environ.get('DATACENTER', '').endswith('sandman'):
            # When DATACENTER ends with 'sandman', carry on without a token.
            self.token = None
            logging.warning('Could not fetch an authentication token in sandman '
                            'based Appengine devel setup; proceeding without one.')
        else:
            # Re-raise with the original traceback preserved.
            raise
    if self.token:
        headers['authorization'] = 'OAuth ' + self.token
    deadline = deadline or self.retry_params.urlfetch_timeout
    ctx = ndb.get_context()
    resp = yield ctx.urlfetch(
        url, payload=payload, method=method,
        headers=headers, follow_redirects=follow_redirects,
        deadline=deadline, callback=callback)
    raise ndb.Return(resp)
def get_score_by_screen_name(screen_name, depth):
    """Look up the stored Score for a screen name, scheduling a refresh
    when it is missing or older than MAX_AGE_DAYS.

    Args:
      screen_name: screen name used as the Score key name. Names starting
        with '__' are looked up with a '.' prefix (presumably to stay clear
        of reserved key names -- TODO confirm).
      depth: 0 selects the 'scoring-direct' queue and additionally
        schedules a 'profile-pic' task; any other value selects
        'scoring-indirect'.

    Yields:
      The datastore get future, then the task queue add futures.

    Raises:
      ndb.Return: carrying the Score that was found (possibly None or
        stale), or None when the lookup raised OverQuotaError.
    """
    # Gets the most recently updated copy, if duplicated.
    key_name = screen_name
    if screen_name.startswith('__'):
        key_name = '.' + screen_name
    try:
        score = yield ndb.Key(Score, key_name).get_async()
    except OverQuotaError:
        logging.critical('We are over quota.')
        raise ndb.Return(None)

    if score is None:
        logging.info('Fetching {} for the first time'.format(screen_name))
    else:
        last_updated = score.last_updated
        oldest_allowed = (
            datetime.datetime.now() - datetime.timedelta(days=MAX_AGE_DAYS))
        if not last_updated < oldest_allowed:
            logging.info('No need to refresh {}'.format(screen_name))
            raise ndb.Return(score)
        logging.info('Refreshing {}'.format(screen_name))

    # From here on we either have no score or one that is too old, so a
    # new one needs to be calculated.
    deploy_version = os.environ['CURRENT_VERSION_ID'].split('.')[0]
    task_name = '{}_{}'.format(screen_name, deploy_version)
    is_direct = depth == 0
    queue_name = 'scoring-direct' if is_direct else 'scoring-indirect'
    try:
        yield taskqueue.Task(
            name=task_name,
            params={
                'screen_name': screen_name,
                'depth': depth
            }).add_async(queue_name)
        if is_direct:
            # A direct query also schedules an analysis of the profile
            # picture.
            yield taskqueue.Task(
                name=task_name,
                params={
                    'screen_name': screen_name,
                }).add_async('profile-pic')
            # Having queued it on scoring-direct, remove the corresponding
            # task from the scoring-indirect queue.
            delete_from_scoring_indirect(task_name)
    except taskqueue.TaskAlreadyExistsError:
        # We already are going to check this person. There is nothing
        # to do here.
        logging.warning(
            'Fetch for {} already scheduled on queue {}'.format(
                task_name, queue_name))
    except taskqueue.TombstonedTaskError:
        # This task is too recent. We shouldn't try again so soon.
        # Tombstoning won't happen across different deploys, as the
        # task name has the deploy timestamp on it.
        logging.warning('Fetch for {} tombstoned'.format(task_name))
    raise ndb.Return(score)
def get_score_by_twitter_id(twitter_id, depth):
    """Look up the stored Score for a twitter id, scheduling a refresh
    when it is missing or older than MAX_AGE_DAYS.

    Args:
      twitter_id: value matched against Score.twitter_id.
      depth: 0 selects the 'scoring-direct' queue and additionally
        schedules a 'profile-pic' task; any other value selects
        'scoring-indirect'.

    Yields:
      The datastore query future, then the task queue add futures.

    Raises:
      ndb.Return: carrying the Score that was found (possibly None or
        stale), or None when the lookup raised OverQuotaError.
    """
    try:
        score = yield Score.query(Score.twitter_id == twitter_id).get_async()
    except OverQuotaError:
        logging.critical(
            'Over quota fetching {}'.format(twitter_id))
        raise ndb.Return(None)

    if score is not None:
        last_updated = score.last_updated
        oldest_allowed = (
            datetime.datetime.now() - datetime.timedelta(days=MAX_AGE_DAYS))
        if not last_updated < oldest_allowed:
            # Fresh enough: nothing to schedule.
            raise ndb.Return(score)

    # If we don't have one, or if we have one that's too old, we need
    # to calculate one.
    deploy_version = os.environ['CURRENT_VERSION_ID'].split('.')[0]
    task_name = '{}_{}'.format(twitter_id, deploy_version)
    is_direct = depth == 0
    queue_name = 'scoring-direct' if is_direct else 'scoring-indirect'
    try:
        yield taskqueue.Task(
            name=task_name,
            params={
                'twitter_id': twitter_id,
                'depth': depth
            }).add_async(queue_name)
        if is_direct:
            # A direct query also schedules an analysis of the profile
            # picture.
            yield taskqueue.Task(
                name=task_name,
                params={
                    'twitter_id': twitter_id,
                }).add_async('profile-pic')
            # Having queued it on scoring-direct, remove the corresponding
            # task from the scoring-indirect queue.
            delete_from_scoring_indirect(task_name)
    except taskqueue.TaskAlreadyExistsError:
        # We already are going to check this person. There is nothing
        # to do here.
        logging.warning(
            'Fetch for {} already scheduled on queue {}'.format(
                task_name, queue_name))
    except taskqueue.TombstonedTaskError:
        # This task is too recent. We shouldn't try again so soon.
        # Tombstoning won't happen across different deploys, as the
        # task name has the deploy timestamp on it.
        logging.warning('Fetch for {} tombstoned'.format(task_name))
    raise ndb.Return(score)