def getAddress(longitude, latitude):
    """Reverse-geocode a coordinate pair through the Google Geocoding API.

    Decoded responses are cached in memcache for one hour under a key built
    from the two arguments. Only responses that contain at least one result
    are cached.

    Args:
        longitude: First component placed in the ``latlng`` query parameter.
        latitude: Second component placed in the ``latlng`` query parameter.

    Returns:
        The ``"results"`` entry of the decoded JSON response.

    NOTE(review): the Geocoding API documents ``latlng`` as "lat,lng", yet
    the argument named ``longitude`` is sent first -- confirm the order in
    which callers pass their coordinates.
    """
    gsp_key = "gps-" + str(longitude) + "," + str(latitude)
    resultData = memcache.get(key=gsp_key)
    if resultData is not None:
        # Cache hit: no network round trip needed.
        logging.debug(resultData)
        return resultData["results"]

    url = ("https://maps.googleapis.com/maps/api/geocode/json"
           "?language=ja&sensor=false&key=" + const.GOOGLE_API_KEY +
           "&latlng=" + str(longitude) + "," + str(latitude))
    logging.debug(url)
    result = urlfetch.fetch(
        url=url,
        method=urlfetch.GET,
        headers={}
    )
    # The body is logged whatever the status code is.
    logging.debug(result.content)
    resultData = json.loads(result.content)
    if resultData["results"]:
        # Empty answers are not cached, so they are retried on the next call.
        memcache.set(key=gsp_key, value=resultData, time=3600)
    return resultData["results"]
python类GET的实例源码
def getUserProfine(mid):
    """Fetch the LINE bot profile for a user id and return the decoded JSON.

    Args:
        mid: User id appended to the profile endpoint URL.

    Returns:
        The JSON-decoded response body. The body is decoded whatever the
        HTTP status code is.
    """
    response = urlfetch.fetch(
        url="https://api.line.me/v2/bot/profile/" + mid,
        method=urlfetch.GET,
        headers={'Authorization': 'Bearer ' + const.ChannelAccessToken}
    )
    # Success and failure bodies are logged identically.
    body = response.content
    logging.debug(body)
    return json.loads(body)
def get(self):
    """List the project's Cloud Storage buckets and echo them as JSON.

    Obtains an access token for the cloud-platform scope from the app
    identity service, calls the Cloud Storage JSON API with it, and writes
    the pretty-printed response to the handler's response.

    Raises:
        Exception: If the Cloud Storage call does not return HTTP 200.
    """
    auth_token, _ = app_identity.get_access_token(
        'https://www.googleapis.com/auth/cloud-platform')
    # The bearer token is a credential: log the identity it stands for, but
    # never the token itself.
    logging.info(
        'Using an access token to represent identity {}'.format(
            app_identity.get_service_account_name()))
    response = urlfetch.fetch(
        'https://www.googleapis.com/storage/v1/b?project={}'.format(
            app_identity.get_application_id()),
        method=urlfetch.GET,
        headers={
            'Authorization': 'Bearer {}'.format(auth_token)
        }
    )
    if response.status_code != 200:
        raise Exception(
            'Call failed. Status code {}. Body {}'.format(
                response.status_code, response.content))
    result = json.loads(response.content)
    self.response.headers['Content-Type'] = 'application/json'
    self.response.write(json.dumps(result, indent=2))
def list_bucket_files(bucket_name, prefix, max_keys=1000):
    """Return the keys listed for a bucket, optionally filtered by prefix.

    Args:
        bucket_name: Bucket name passed to ``config.GsBucketURL``.
        prefix: Key prefix; omitted from the query when falsy.
        max_keys: Value sent as the ``max-keys`` query parameter.

    Returns:
        A list with the text of every ``Key`` element in the XML response.

    Raises:
        BackupValidationError: If the request does not return HTTP 200.
    """
    scope = config.GoogleApiScope('devstorage.read_only')
    params = [('max-keys', max_keys)]
    if prefix:
        params.append(('prefix', prefix))
    url = config.GsBucketURL(bucket_name) + '?' + urllib.urlencode(params)
    auth_token, _ = app_identity.get_access_token(scope)
    request_headers = {
        'Authorization': 'OAuth %s' % auth_token,
        'x-goog-api-version': '2',
    }
    response = urlfetch.fetch(url, method=urlfetch.GET, headers=request_headers)
    if not response or response.status_code != 200:
        raise BackupValidationError('Request to Google Cloud Storage failed')
    document = xml.dom.minidom.parseString(response.content)
    keys = []
    for key_node in document.getElementsByTagName('Key'):
        keys.append(key_node.childNodes[0].data)
    return keys
def get_gs_object(bucket_name, path):
    """Return the content of the object at ``path`` inside a bucket.

    Args:
        bucket_name: Bucket name passed to ``config.GsBucketURL``.
        path: Object path appended verbatim to the bucket URL.

    Returns:
        The response body when the request returns HTTP 200.

    Raises:
        BackupValidationError: On 403, 404, any other status, or a falsy
            response; the message names the requested URL.
    """
    scope = config.GoogleApiScope('devstorage.read_only')
    url = config.GsBucketURL(bucket_name) + path
    auth_token, _ = app_identity.get_access_token(scope)
    request_headers = {
        'Authorization': 'OAuth %s' % auth_token,
        'x-goog-api-version': '2',
    }
    response = urlfetch.fetch(url, method=urlfetch.GET, headers=request_headers)
    if response:
        status = response.status_code
        if status == 200:
            return response.content
        if status == 403:
            raise BackupValidationError(
                'Requested path %s is not accessible/access denied' % url)
        if status == 404:
            raise BackupValidationError('Requested path %s was not found' % url)
    raise BackupValidationError('Error encountered accessing requested path %s' %
                                url)
def authenticated_get(
        url, customer_key=CUSTOMER_KEY, customer_secret=CUSTOMER_SECRET):
    """Perform a bearer-authenticated GET and return the response content.

    See https://dev.twitter.com/oauth/application-only

    Args:
        url: URL to fetch.
        customer_key: Not referenced by this function's body.
        customer_secret: Not referenced by this function's body.

    Returns:
        The response content for HTTP 200 and HTTP 401 responses.

    Raises:
        urlfetch.httplib.HTTPException: For any other status code; the
            message is also logged as a warning.
    """
    bearer = get_access_token()
    response = urlfetch.fetch(
        url,
        method=urlfetch.GET,
        headers={'Authorization': 'Bearer ' + bearer})
    http = urlfetch.httplib
    # A 401 body is returned as-is: the user is probably suspended.
    if response.status_code in (http.OK, http.UNAUTHORIZED):
        return response.content
    message = 'Url ' + url + ' returned ' + response.content
    logging.warning(message)
    raise http.HTTPException(message)
def urlread(url, data=None, headers=None):
    """Fetch ``url`` through urlfetch and return the response body.

    A POST is issued when ``data`` is given, otherwise a GET. When no
    headers are supplied, a POST gets a form-urlencoded content type and a
    GET gets an empty header dict.

    Raises:
        urllib2.URLError: If the response status code is not 200.
    """
    has_payload = data is not None
    if headers is None:
        if has_payload:
            headers = {"Content-type": "application/x-www-form-urlencoded"}
        else:
            headers = {}
    method = urlfetch.POST if has_payload else urlfetch.GET
    response = urlfetch.fetch(url, method=method,
                              payload=data, headers=headers)
    if response.status_code != 200:
        raise urllib2.URLError(
            "fetch error url=%s, code=%d" % (url, response.status_code))
    return response.content
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Fetch ``url`` and return the response body.

    Uses App Engine's urlfetch when it can be imported and falls back to
    urllib2 otherwise. On App Engine, redirects are followed by hand so
    that cookies from every hop are loaded into ``cookie``.

    Args:
        url: URL to request.
        data: Optional mapping; when given it is urlencoded and sent as a
            POST body.
        headers: Optional request headers. The dict is copied, never mutated.
        cookie: Cookie jar used to build the ``Cookie`` header. The default
            instance is created once at definition time, so it is shared by
            every call that does not pass its own jar.
        user_agent: ``User-agent`` header value; skipped when falsy.

    Returns:
        The body of the final response.
    """
    # Work on a copy so the caller's dict does not gain our extra headers.
    headers = dict(headers or {})
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    # Built once up front: cookies set while following redirects are stored
    # in ``cookie`` but are not added to this header during this call.
    headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        method = urlfetch.GET if data is None else urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            # A missing Location header ends the redirect chain.
            url = response.headers.get('location')
        html = response.content
    return html
def obtain_bearer_token(host, path):
    """Request an OAuth bearer token from the token endpoint.

    POSTs ``CLIENT_ID``, ``CLIENT_SECRET`` and ``GRANT_TYPE`` form-encoded
    to ``host`` + ``path`` and returns the token from the JSON response,
    instead of a token hard-coded in the source.

    Args:
        host (str): The domain host of the API.
        path (str): The path of the token endpoint after the domain.

    Returns:
        str: The ``access_token`` field of the JSON response.

    Raises:
        ValueError: If the response body is not valid JSON.
        KeyError: If the response carries no ``access_token`` field.
    """
    import json
    import logging

    url = '{0}{1}'.format(host, quote(path.encode('utf8')))
    data = urlencode({
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'grant_type': GRANT_TYPE,
    })
    headers = {
        'content-type': 'application/x-www-form-urlencoded',
    }
    result = urlfetch.fetch(
        url=url,
        payload=data,
        method=urlfetch.POST,
        headers=headers)
    # Only the status is logged; credentials and the token stay out of logs.
    logging.debug('Token endpoint answered with status %s', result.status_code)
    return json.loads(result.content)['access_token']
def request(host, path, bearer_token, params=None):
    """Given a bearer token, send a GET request to the API.

    Args:
        host (str): The domain host of the API.
        path (str): The path of the API after the domain.
        bearer_token (str): OAuth bearer token sent in the Authorization
            header.
        params (dict): An optional set of query parameters in the request;
            ``None`` is treated as no parameters.

    Returns:
        dict: The JSON response from the request. The body is decoded
        whatever the HTTP status code is.

    Raises:
        ValueError: If the response body is not valid JSON.
    """
    url = '{0}{1}?{2}'.format(
        host,
        quote(path.encode('utf8')),
        # urlencode rejects None, so fall back to an empty mapping.
        urllib.urlencode(params or {}))
    headers = {
        'Authorization': 'Bearer %s' % bearer_token,
    }
    logging.info(u'Querying {0} ...'.format(url))
    result = urlfetch.fetch(
        url=url,
        method=urlfetch.GET,
        headers=headers)
    logging.info('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@' + result.content)
    return json.loads(result.content)
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Retrieve ``url`` and return the body of the final response.

    App Engine's urlfetch is used when importable, urllib2 otherwise. With
    urlfetch, redirects are followed manually and each response's
    ``set-cookie`` header is loaded into ``cookie``.

    Args:
        url: URL to request.
        data: Optional mapping, urlencoded and POSTed when not None.
        headers: Optional request headers; a copy is used, so the caller's
            dict is left untouched.
        cookie: Cookie jar providing the ``Cookie`` header. The default jar
            is a single object shared across calls.
        user_agent: Value for the ``User-agent`` header; omitted when falsy.

    Returns:
        The response body.
    """
    # Copy first: the additions below must not leak into the caller's dict.
    request_headers = dict(headers) if headers else {}
    payload = None if data is None else urllib.urlencode(data)
    if user_agent:
        request_headers['User-agent'] = user_agent
    request_headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        # Outside App Engine: one plain urllib2 request.
        req = urllib2.Request(url, payload, request_headers)
        return urllib2.urlopen(req).read()

    method = urlfetch.GET if payload is None else urlfetch.POST
    while url is not None:
        response = urlfetch.fetch(url=url, payload=payload,
                                  method=method, headers=request_headers,
                                  allow_truncated=False,
                                  follow_redirects=False,
                                  deadline=10)
        # next request will be a get, so no need to send the data again
        payload = None
        method = urlfetch.GET
        # load cookies from the response
        cookie.load(response.headers.get('set-cookie', ''))
        # No Location header means there is nothing left to follow.
        url = response.headers.get('location')
    return response.content
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Download ``url`` and return its body, preferring App Engine urlfetch.

    When ``google.appengine.api.urlfetch`` cannot be imported the request
    goes through urllib2 instead. Under urlfetch, automatic redirects are
    disabled and followed in a loop so cookies can be collected per hop.

    Args:
        url: URL to request.
        data: Optional form fields; urlencoded and sent by POST when given.
        headers: Optional headers; copied before being extended.
        cookie: Jar whose values form the ``Cookie`` header and which
            receives cookies set by responses. The default instance is
            shared between calls.
        user_agent: ``User-agent`` header value, or falsy to send none.

    Returns:
        The body of the last response received.
    """
    # Copying keeps the caller's headers dict free of our additions.
    headers = dict(headers or {})
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    cookie_pairs = ['%s=%s;' % (c.key, c.value) for c in cookie.values()]
    headers['Cookie'] = ' '.join(cookie_pairs)
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        if data is None:
            method = urlfetch.GET
        else:
            method = urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            url = response.headers.get('location')
        html = response.content
    return html
def getImageDataExternal(self, width, heigh):
    """Serve a resized copy of an external image named in the request path.

    The request path carries a key (characters 11-74) and a URL-encoded
    image URL (from character 76). The image is fetched only when the key
    equals ``getHash`` of the decoded URL. Every failure answers with a 301
    redirect to the placeholder preview image.

    Args:
        width: Passed to ``image_Transform``.
        heigh: Passed to ``image_Transform``.
    """
    def redirect_to_placeholder():
        # A fresh uuid makes every redirect target unique (presumably to
        # defeat caching).
        self.response.status = 301
        self.response.headers['Location'] = (
            "https://ifttt-line.appspot.com/images/preview_image.jpg?" +
            str(uuid.uuid4()))

    logging.debug(self.request.path)
    picture_key = self.request.path[11:75]
    encodeUrl = self.request.path[76:]
    originalUrl = urllib.unquote(encodeUrl)
    logging.debug(picture_key + " " + encodeUrl + " " + originalUrl)

    # A slice is never None, so test for an empty URL instead.
    if not originalUrl:
        logging.debug("URL Failure(" + picture_key + ")")
        redirect_to_placeholder()
        return
    if picture_key != getHash(originalUrl):
        logging.debug("Key Failure(" + picture_key + ")")
        redirect_to_placeholder()
        return

    logging.debug("Key is correct! " + picture_key + ")")
    result = urlfetch.fetch(
        url=originalUrl,
        method=urlfetch.GET,
        headers={}
    )
    if result.status_code != 200:
        redirect_to_placeholder()
        logging.debug("Image Load Failure(" + originalUrl + ")")
        return

    logging.debug(result.content)
    thumb = image_Transform(result.content, width, heigh)
    self.response.headers['Content-Type'] = result.headers['Content-Type']
    if "content-disposition" in result.headers:
        self.response.headers['content-disposition'] = (
            result.headers['content-disposition'])
    self.response.headers['date'] = result.headers['date']
    # Length of the transformed image, not of the fetched original.
    self.response.headers['content-length'] = len(thumb)
    self.response.out.write(thumb)
def returnContent(self):
    """Proxy LINE message content named in the request path.

    The request path carries a key (characters 9-72) and a message id (from
    character 74). The content is fetched from the LINE API only when the
    key equals ``utility.getHash`` of the id.

    NOTE(review): a key mismatch is only logged and leaves the response
    untouched, unlike the other failure paths, which redirect to the
    placeholder image -- confirm this is intended.
    """
    placeholder = "https://ifttt-line.appspot.com/images/preview_image.jpg?"

    logging.debug(self.request.path)
    picture_key = self.request.path[9:73]
    originalid = self.request.path[74:]
    logging.debug(picture_key + " " + " " + originalid)

    # A slice is never None, so test for an empty id instead.
    if not originalid:
        logging.debug("URL Failure(" + picture_key + ")")
        self.response.status = 301
        self.response.headers['Location'] = placeholder + str(uuid.uuid4())
        return
    if picture_key != utility.getHash(originalid):
        logging.debug("Key Failure")
        return

    url = "https://api.line.me/v2/bot/message/" + originalid + "/content"
    result = urlfetch.fetch(
        url=url,
        method=urlfetch.GET,
        headers={
            'Authorization': 'Bearer ' + const.ChannelAccessToken
        }
    )
    logging.debug(result.headers)
    if result.status_code != 200:
        logging.debug("Content Load Error")
        logging.debug(result.content)
        self.response.status = 301
        self.response.headers['Location'] = placeholder + str(uuid.uuid4())
        return

    logging.debug(result.content)
    # Relay the upstream headers together with the body.
    self.response.headers['Content-Type'] = result.headers['Content-Type']
    if "content-disposition" in result.headers:
        self.response.headers['content-disposition'] = (
            result.headers['content-disposition'])
    self.response.headers['date'] = result.headers['date']
    self.response.headers['content-length'] = result.headers['content-length']
    self.response.out.write(result.content)
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Return the body found at ``url``, following redirects on App Engine.

    The request is made with App Engine's urlfetch when that module can be
    imported, and with urllib2 otherwise.

    Args:
        url: URL to request.
        data: Optional mapping of form fields; a POST is made when given.
        headers: Optional request headers. They are copied, so the caller's
            dict is not modified.
        cookie: Source of the ``Cookie`` header and sink for cookies set by
            responses. The default is one jar shared by all calls.
        user_agent: ``User-agent`` header value; not sent when falsy.

    Returns:
        The response body.
    """
    # Never write our extra headers into a dict the caller still owns.
    headers = dict(headers or {})
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        method = urlfetch.POST if data is not None else urlfetch.GET
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data, method = None, urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            # The loop stops once a response carries no Location header.
            url = response.headers.get('location')
        html = response.content
    return html
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Request ``url`` and hand back the response body.

    Tries App Engine's urlfetch first and uses urllib2 when the import
    fails. With urlfetch, each redirect is issued explicitly as a GET and
    the cookies of every response are loaded into ``cookie``.

    Args:
        url: URL to request.
        data: Optional form mapping; urlencoded and POSTed when not None.
        headers: Optional request headers; the function extends a copy.
        cookie: Cookie jar read for the ``Cookie`` header and updated from
            responses. The default jar is created once and reused by every
            call that omits this argument.
        user_agent: ``User-agent`` header value; skipped when falsy.

    Returns:
        The body of the final response.
    """
    # Take a private copy: mutating the argument would surprise the caller.
    outgoing = dict(headers or {})
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        outgoing['User-agent'] = user_agent
    outgoing['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        html = urllib2.urlopen(urllib2.Request(url, data, outgoing)).read()
    else:
        method = urlfetch.GET if data is None else urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=outgoing,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            url = response.headers.get('location')
        html = response.content
    return html
def __init__(self, host, port=None, strict=False, timeout=None):
    """Set up a connection object whose requests go through urlfetch.

    Args:
        host: Stored as ``self.host``.
        port: Stored as ``self.port``.
        strict: Not referenced by this constructor.
        timeout: Kept only when it is a float, int or long; any other
            value is replaced by ``None``.
    """
    from google.appengine.api import urlfetch

    self._fetch = urlfetch.fetch
    # Map HTTP verb names to the matching urlfetch method constants.
    verbs = ('GET', 'POST', 'HEAD', 'PUT', 'DELETE', 'PATCH')
    self._method_map = dict((verb, getattr(urlfetch, verb)) for verb in verbs)
    self.host = host
    self.port = port
    self._method = None
    self._url = None
    self._body = ''
    self.headers = []
    self.timeout = timeout if isinstance(timeout, (float, int, long)) else None
def _get_html_page(self):
"""Return the name of the HTML page for HTTP/GET requests."""
raise NotImplementedError
app_identity_defaultcredentialsbased_stub.py 文件源码
项目:Deploy_XXNET_Server
作者: jzp820927
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def _PopulateX509(self):
    """Find and store the published X509 cert matching the private key.

    While holding ``self._x509_init_lock``, and only if ``self._x509`` is
    still ``None``, downloads the service account's certificates, signs a
    dummy message with the credentials and keeps the first certificate
    whose public key verifies that signature. The certificate goes to
    ``self._x509`` and its key id to ``self._signing_key``.

    Raises:
        apiproxy_errors.ApplicationError: If the certificates cannot be
            downloaded or none of them matches the signature.
    """
    with self._x509_init_lock:
        if self._x509 is not None:
            # Already populated; nothing to do.
            return

        url = ('https://www.googleapis.com/service_accounts/v1/metadata/x509/%s'
               % urllib.unquote_plus(self._credentials.service_account_email))
        response = urlfetch.fetch(
            url=url,
            validate_certificate=True,
            method=urlfetch.GET)
        if response.status_code != 200:
            raise apiproxy_errors.ApplicationError(
                app_identity_service_pb.AppIdentityServiceError.UNKNOWN_ERROR,
                'Unable to load X509 cert: %s Response code: %i, Content: %s' % (
                    url, response.status_code, response.content))

        message = 'dummy'
        _, signature = self._credentials.sign_blob(message)
        certificates = json.loads(response.content)
        for signing_key, x509 in certificates.items():
            der = rsa.pem.load_pem(x509, 'CERTIFICATE')
            asn1_cert, _ = decoder.decode(der, asn1Spec=Certificate())
            key_info = asn1_cert['tbsCertificate']['subjectPublicKeyInfo']
            key_bytearray = BitStringToByteString(key_info['subjectPublicKey'])
            public_key = rsa.PublicKey.load_pkcs1(key_bytearray, 'DER')
            try:
                verified = rsa.pkcs1.verify(message, signature, public_key)
            except rsa.pkcs1.VerificationError:
                # Not the signing certificate; try the next one.
                continue
            if verified:
                self._x509 = x509
                self._signing_key = signing_key
                return

        raise apiproxy_errors.ApplicationError(
            app_identity_service_pb.AppIdentityServiceError.UNKNOWN_ERROR,
            'Unable to find matching X509 cert for private key: %s' % url)
def __init__(self, host, port=None, strict=None,
             timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None,
             context=None):
    """Initialise a connection whose requests are served by urlfetch.

    Args:
        host: Stored as ``self.host``.
        port: Stored as ``self.port``.
        strict: Ignored.
        timeout: Kept only when it is a float, int or long; otherwise
            replaced by ``None``.
        source_address: Ignored.
        context: Not referenced by this constructor.
    """
    # net.proto.ProcotolBuffer relies on httplib, so importing urlfetch at
    # module level causes a failure on prod; the import has to be lazy.
    from google.appengine.api import urlfetch

    self._fetch = urlfetch.fetch
    method_map = {}
    for verb in ('GET', 'POST', 'HEAD', 'PUT', 'DELETE', 'PATCH'):
        method_map[verb] = getattr(urlfetch, verb)
    self._method_map = method_map

    self.host, self.port = host, port

    # With urllib2 in Python 2.6 an object can be passed as the timeout (the
    # default, socket.GLOBAL_DEFAULT_TIMEOUT, is one). Only numeric values
    # are accepted here; anything else would be silently ignored.
    numeric_timeout = isinstance(timeout, (float, int, long))
    self.timeout = timeout if numeric_timeout else None

    # Both 'strict' and 'source_address' are ignored.
    self._method = None
    self._url = None
    self._body = ''
    self.headers = []
def putrequest(self, method, url, skip_host=0, skip_accept_encoding=0):
    """Record the method and URL of the request to send.

    Args:
        method: HTTP request method, e.g. 'GET'.
        url: The object being requested, e.g. '/index.html'.
        skip_host: If true, do not automatically add a 'Host:' header.
        skip_accept_encoding: If true, do not automatically add an
            'Accept-Encoding:' header.

    App Engine Note: `skip_host' and `skip_accept_encoding' are not honored
    by the urlfetch service; this method only stores `method' and `url'.
    """
    self._method, self._url = method, url
def fetch(self):
    """Fetch New York Times articles for this object's date range.

    Queries the NYT article search API for the "World" and "U.S." sections
    between ``self.date_dt`` and ``self.next_date_dt`` and converts every
    returned document into an ``Item``.

    Returns:
        A list with the ``json()`` form of each item; empty when the API
        does not answer with HTTP 200.
    """
    from secrets import NYT_API_KEY

    IMAGE_BASE = "http://www.nytimes.com/"
    MIN_WIDTH = 300

    params = urllib.urlencode({
        'api-key': NYT_API_KEY,
        'fq': "section_name:(\"World\" \"U.S.\")",
        'begin_date': datetime.strftime(self.date_dt, "%Y%m%d"),
        'end_date': datetime.strftime(self.next_date_dt, "%Y%m%d")
    })
    url = "https://api.nytimes.com/svc/search/v2/articlesearch.json?" + params
    logging.debug(url)
    response = urlfetch.fetch(url, method=urlfetch.GET)
    items = []
    if response.status_code != 200:
        return items

    j_response = json.loads(response.content)
    results = j_response.get('response', {}).get('docs', [])
    for news in results:
        image = None
        # 'multimedia' may be missing or null; treat both as "no images".
        for mm in news.get('multimedia') or []:
            # Entries without a width or a url cannot be used.
            if (mm.get('width') or 0) > MIN_WIDTH and mm.get('url'):
                # No break: the last wide-enough image wins, as before.
                image = IMAGE_BASE + mm.get('url')
        item = Item(
            svc=SERVICE.NYT_NEWS,
            title=news.get('headline', {}).get('main'),
            image=image,
            details=news.get('snippet'),
            link=news.get('web_url'),
            id=news.get('_id'),
            type=SERVICE.NEWS)
        items.append(item.json())
    return items
def get_routes(cls):
    """Return the GET-only webapp2 routes for the Google OAuth handler.

    One route maps ``cls.AUTHREQUEST`` to ``auth_request`` and the other
    maps ``cls.AUTHCALLBACK`` to ``auth_callback``.
    """
    return [
        webapp2.Route(
            cls.AUTHREQUEST,
            handler='enki.handlersoauth.HandlerOAuthGoogle:auth_request',
            methods=['GET']),
        webapp2.Route(
            cls.AUTHCALLBACK,
            handler='enki.handlersoauth.HandlerOAuthGoogle:auth_callback',
            methods=['GET']),
    ]
def get_routes(cls):
    """Return the GET-only webapp2 routes for the Facebook OAuth handler.

    One route maps ``cls.AUTHREQUEST`` to ``auth_request`` and the other
    maps ``cls.AUTHCALLBACK`` to ``auth_callback``.
    """
    return [
        webapp2.Route(
            cls.AUTHREQUEST,
            handler='enki.handlersoauth.HandlerOAuthFacebook:auth_request',
            methods=['GET']),
        webapp2.Route(
            cls.AUTHCALLBACK,
            handler='enki.handlersoauth.HandlerOAuthFacebook:auth_callback',
            methods=['GET']),
    ]
def get_routes(cls):
    """Return the GET-only webapp2 routes for the Github OAuth handler.

    One route maps ``cls.AUTHREQUEST`` to ``auth_request`` and the other
    maps ``cls.AUTHCALLBACK`` to ``auth_callback``.
    """
    return [
        webapp2.Route(
            cls.AUTHREQUEST,
            handler='enki.handlersoauth.HandlerOAuthGithub:auth_request',
            methods=['GET']),
        webapp2.Route(
            cls.AUTHCALLBACK,
            handler='enki.handlersoauth.HandlerOAuthGithub:auth_callback',
            methods=['GET']),
    ]
def get_routes(cls):
    """Return the GET-only webapp2 routes for the Steam OAuth handler.

    One route maps ``cls.AUTHREQUEST`` to ``auth_request`` and the other
    maps ``cls.AUTHCALLBACK`` to ``auth_callback``.
    """
    return [
        webapp2.Route(
            cls.AUTHREQUEST,
            handler='enki.handlersoauth.HandlerOAuthSteam:auth_request',
            methods=['GET']),
        webapp2.Route(
            cls.AUTHCALLBACK,
            handler='enki.handlersoauth.HandlerOAuthSteam:auth_callback',
            methods=['GET']),
    ]
def get_routes(cls):
    """Return the GET-only webapp2 routes for the Twitter OAuth handler.

    One route maps ``cls.AUTHREQUEST`` to ``auth_request`` and the other
    maps ``cls.AUTHCALLBACK`` to ``auth_callback``.
    """
    return [
        webapp2.Route(
            cls.AUTHREQUEST,
            handler='enki.handlersoauth.HandlerOAuthTwitter:auth_request',
            methods=['GET']),
        webapp2.Route(
            cls.AUTHCALLBACK,
            handler='enki.handlersoauth.HandlerOAuthTwitter:auth_callback',
            methods=['GET']),
    ]
def get_profile(screen_name='', twitter_id=''):
    """Returns a JSON text from the Twitter GET users/show API.

    See https://dev.twitter.com/rest/reference/get/users/show

    Args:
        screen_name: Screen name to look up; takes precedence when both
            arguments are given.
        twitter_id: Numeric user id, used when ``screen_name`` is empty.

    Returns:
        Whatever the matching ``get_profile_by_*`` helper returns.

    Raises:
        ValueError: If both ``screen_name`` and ``twitter_id`` are empty.
    """
    if screen_name != '':
        return get_profile_by_screen_name(screen_name)
    if twitter_id != '':
        return get_profile_by_twitter_id(twitter_id)
    # Without this guard the function fell through to an unbound local.
    raise ValueError('Either screen_name or twitter_id must be provided')
def get_profile_by_screen_name(screen_name):
    """Return the Twitter users/show JSON text for a screen name.

    See https://dev.twitter.com/rest/reference/get/users/show

    The screen name is placed into the query string as given.
    """
    endpoint = 'https://api.twitter.com/1.1/users/show.json?screen_name={}'
    url = endpoint.format(screen_name)
    return authenticated_get(url)
def get_profile_by_twitter_id(twitter_id):
    """Return the Twitter users/show JSON text for a user id.

    See https://dev.twitter.com/rest/reference/get/users/show

    The id is placed into the query string as given.
    """
    endpoint = 'https://api.twitter.com/1.1/users/show.json?id={}'
    url = endpoint.format(twitter_id)
    return authenticated_get(url)