def auth():
# Basic Authentication
requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass'))
requests.get('https://api.github.com/user', auth=('user', 'pass'))
# Digest Authentication
url = 'http://httpbin.org/digest-auth/auth/user/pass'
requests.get(url, auth=HTTPDigestAuth('user', 'pass'))
# OAuth2 Authentication????requests-oauthlib
url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
auth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
requests.get(url, auth=auth)
pass
python类OAuth2()的实例源码
def __init__(self, client_id=None, client_secret=None, server_token=None):
self.client_id = client_id or UBER_CLIENT_ID
self.client_secret = client_secret or UBER_CLIENT_SECRET
self.server_token = server_token or UBER_SERVER_TOKEN
# self.oauth_client = OAuth2(client_id=self.client_id, token=self.server_token)
def query_GitHub(url, username=None, password=None, token=None, data=None,
OTP=None, headers=None, params=None, files=None):
"""
Query GitHub API.
In case of a multipage result, DOES NOT query the next page.
"""
headers = headers or {}
if OTP:
headers['X-GitHub-OTP'] = OTP
if token:
auth = OAuth2(client_id=username, token=dict(access_token=token,
token_type='bearer'))
else:
auth = HTTPBasicAuth(username, password)
if data:
r = requests.post(url, auth=auth, data=data, headers=headers,
params=params, files=files)
else:
r = requests.get(url, auth=auth, headers=headers, params=params, stream=True)
if r.status_code == 401:
two_factor = r.headers.get('X-GitHub-OTP')
if two_factor:
print("A two-factor authentication code is required:", two_factor.split(';')[1].strip())
OTP = raw_input("Authentication code: ")
return query_GitHub(url, username=username, password=password,
token=token, data=data, OTP=OTP)
raise AuthenticationFailed("invalid username or password")
r.raise_for_status()
return r
# ------------------------------------------------
# Vagrant related configuration
def SetCredentials(self,
consumer_key,
consumer_secret,
access_token_key=None,
access_token_secret=None,
application_only_auth=False):
"""Set the consumer_key and consumer_secret for this instance
Args:
consumer_key:
The consumer_key of the twitter account.
consumer_secret:
The consumer_secret for the twitter account.
access_token_key:
The oAuth access token key value you retrieved
from running get_access_token.py.
access_token_secret:
The oAuth access token's secret, also retrieved
from the get_access_token.py run.
application_only_auth:
Whether to generate a bearer token and use Application-Only Auth
"""
self._consumer_key = consumer_key
self._consumer_secret = consumer_secret
self._access_token_key = access_token_key
self._access_token_secret = access_token_secret
if application_only_auth:
self._bearer_token = self.GetAppOnlyAuthToken(consumer_key, consumer_secret)
self.__auth = OAuth2(token=self._bearer_token)
else:
auth_list = [consumer_key, consumer_secret,
access_token_key, access_token_secret]
if all(auth_list):
self.__auth = OAuth1(consumer_key, consumer_secret,
access_token_key, access_token_secret)
self._config = None
def make_request(self, url, data={}, method=None, **kwargs):
"""
Builds and makes the OAuth2 Request, catches errors
https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors
"""
if not method:
method = 'POST' if data else 'GET'
try:
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except (HTTPUnauthorized, TokenExpiredError) as e:
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
# yet another token expiration check
# (the above try/except only applies if the expired token was obtained
# using the current instance of the class this is a a general case)
if response.status_code == 401:
d = json.loads(response.content.decode('utf8'))
try:
if(d['errors'][0]['errorType'] == 'expired_token' and
d['errors'][0]['message'].find('Access token expired:') == 0):
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except:
pass
if response.status_code == 401:
raise HTTPUnauthorized(response)
elif response.status_code == 403:
raise HTTPForbidden(response)
elif response.status_code == 404:
raise HTTPNotFound(response)
elif response.status_code == 409:
raise HTTPConflict(response)
elif response.status_code == 429:
exc = HTTPTooManyRequests(response)
exc.retry_after_secs = int(response.headers['Retry-After'])
raise exc
elif response.status_code >= 500:
raise HTTPServerError(response)
elif response.status_code >= 400:
raise HTTPBadRequest(response)
return response
def make_request(self, url, data={}, method=None, **kwargs):
"""
Builds and makes the OAuth2 Request, catches errors
https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors
"""
if not method:
method = 'POST' if data else 'GET'
try:
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except HTTPUnauthorized as e:
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
# yet another token expiration check
# (the above try/except only applies if the expired token was obtained
# using the current instance of the class this is a a general case)
if response.status_code == 401:
d = json.loads(response.content.decode('utf8'))
try:
if(d['errors'][0]['errorType'] == 'expired_token' and
d['errors'][0]['message'].find('Access token expired:') == 0):
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except:
pass
if response.status_code == 401:
raise HTTPUnauthorized(response)
elif response.status_code == 403:
raise HTTPForbidden(response)
elif response.status_code == 404:
raise HTTPNotFound(response)
elif response.status_code == 409:
raise HTTPConflict(response)
elif response.status_code == 429:
exc = HTTPTooManyRequests(response)
exc.retry_after_secs = int(response.headers['Retry-After'])
raise exc
elif response.status_code >= 500:
raise HTTPServerError(response)
elif response.status_code >= 400:
raise HTTPBadRequest(response)
return response
def make_request(self, url, data={}, method=None, **kwargs):
"""
Builds and makes the OAuth2 Request, catches errors
https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors
"""
if not method:
method = 'POST' if data else 'GET'
try:
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except TokenExpiredError as e:
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
#yet another token expiration check
#(the above try/except only applies if the expired token was obtained
#using the current instance of the class this is a a general case)
if response.status_code == 401:
d = json.loads(response.content.decode('utf8'))
try:
if(d['errors'][0]['errorType']=='oauth' and
d['errors'][0]['fieldName']=='access_token' and
d['errors'][0]['message'].find('Access token invalid or expired:')==0):
self.refresh_token()
auth = OAuth2(client_id=self.client_id, token=self.token)
response = self._request(method, url, data=data, auth=auth, **kwargs)
except:
pass
if response.status_code == 401:
raise HTTPUnauthorized(response)
elif response.status_code == 403:
raise HTTPForbidden(response)
elif response.status_code == 404:
raise HTTPNotFound(response)
elif response.status_code == 409:
raise HTTPConflict(response)
elif response.status_code == 429:
exc = HTTPTooManyRequests(response)
exc.retry_after_secs = int(response.headers['Retry-After'])
raise exc
elif response.status_code >= 500:
raise HTTPServerError(response)
elif response.status_code >= 400:
raise HTTPBadRequest(response)
return response