def __init__(
        self,
        credFile=None,
        client_key=None,
        proxy_url=None):
    """Initialize with your MuseScore application credentials.

    :param credFile: path to a JSON file containing the keys
        ``client_key``, ``client_secret``, ``resource_owner_key`` and
        ``resource_owner_secret``. Only used when the file exists.
    :param client_key: key stored on the instance when no credential file
        is available; ``self.auth`` is set to ``None`` in that case.
    :param proxy_url: optional proxy URL registered for ``https`` traffic.
    :raises Exception: if neither an existing ``credFile`` nor a
        ``client_key`` is supplied.
    """
    self.proxies = {'https': proxy_url} if proxy_url else None
    if credFile and os.path.isfile(credFile):
        # Read the file the caller pointed at, not a hard-coded name in the
        # current working directory.
        with open(credFile) as json_file:
            cred = json.load(json_file)
        self.auth = OAuth1(cred["client_key"],
                           client_secret=cred["client_secret"],
                           resource_owner_key=cred["resource_owner_key"],
                           resource_owner_secret=cred["resource_owner_secret"])
    elif client_key:
        self.auth = None
        self.client_key = client_key
    else:
        raise Exception('At least a client key is needed')
python类OAuth1()的实例源码
MuseScoreAPI.py 文件源码
项目:epfl-semester-project-biaxialnn
作者: onanypoint
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, api_key, api_secret, oauth_token=None, default_timeout=None):
    """Set up the logger and the OAuth1 signer for this client.

    :param api_key: application key; must be a unicode string.
    :param api_secret: application secret; must be a unicode string.
    :param oauth_token: optional token holder. When given and its ``token``
        attribute is truthy, ``token.token`` and ``token.token_secret`` are
        used as the resource-owner credentials.
    :param default_timeout: stored as ``self.default_timeout``.
    """
    self.log = logging.getLogger('%s.%s' % (self.__class__.__module__, self.__class__.__name__))
    assert isinstance(api_key, six.text_type), 'api_key must be unicode string'
    assert isinstance(api_secret, six.text_type), 'api_secret must be unicode string'
    token = None
    secret = None
    # oauth_token defaults to None, so it must be checked before its
    # attributes are read; otherwise the default call raises AttributeError.
    if oauth_token is not None and oauth_token.token:
        token = oauth_token.token.token
        secret = oauth_token.token.token_secret
    self.oauth = OAuth1(api_key, api_secret, token, secret, signature_type='auth_header')
    self.oauth_token = oauth_token
    self.auth_http_server = None
    self.requested_permissions = None
    self.default_timeout = default_timeout
def verify_oauth(access_token,access_token_secret):
    """Ask Twitter to verify a user's access token pair.

    Signs a GET to the ``verify_credentials`` endpoint with the application
    credentials plus the supplied user token and secret, and returns the
    ``requests`` response object.
    """
    # NOTE(review): the application key and secret are embedded in source;
    # consider loading them from configuration instead.
    verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    app_key = 'kbwf3Qbb6MfjXiPirco5j3UYe'
    app_secret = '1Lr0HOTxQBLU22FsH2CH9RFBtnQCHwjAsidZrctvYoy9yIDX7n'
    signer = OAuth1(app_key, app_secret, access_token, access_token_secret)
    return requests.get(verify_url, auth=signer)
# main method
def verify_oauth(access_token,access_token_secret):
    """Ask Twitter to verify a user's access token pair.

    Signs a GET to the ``verify_credentials`` endpoint with the application
    credentials plus the supplied user token and secret, and returns the
    ``requests`` response object.
    """
    # NOTE(review): the application key and secret are embedded in source;
    # consider loading them from configuration instead.
    verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    app_key = 'kbwf3Qbb6MfjXiPirco5j3UYe'
    app_secret = '1Lr0HOTxQBLU22FsH2CH9RFBtnQCHwjAsidZrctvYoy9yIDX7n'
    signer = OAuth1(app_key, app_secret, access_token, access_token_secret)
    return requests.get(verify_url, auth=signer)
# main method
def post_outcome_request(self, **kwargs):
    """POST an OAuth-signed outcome request to the Tool Consumer.

    Extra keyword arguments are forwarded to ``OAuth1``. The parsed
    response is stored on ``self.outcome_response`` and returned.

    :raises InvalidLTIConfigError: if required attributes are missing.
    """
    if not self.has_required_attributes():
        message = 'OutcomeRequest does not have all required attributes'
        raise InvalidLTIConfigError(message)

    # The XML body is included in the signature (force_include_body).
    signer = OAuth1(
        self.consumer_key,
        self.consumer_secret,
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        force_include_body=True,
        **kwargs
    )
    response = requests.post(
        self.lis_outcome_service_url,
        auth=signer,
        data=self.generate_request_xml(),
        headers={'Content-type': 'application/xml'},
    )
    self.outcome_response = OutcomeResponse.from_post_response(
        response, response.content)
    return self.outcome_response
def generate_launch_request(self, **kwargs):
    """Build the launch POST and return it after OAuth1 body signing.

    Extra keyword arguments are forwarded to ``OAuth1``.

    :raises InvalidLTIConfigError: if required launch params are missing.
    """
    if not self.has_required_params():
        message = ("Consumer's launch params missing one of "
                   + str(LAUNCH_PARAMS_REQUIRED))
        raise InvalidLTIConfigError(message)

    launch_params = self.to_params()
    prepared = Request('POST', self.launch_url, data=launch_params).prepare()
    signer = OAuth1(
        self.consumer_key,
        self.consumer_secret,
        signature_type=SIGNATURE_TYPE_BODY,
        **kwargs
    )
    return signer(prepared)
def _get_request_token(self):
    """Return the temporary request token, fetching it on first use.

    When no token is cached on the instance, a signed POST is sent to the
    request-token URL. The parsed token is cached on ``self.request_token``
    and in the session.

    :raises OAuthError: if the provider answers with a status other than
        200 or 201.
    """
    if self.request_token is not None:
        return self.request_token

    query_args = {}
    if self.parameters:
        query_args.update(self.parameters)
    query_args['oauth_callback'] = self.request.build_absolute_uri(
        self.callback_url)
    token_url = self.request_token_url + '?' + urlencode(query_args)

    signer = OAuth1(self.consumer_key, client_secret=self.consumer_secret)
    response = requests.post(url=token_url, auth=signer)
    if response.status_code not in (200, 201):
        raise OAuthError(
            _('Invalid response while obtaining request token from "%s".')
            % get_token_prefix(self.request_token_url))

    self.request_token = dict(parse_qsl(response.text))
    session_key = 'oauth_%s_request_token' % get_token_prefix(
        self.request_token_url)
    self.request.session[session_key] = self.request_token
    return self.request_token
def query(self, url, method="GET", params=None, headers=None):
    """Request an API endpoint at ``url``, signed with the session's token.

    :param url: endpoint to call.
    :param method: HTTP verb; the matching lower-cased ``requests``
        function is used.
    :param params: optional mapping passed to ``requests`` as ``params``.
    :param headers: optional mapping of extra request headers.
    :returns: the response body text.
    :raises OAuthError: if the response status is not 200.
    """
    # Fresh dicts per call rather than shared mutable default arguments.
    if params is None:
        params = {}
    if headers is None:
        headers = {}
    access_token = self._get_at_from_session()
    oauth = OAuth1(
        self.consumer_key,
        client_secret=self.secret_key,
        resource_owner_key=access_token['oauth_token'],
        resource_owner_secret=access_token['oauth_token_secret'])
    response = getattr(requests, method.lower())(url,
                                                 auth=oauth,
                                                 headers=headers,
                                                 params=params)
    if response.status_code != 200:
        raise OAuthError(
            _('No access to private resources at "%s".')
            % get_token_prefix(self.request_token_url))
    return response.text
def make_request(self, method, url, data=None, params=None, headers=None,
                 timeout=60):
    """Send an API request with JSON headers and the configured credentials.

    With ``LinkedInDeveloperAuthentication`` the request is OAuth1-signed;
    otherwise the OAuth2 access token is added as a query parameter.

    :param method: HTTP verb (upper-cased before sending).
    :param url: target URL.
    :param data: request body, passed through to ``requests``.
    :param params: optional query parameters; not modified.
    :param headers: optional extra headers; not modified.
    :param timeout: timeout passed through to ``requests``.
    :returns: the ``requests`` response object.
    """
    # Work on copies so the caller's dicts are not mutated (the JSON headers
    # and the access token used to leak back into them).
    request_headers = dict(headers) if headers else {}
    request_headers.update({'x-li-format': 'json', 'Content-Type': 'application/json'})
    request_params = dict(params) if params else {}
    kw = dict(data=data, params=request_params,
              headers=request_headers, timeout=timeout)
    if isinstance(self.authentication, LinkedInDeveloperAuthentication):
        # Let requests_oauthlib.OAuth1 do *all* of the work here
        auth = OAuth1(self.authentication.consumer_key, self.authentication.consumer_secret,
                      self.authentication.user_token, self.authentication.user_secret)
        kw.update({'auth': auth})
    else:
        request_params.update({'oauth2_access_token': self.authentication.token.access_token})
    return requests.request(method.upper(), url, **kw)
def get_user_info(data) -> object:
    """Fetch the OpenStreetMap user details for an OAuth token pair.

    Reads ``oauth_token`` and ``oauth_token_secret`` from ``data``, calls the
    user-details endpoint and returns a dict with both tokens plus
    ``display_name``, ``id`` and, when an ``img`` element is present,
    ``img``.
    """
    token = data.get('oauth_token')
    token_secret = data.get('oauth_token_secret')
    userinfo = {
        'oauth_token': token,
        'oauth_token_secret': token_secret,
    }

    details_url = "http://api.openstreetmap.org/api/0.6/user/details"
    signer = OAuth1(BaseConfig.OSM_CONSUMER_KEY, BaseConfig.OSM_CONSUMER_SECRET,
                    token, token_secret)
    response = requests.get(details_url, auth=signer)

    root = ElementTree.fromstring(response.content)
    for element in root:
        if element.tag != 'user':
            continue
        userinfo['display_name'] = element.attrib.get('display_name')
        userinfo['id'] = element.attrib.get('id')
        for nested in element:
            if nested.tag == 'img':
                userinfo['img'] = nested.attrib.get('href')
    return userinfo
def clean_oauth_auth(self, access_token):
    """Build an OAuth1 signer without callback_uri or verifier.

    Override of oauth_auth since Xing doesn't like callback_uri and
    oauth_verifier on authenticated API calls.

    :raises AuthTokenError: if ``oauth_token`` or ``oauth_token_secret`` is
        missing from ``access_token``.
    """
    key, secret = self.get_key_and_secret()
    owner_key = access_token.get('oauth_token')
    owner_secret = access_token.get('oauth_token_secret')
    for value, label in ((owner_key, 'oauth_token'),
                         (owner_secret, 'oauth_token_secret')):
        if not value:
            raise AuthTokenError(self, 'Missing ' + label)

    # decoding='utf-8' produces errors with python-requests on Python3
    # since the final URL will be of type bytes
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    return OAuth1(
        key,
        secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        decoding=decoding,
    )
def unauthorized_token(self):
    """Request the unauthorized token (first stage) and return it as text.

    The response body is decoded with the response's declared encoding,
    falling back to the apparent encoding, and finally to the default
    codec when neither is available.
    """
    params = self.request_token_extra_arguments()
    params.update(self.get_scope_argument())
    key, secret = self.get_key_and_secret()
    # decoding='utf-8' produces errors with python-requests on Python3
    # since the final URL will be of type bytes
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    state = self.get_or_create_state()
    signer = OAuth1(key, secret,
                    callback_uri=self.get_redirect_uri(state),
                    decoding=decoding)
    response = self.request(
        self.REQUEST_TOKEN_URL,
        params=params,
        auth=signer,
        method=self.REQUEST_TOKEN_METHOD
    )
    charset = response.encoding or response.apparent_encoding
    if charset:
        return response.content.decode(charset)
    return response.content.decode()
def oauth_auth(self, token=None, oauth_verifier=None,
               signature_type=SIGNATURE_TYPE_AUTH_HEADER):
    """Build the OAuth1 signer for a request.

    :param token: optional mapping with ``oauth_token`` and
        ``oauth_token_secret``; when falsy the signer carries no
        resource-owner credentials.
    :param oauth_verifier: verifier; defaults to the one in ``self.data``.
    :param signature_type: where the signature is placed.
    :raises AuthTokenError: if ``token`` is given but lacks either value.
    """
    key, secret = self.get_key_and_secret()
    oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')

    owner_key = None
    owner_secret = None
    if token:
        owner_key = token.get('oauth_token')
        owner_secret = token.get('oauth_token_secret')
        if not owner_key:
            raise AuthTokenError(self, 'Missing oauth_token')
        if not owner_secret:
            raise AuthTokenError(self, 'Missing oauth_token_secret')

    # decoding='utf-8' produces errors with python-requests on Python3
    # since the final URL will be of type bytes
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    state = self.get_or_create_state()
    return OAuth1(
        key,
        secret,
        resource_owner_key=owner_key,
        resource_owner_secret=owner_secret,
        callback_uri=self.get_redirect_uri(state),
        verifier=oauth_verifier,
        signature_type=signature_type,
        decoding=decoding,
    )
def unauthorized_token_request(self):
    """Return the signed request-token URL (first stage).

    The request itself is not sent here. The URL is built from the extra
    and scope arguments, then signed with HMAC in the query string.
    """
    params = self.request_token_extra_arguments()
    params.update(self.get_scope_argument())
    key, secret = self.get_key_and_secret()
    # decoding='utf-8' produces errors with python-requests on Python3
    # since the final URL will be of type bytes
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    state = self.get_or_create_state()
    signer = OAuth1(
        key,
        secret,
        callback_uri=self.get_redirect_uri(state),
        decoding=decoding,
        signature_method=SIGNATURE_HMAC,
        signature_type=SIGNATURE_TYPE_QUERY
    )
    unsigned_url = self.REQUEST_TOKEN_URL + '?' + urlencode(params)
    # Only the URL is needed; the other two returned values are discarded.
    signed_url, _headers, _body = signer.client.sign(unsigned_url)
    return signed_url
def oauth_auth(self, token=None, oauth_verifier=None,
               signature_type=SIGNATURE_TYPE_AUTH_HEADER):
    """Build an OAuth1 signer that never carries resource-owner credentials.

    ``token`` is accepted for interface compatibility, but its contents are
    not passed to the signer.

    :param oauth_verifier: verifier; defaults to the one in ``self.data``.
    :param signature_type: where the signature is placed.
    """
    key, secret = self.get_key_and_secret()
    oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')
    # decoding='utf-8' produces errors with python-requests on Python3
    # since the final URL will be of type bytes
    if six.PY3:
        decoding = None
    else:
        decoding = 'utf-8'
    state = self.get_or_create_state()
    redirect_uri = self.get_redirect_uri(state)
    return OAuth1(
        key,
        secret,
        resource_owner_key=None,
        resource_owner_secret=None,
        callback_uri=redirect_uri,
        verifier=oauth_verifier,
        signature_type=signature_type,
        decoding=decoding,
    )
def _get_request_token(self):
    """Return the temporary request token, fetching it on first use.

    When no token is cached on the instance, a signed POST is sent to the
    request-token URL. The parsed token is cached on ``self.request_token``
    and in the session.

    :raises OAuthError: if the provider answers with a status other than
        200 or 201.
    """
    if self.request_token is not None:
        return self.request_token

    query_args = {}
    if self.parameters:
        query_args.update(self.parameters)
    query_args['oauth_callback'] = self.request.build_absolute_uri(
        self.callback_url)
    token_url = self.request_token_url + '?' + urlencode(query_args)

    signer = OAuth1(self.consumer_key, client_secret=self.consumer_secret)
    response = requests.post(url=token_url, auth=signer)
    if response.status_code not in (200, 201):
        raise OAuthError(
            _('Invalid response while obtaining request token from "%s".')
            % get_token_prefix(self.request_token_url))

    self.request_token = dict(parse_qsl(response.text))
    session_key = 'oauth_%s_request_token' % get_token_prefix(
        self.request_token_url)
    self.request.session[session_key] = self.request_token
    return self.request_token
def query(self, url, method="GET", params=None, headers=None):
    """Request an API endpoint at ``url``, signed with the session's token.

    :param url: endpoint to call.
    :param method: HTTP verb; the matching lower-cased ``requests``
        function is used.
    :param params: optional mapping passed to ``requests`` as ``params``.
    :param headers: optional mapping of extra request headers.
    :returns: the response body text.
    :raises OAuthError: if the response status is not 200.
    """
    # Fresh dicts per call rather than shared mutable default arguments.
    if params is None:
        params = {}
    if headers is None:
        headers = {}
    access_token = self._get_at_from_session()
    oauth = OAuth1(
        self.consumer_key,
        client_secret=self.secret_key,
        resource_owner_key=access_token['oauth_token'],
        resource_owner_secret=access_token['oauth_token_secret'])
    response = getattr(requests, method.lower())(url,
                                                 auth=oauth,
                                                 headers=headers,
                                                 params=params)
    if response.status_code != 200:
        raise OAuthError(
            _('No access to private resources at "%s".')
            % get_token_prefix(self.request_token_url))
    return response.text
def lambda_handler(event, context):
    """Post a status update on behalf of each selected user.

    When ``event`` contains ``Records`` (DynamoDB trigger) the users come
    from ``dynamo_triggered_new_users`` and the status is
    ``new_user_status``. Otherwise the whole token table is scanned.
    The outcome for each user is printed.
    """
    # If being invoked by a dynamodb trigger
    if 'Records' in event:
        tweeters = dynamo_triggered_new_users(event)
        status = new_user_status
    else:  # If being invoked by the cron we scan the table
        tweeters = token_table.scan()['Items']
        # NOTE(review): `status` is never assigned on this path, so the loop
        # below fails with UnboundLocalError for cron invocations. The
        # intended status text is not visible here and must be supplied.
    for tweeter in tweeters:
        auth = OAuth1(
            creds['CONSUMER_KEY'],
            creds['CONSUMER_SECRET'],
            tweeter['oauth_token'],
            tweeter['oauth_token_secret']
        )
        resp = requests.post(
            "https://api.twitter.com/1.1/statuses/update.json",
            data={'status': status},
            auth=auth
        )
        # Check the HTTP status of the response, not the tweet text.
        if resp.status_code == 200:
            print("Tweeted from " + tweeter['screen_name'])
        else:
            print(resp.text)
def bind_auth(self, **kwargs):
    """Return ``kwargs`` with an ``auth`` entry holding the OAuth1 signer.

    The signer combines the Bitbucket consumer credentials from settings
    with the token pair stored in ``self.auth.tokens``.
    """
    consumer_key = six.text_type(settings.BITBUCKET_CONSUMER_KEY)
    consumer_secret = six.text_type(settings.BITBUCKET_CONSUMER_SECRET)
    signer = OAuth1(
        consumer_key,
        consumer_secret,
        self.auth.tokens['oauth_token'],
        self.auth.tokens['oauth_token_secret'],
        signature_type='auth_header'
    )
    kwargs['auth'] = signer
    return kwargs
def __init__(self,
             longitude=-0.418,
             latitude=39.360,
             units=weatherservice.Units()):
    """Initialise the base weather service, the OAuth1 signer and the WOEID.

    :param longitude: longitude passed to the base class and the geocoder.
    :param latitude: latitude passed to the base class and the geocoder.
    :param units: units object passed to the base class.
    """
    WeatherService.__init__(self, longitude, latitude, units)
    signer = OAuth1(API_KEY, SHARED_SECRET)
    self.oauth = signer
    # Note the argument order: the geocoder takes latitude first.
    self.woeid = geocodeapi.get_woeid(latitude, longitude)
def do_search(term='Food', location='San Francisco'):
    """Run a signed Yelp v2 search and return ``(parsed_json, raw_text)``.

    :param term: search term.
    :param location: location to search in.
    """
    base_url = 'https://api.yelp.com/v2/search'
    auth = OAuth1(consumer_key,
                  consumer_secret,
                  token,
                  token_secret)
    # Let requests build the query string: spaces still become '+', and
    # characters such as '&', '#' or '=' are escaped instead of breaking
    # the hand-formatted URL.
    r = requests.get(base_url,
                     params={'term': term, 'location': location},
                     auth=auth)
    return r.json(), r.text
def __init__(self, consumer_key=None, consumer_secret=None, oauth_token=None, oauth_secret=None):
    """Store the credentials and build the OAuth1 signer.

    Each argument that is falsy falls back to the corresponding
    module-level constant.
    """
    self.consumer_key = consumer_key if consumer_key else TWITTER_CONSUMER_KEY
    self.consumer_secret = consumer_secret if consumer_secret else TWITTER_CONSUMER_SECRET
    self.oauth_token = oauth_token if oauth_token else OAUTH_TOKEN
    self.oauth_token_secret = oauth_secret if oauth_secret else OAUTH_TOKEN_SECRET
    self.auth = OAuth1(
        self.consumer_key,
        client_secret=self.consumer_secret,
        resource_owner_key=self.oauth_token,
        resource_owner_secret=self.oauth_token_secret,
    )
def requestQBO(method, url, context, payload=None):
    """Send a JSON request using OAuth2 bearer or OAuth1 signing.

    When ``settings.oauth_flag`` is 2 the request carries a bearer token
    from settings; otherwise it is OAuth1-signed with the tokens held on
    ``context``. Returns the ``requests`` response.
    """
    if settings.oauth_flag == 2:
        bearer_headers = {
            'Accept': 'application/json',
            'User-Agent': 'PythonSampleApp2.0',
            'Authorization': 'Bearer '+settings.access_token_oauth2,
        }
        return requests.request(method, url, headers=bearer_headers, json=payload)

    signed_headers = {
        'Accept': 'application/json',
        'content-type': 'application/json; charset=utf-8',
        'User-Agent': 'PythonSampleApp2.0',
    }
    signer = OAuth1(context.consumerToken, context.consumerSecret,
                    context.accessToken, context.accessSecret)
    return requests.request(method, url, auth=signer, headers=signed_headers, json=payload)
def get_oauth():
    """Return an OAuth1 signer for the current request, or ``None``.

    ``None`` is returned when either the client key or the configured
    secret is missing.
    """
    key = get_client_key(request)
    secret = Config().oauth_secret
    if key and secret:
        return OAuth1(client_key=key, client_secret=secret)
    return None
def auth():
    """Return an OAuth1 signer built from the 500px key and secret in settings."""
    api_key = settings.API_500PX_KEY
    api_secret = settings.API_500PX_SECRET
    return OAuth1(api_key, client_secret=api_secret)
def register_proxy(self, tool_profile):
    """Return a signed, prepared POST that registers ``tool_profile``.

    The profile is serialised as JSON and signed with the registration key
    and password from ``self.launch_params``. The request is not sent.
    """
    register_url = self.find_registration_url()
    body = json.dumps(tool_profile, indent=4)
    content_headers = {'Content-Type': 'application/vnd.ims.lti.v2.toolproxy+json'}
    prepared = Request("POST", register_url, data=body, headers=content_headers).prepare()
    signer = OAuth1(
        self.launch_params['reg_key'],
        self.launch_params['reg_password'],
        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
        force_include_body=True,
    )
    return signer(prepared)
def edit_wiki_page(page_name, content, summary=None):
    """Replace the text of an English Wikipedia page as the session user.

    Fetches a CSRF token with a signed query, then posts the edit. Either
    request raises through ``raise_for_status`` on an HTTP error status.

    :param page_name: title of the page to edit.
    :param content: new page text.
    :param summary: optional edit summary.
    """
    api_url = 'https://en.wikipedia.org/w/api.php'
    access_token = flask.session.get('access_token')
    signer = OAuth1(
        app.config['CONSUMER_KEY'],
        app.config['CONSUMER_SECRET'],
        access_token['key'],
        access_token['secret'])

    # Get token
    token_query = {
        'action': 'query',
        'meta': 'tokens',
        'format': 'json',
    }
    token_response = requests.get(api_url, params=token_query, auth=signer)
    token_response.raise_for_status()
    csrf_token = token_response.json()['query']['tokens']['csrftoken']

    edit_form = {
        'action': 'edit',
        'title': page_name,
        'text': content,
        'summary': summary,
        'format': 'json',
        'token': csrf_token,
        'watchlist': 'nochange',
    }
    edit_response = requests.post(api_url, data=edit_form, auth=signer)
    edit_response.raise_for_status()
def create_oauth1(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer using the key file at ``private_key``.

    :param private_key: path to the RSA key file, read in binary mode.
    :param passphrase: passphrase handed to ``RSA.importKey``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'rb') as key_file:
        key_material = key_file.read()
        rsa_key = RSA.importKey(key_material, passphrase)
    return OAuth1(
        client_key=consumer_key,
        client_secret=consumer_secret,
        signature_method='RSA-SHA1',
        rsa_key=rsa_key,
    )
def get_oauth1session(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer using the key file at ``private_key``.

    :param private_key: path to the RSA key file, read in text mode.
    :param passphrase: passphrase handed to ``RSA.importKey``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'r') as key_file:
        key_material = key_file.read()
        rsa_key = RSA.importKey(key_material, passphrase)
    return OAuth1(
        client_key=consumer_key,
        client_secret=consumer_secret,
        signature_method='RSA-SHA1',
        rsa_key=rsa_key,
    )
def get_oauth1session(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer using the key file at ``private_key``.

    :param private_key: path to the RSA key file, read in text mode.
    :param passphrase: passphrase handed to ``RSA.importKey``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'r') as key_file:
        key_material = key_file.read()
        rsa_key = RSA.importKey(key_material, passphrase)
    return OAuth1(
        client_key=consumer_key,
        client_secret=consumer_secret,
        signature_method='RSA-SHA1',
        rsa_key=rsa_key,
    )