def validate_google_id_token(self, token):
from settings import secrets
success = False
email = name = None
g_response = urlfetch.fetch("https://www.googleapis.com/oauth2/v3/tokeninfo?id_token=%s" % token)
if g_response.status_code == 200:
json_response = json.loads(g_response.content)
if 'aud' in json_response:
aud = json_response['aud']
if aud == secrets.GOOGLE_CLIENT_ID:
success = True
email = json_response.get("email", None)
name = json_response.get("name", None)
else:
logging.error("Client ID mismatch")
return (success, email, name)
python类fetch()的实例源码
def get_request_token(base):
'''
Get request token
'''
data = urllib.urlencode({
'consumer_key': POCKET_CONSUMER_KEY,
'redirect_uri': base + POCKET_FINISH_REDIRECT
})
logging.debug(data)
res = urlfetch.fetch(
url=POCKET_OAUTH_REQUEST,
method=urlfetch.POST,
payload=data,
validate_certificate=True)
code = redirect = None
logging.debug(res.status_code)
if res.status_code == 200:
result = res.content
if 'code=' in result:
code = result.replace('code=','')
redirect = POCKET_AUTHORIZE_REDIR + '?request_token=%s&redirect_uri=%s' % (code, base + POCKET_FINISH_REDIRECT)
return (code, redirect)
def get_access_token(code):
'''
Get request token
'''
data = urllib.urlencode({
'consumer_key': POCKET_CONSUMER_KEY,
'code': code
})
logging.debug(data)
res = urlfetch.fetch(
url=POCKET_OAUTH_AUTHORIZE,
method=urlfetch.POST,
payload=data,
validate_certificate=True)
code = redirect = None
logging.debug(res.status_code)
if res.status_code == 200:
result = res.content
data = urlparse.parse_qs(result)
access_token = data.get('access_token', [None])[0]
return access_token
def _new_fixed_fetch(validate_certificate):
def fixed_fetch(url, payload=None, method="GET", headers={},
allow_truncated=False, follow_redirects=True,
deadline=None):
if deadline is None:
deadline = socket.getdefaulttimeout() or 5
return fetch(url, payload=payload, method=method, headers=headers,
allow_truncated=allow_truncated,
follow_redirects=follow_redirects, deadline=deadline,
validate_certificate=validate_certificate)
return fixed_fetch
def _new_fixed_fetch(validate_certificate):
def fixed_fetch(url, payload=None, method="GET", headers={},
allow_truncated=False, follow_redirects=True,
deadline=5):
return fetch(url, payload=payload, method=method, headers=headers,
allow_truncated=allow_truncated,
follow_redirects=follow_redirects, deadline=deadline,
validate_certificate=validate_certificate)
return fixed_fetch
def _new_fixed_fetch(validate_certificate):
def fixed_fetch(url, payload=None, method="GET", headers={},
allow_truncated=False, follow_redirects=True,
deadline=None):
if deadline is None:
deadline = socket.getdefaulttimeout()
return fetch(url, payload=payload, method=method, headers=headers,
allow_truncated=allow_truncated,
follow_redirects=follow_redirects, deadline=deadline,
validate_certificate=validate_certificate)
return fixed_fetch
def _new_fixed_fetch(validate_certificate):
def fixed_fetch(url, payload=None, method="GET", headers={},
allow_truncated=False, follow_redirects=True,
deadline=None):
if deadline is None:
deadline = socket.getdefaulttimeout()
return fetch(url, payload=payload, method=method, headers=headers,
allow_truncated=allow_truncated,
follow_redirects=follow_redirects, deadline=deadline,
validate_certificate=validate_certificate)
return fixed_fetch
def _new_fixed_fetch(validate_certificate):
def fixed_fetch(url, payload=None, method="GET", headers={},
allow_truncated=False, follow_redirects=True,
deadline=None):
if deadline is None:
deadline = socket.getdefaulttimeout() or 5
return fetch(url, payload=payload, method=method, headers=headers,
allow_truncated=allow_truncated,
follow_redirects=follow_redirects, deadline=deadline,
validate_certificate=validate_certificate)
return fixed_fetch
def request(self, method, url, body, headers):
# Calculate the absolute URI, which fetch requires
netloc = self.host
if self.port:
netloc = '%s:%s' % (self.host, self.port)
absolute_uri = '%s://%s%s' % (self.scheme, netloc, url)
try:
try: # 'body' can be a stream.
body = body.read()
except AttributeError:
pass
response = fetch(absolute_uri, payload=body, method=method,
headers=headers, allow_truncated=False, follow_redirects=False,
deadline=self.timeout,
validate_certificate=self.validate_certificate)
self.response = ResponseDict(response.headers)
self.response['status'] = str(response.status_code)
self.response['reason'] = httplib.responses.get(response.status_code, 'Ok')
self.response.status = response.status_code
setattr(self.response, 'read', lambda : response.content)
# Make sure the exceptions raised match the exceptions expected.
except InvalidURLError:
raise socket.gaierror('')
except (DownloadError, ResponseTooLargeError, SSLCertificateError):
raise httplib.HTTPException()
def fetch(url, data=None, headers=None,
cookie=Cookie.SimpleCookie(),
user_agent='Mozilla/5.0'):
headers = headers or {}
if data is not None:
data = urllib.urlencode(data)
if user_agent:
headers['User-agent'] = user_agent
headers['Cookie'] = ' '.join(
['%s=%s;' % (c.key, c.value) for c in cookie.values()])
try:
from google.appengine.api import urlfetch
except ImportError:
req = urllib2.Request(url, data, headers)
html = urllib2.urlopen(req).read()
else:
method = ((data is None) and urlfetch.GET) or urlfetch.POST
while url is not None:
response = urlfetch.fetch(url=url, payload=data,
method=method, headers=headers,
allow_truncated=False, follow_redirects=False,
deadline=10)
# next request will be a get, so no need to send the data again
data = None
method = urlfetch.GET
# load cookies from the response
cookie.load(response.headers.get('set-cookie', ''))
url = response.headers.get('location')
html = response.content
return html
def geocode(address):
try:
a = urllib_quote(address)
txt = fetch('http://maps.googleapis.com/maps/api/geocode/xml?sensor=false&address=%s' % a)
item = regex_geocode.search(txt)
(la, lo) = (float(item.group('la')), float(item.group('lo')))
return (la, lo)
except:
return (0.0, 0.0)
def reverse_geocode(lat, lng, lang=None):
""" Try to get an approximate address for a given latitude, longitude. """
if not lang:
lang = current.T.accepted_language
try:
return json.loads(fetch('http://maps.googleapis.com/maps/api/geocode/json?latlng=%(lat)s,%(lng)s&language=%(lang)s' % locals()))['results'][0]['formatted_address']
except:
return ''
def obtain_bearer_token(host, path):
"""Given a bearer token, send a GET request to the API.
Args:
host (str): The domain host of the API.
path (str): The path of the API after the domain.
params (dict): An optional set of query parameters in the request.
Returns:
str: OAuth bearer token, obtained using client_id and client_secret.
Raises:
HTTPError: An error occurs from the HTTP request.
"""
url = '{0}{1}'.format(host, quote(path.encode('utf8')))
data = urlencode({
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'grant_type': GRANT_TYPE,
})
print('@@@@@@@@@' + CLIENT_ID)
headers = {
'content-type': 'application/x-www-form-urlencoded',
}
result = urlfetch.fetch(
url=url,
payload=data,
method=urlfetch.POST,
headers=headers)
print('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@' + result.content)
return "BIO6_LpbIcFkeKDB9SsSAONt3lE2IwrdiTxUeq-Ag1MKOzSc4m-8QyPjdV6WmI27ySuLEKv7czHoJmJjFHrCyjfgxucTvKPpJG9JCsg_08KCz4J-WrEfeaiACoJ2WXYx"
def request(host, path, bearer_token, params):
"""Given a bearer token, send a GET request to the API.
Args:
host (str): The domain host of the API.
path (str): The path of the API after the domain.
bearer_token (str): OAuth bearer token, obtained using client_id and client_secret.
params (dict): An optional set of query parameters in the request.
Returns:
dict: The JSON response from the request.
Raises:
HTTPError: An error occurs from the HTTP request.
"""
url = '{0}{1}?{2}'.format(
host,
quote(path.encode('utf8')),
urllib.urlencode(params))
headers = {
'Authorization': 'Bearer %s' % bearer_token,
}
logging.info(u'Querying {0} ...'.format(url))
result = urlfetch.fetch(
url=url,
method=urlfetch.GET,
headers=headers)
logging.info('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@' + result.content)
return json.loads(result.content)
def _urlfetch_http_request(url, method, data):
from google.appengine.api import urlfetch
method = method.upper()
qs = urlencode(data)
if method == 'POST':
payload = qs
else:
payload = None
url += '?' + qs
headers = {
'User-Agent': 'Plaid Python v{}'.format(__version__)
}
res = urlfetch.fetch(
url,
follow_redirects=True,
method=method,
payload=payload,
headers=headers,
deadline=60 # seconds
)
# Add consistent interface across requests library and urlfetch
res.ok = res.status_code >= 200 and res.status_code < 300
res.text = res.content
return res
def fetch(url, data=None, headers=None,
cookie=Cookie.SimpleCookie(),
user_agent='Mozilla/5.0'):
headers = headers or {}
if not data is None:
data = urllib.urlencode(data)
if user_agent:
headers['User-agent'] = user_agent
headers['Cookie'] = ' '.join(
['%s=%s;' % (c.key, c.value) for c in cookie.values()])
try:
from google.appengine.api import urlfetch
except ImportError:
req = urllib2.Request(url, data, headers)
html = urllib2.urlopen(req).read()
else:
method = ((data is None) and urlfetch.GET) or urlfetch.POST
while url is not None:
response = urlfetch.fetch(url=url, payload=data,
method=method, headers=headers,
allow_truncated=False, follow_redirects=False,
deadline=10)
# next request will be a get, so no need to send the data again
data = None
method = urlfetch.GET
# load cookies from the response
cookie.load(response.headers.get('set-cookie', ''))
url = response.headers.get('location')
html = response.content
return html
def geocode(address):
try:
a = urllib.quote(address)
txt = fetch('http://maps.googleapis.com/maps/api/geocode/xml?sensor=false&address=%s' % a)
item = regex_geocode.search(txt)
(la, lo) = (float(item.group('la')), float(item.group('lo')))
return (la, lo)
except:
return (0.0, 0.0)
def reverse_geocode(lat, lng, lang=None):
""" Try to get an approximate address for a given latitude, longitude. """
if not lang:
lang = current.T.accepted_language
try:
return json_parser.loads(fetch('http://maps.googleapis.com/maps/api/geocode/json?latlng=%(lat)s,%(lng)s&language=%(lang)s' % locals()))['results'][0]['formatted_address']
except:
return ''
def request(self, resource, **kwargs):
"""
Convenience for making requests
Synchronous returns an object with status_code and content members.
FIXME this should really return JSON to match ASync
"""
request_args = self._create_request_args(**kwargs)
result = urlfetch.fetch(self._endpoint + resource, **request_args)
msg = "%s: %s%s %d (returned %d bytes)" % (
request_args["method"], self._endpoint, resource,
result.status_code, len(result.content))
if result.status_code == 200:
self.log.info(msg)
self.log.debug("Sent: %s", repr(request_args["headers"]))
self.log.debug("payload: %s", repr(request_args["payload"]))
self.log.debug("Got: %s", repr(result.content))
else:
self.log.warning(msg)
self.log.info("Sent %s", repr(request_args["headers"]))
self.log.debug("payload: %s", repr(request_args["payload"]))
self.log.info(result.content)
return result