python类OAuth1()的实例源码

MuseScoreAPI.py 文件源码 项目:epfl-semester-project-biaxialnn 作者: onanypoint 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(
            self,
            credFile=None,
            client_key=None,
            proxy_url=None):
        """Initialize with your MuseScore application credentials.

        Args:
            credFile: Optional path to a JSON file holding the keys
                ``client_key``, ``client_secret``, ``resource_owner_key``
                and ``resource_owner_secret``. Used only when the path
                points at an existing file.
            client_key: Application key stored on the instance when no
                usable credentials file is given; ``self.auth`` is then
                ``None``.
            proxy_url: Optional proxy URL, registered for ``https``.

        Raises:
            Exception: If neither an existing credentials file nor a
                client key is supplied.
        """
        self.proxies = {'https': proxy_url} if proxy_url else None
        if credFile and os.path.isfile(credFile):
            # Read the file the caller pointed at. The previous code checked
            # ``credFile`` but always opened "credentials.json" from the
            # current working directory.
            with open(credFile) as json_file:
                cred = json.load(json_file)
            self.auth = OAuth1(cred["client_key"],
                          client_secret=cred["client_secret"],
                          resource_owner_key=cred["resource_owner_key"],
                          resource_owner_secret=cred["resource_owner_secret"])
        elif client_key:
            self.auth = None
            self.client_key = client_key
        else:
            raise Exception('At least a client key is needed')
auth.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, api_key, api_secret, oauth_token=None, default_timeout=None):
        """Set up the OAuth1 signer and per-instance state.

        Args:
            api_key: Application key; must be a unicode string.
            api_secret: Application secret; must be a unicode string.
            oauth_token: Optional object whose ``token`` attribute, when
                truthy, supplies ``token`` and ``token_secret`` for signing.
            default_timeout: Stored as ``self.default_timeout``.
        """
        self.log = logging.getLogger('%s.%s' % (self.__class__.__module__, self.__class__.__name__))

        # Kept as asserts so callers that rely on AssertionError are unaffected.
        assert isinstance(api_key, six.text_type), 'api_key must be unicode string'
        assert isinstance(api_secret, six.text_type), 'api_secret must be unicode string'

        token = None
        secret = None
        # ``oauth_token`` defaults to None, so guard before dereferencing it;
        # previously the default argument raised AttributeError here.
        if oauth_token is not None and oauth_token.token:
            token = oauth_token.token.token
            secret = oauth_token.token.token_secret

        self.oauth = OAuth1(api_key, api_secret, token, secret, signature_type='auth_header')
        self.oauth_token = oauth_token
        self.auth_http_server = None
        self.requested_permissions = None
        self.default_timeout = default_timeout
verify_oauth.py 文件源码 项目:nsc-cloudproject-s22016 作者: agitaretech 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify_oauth(access_token,access_token_secret):
    """Call Twitter's verify_credentials endpoint with the given user tokens.

    The request is signed with the application's consumer key/secret plus
    the supplied access token pair. Returns the ``requests`` response object.
    """
    # Consumer key and secret generated when the Twitter app was created on
    # the developer portal; they are needed to verify Twitter credentials.
    # NOTE(review): these application credentials are hard-coded in source.
    app_key = 'kbwf3Qbb6MfjXiPirco5j3UYe'
    app_secret = '1Lr0HOTxQBLU22FsH2CH9RFBtnQCHwjAsidZrctvYoy9yIDX7n'

    # Twitter API endpoint that verifies a user's credentials.
    endpoint = 'https://api.twitter.com/1.1/account/verify_credentials.json'

    # Authorization object handed to requests to sign the call.
    signer = OAuth1(app_key, app_secret, access_token, access_token_secret)
    return requests.get(endpoint, auth=signer)

# main method
verify_oauth.py 文件源码 项目:nsc-cloudproject-s22016 作者: agitaretech 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def verify_oauth(access_token,access_token_secret):
    """Call Twitter's verify_credentials endpoint with the given user tokens.

    The request is signed with the application's consumer key/secret plus
    the supplied access token pair. Returns the ``requests`` response object.
    """
    # Consumer key and secret generated when the Twitter app was created on
    # the developer portal; they are needed to verify Twitter credentials.
    # NOTE(review): these application credentials are hard-coded in source.
    app_key = 'kbwf3Qbb6MfjXiPirco5j3UYe'
    app_secret = '1Lr0HOTxQBLU22FsH2CH9RFBtnQCHwjAsidZrctvYoy9yIDX7n'

    # Twitter API endpoint that verifies a user's credentials.
    endpoint = 'https://api.twitter.com/1.1/account/verify_credentials.json'

    # Authorization object handed to requests to sign the call.
    signer = OAuth1(app_key, app_secret, access_token, access_token_secret)
    return requests.get(endpoint, auth=signer)

# main method
outcome_request.py 文件源码 项目:lti 作者: pylti 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def post_outcome_request(self, **kwargs):
        '''
        Send this outcome request to the Tool Consumer as an OAuth signed POST.

        Extra keyword arguments are forwarded to the ``OAuth1`` constructor.
        The parsed ``OutcomeResponse`` is stored on ``self.outcome_response``
        and returned. Raises ``InvalidLTIConfigError`` when required
        attributes are missing.
        '''
        if not self.has_required_attributes():
            raise InvalidLTIConfigError(
                'OutcomeRequest does not have all required attributes')

        # Sign via the Authorization header and include the XML body in the
        # signature.
        signer = OAuth1(self.consumer_key, self.consumer_secret,
                        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
                        force_include_body=True, **kwargs)

        response = requests.post(
            self.lis_outcome_service_url,
            auth=signer,
            data=self.generate_request_xml(),
            headers={'Content-type': 'application/xml'})

        self.outcome_response = OutcomeResponse.from_post_response(
            response, response.content)
        return self.outcome_response
tool_outbound.py 文件源码 项目:lti 作者: pylti 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def generate_launch_request(self, **kwargs):
        """
        Build the launch POST and return it as an OAuth v1 "signed"
        requests.PreparedRequest instance.

        Extra keyword arguments are forwarded to the ``OAuth1`` constructor.
        Raises ``InvalidLTIConfigError`` when required launch params are
        missing.
        """
        if not self.has_required_params():
            message = ('Consumer\'s launch params missing one of '
                       + str(LAUNCH_PARAMS_REQUIRED))
            raise InvalidLTIConfigError(message)

        launch_params = self.to_params()
        prepared = Request('POST', self.launch_url, data=launch_params).prepare()

        # The OAuth parameters are placed in the request body.
        signer = OAuth1(self.consumer_key, self.consumer_secret,
                        signature_type=SIGNATURE_TYPE_BODY, **kwargs)
        return signer(prepared)
client.py 文件源码 项目:django-twilio-tfa 作者: rtindru 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_request_token(self):
        """
        Return the temporary request token, fetching it on first use.

        The token is used to authorize an access token and to sign the
        request that obtains the access token. Once fetched it is cached on
        ``self.request_token`` and stored in the session. Raises
        ``OAuthError`` when the provider answers with a status other than
        200 or 201.
        """
        if self.request_token is not None:
            return self.request_token

        query_args = {}
        if self.parameters:
            query_args.update(self.parameters)
        query_args['oauth_callback'] = self.request.build_absolute_uri(
            self.callback_url)

        token_url = self.request_token_url + '?' + urlencode(query_args)
        signer = OAuth1(self.consumer_key,
                        client_secret=self.consumer_secret)
        response = requests.post(url=token_url, auth=signer)

        if response.status_code not in (200, 201):
            raise OAuthError(
                _('Invalid response while obtaining request token'
                  ' from "%s".') % get_token_prefix(
                      self.request_token_url))

        self.request_token = dict(parse_qsl(response.text))
        session_key = 'oauth_%s_request_token' % get_token_prefix(
            self.request_token_url)
        self.request.session[session_key] = self.request_token
        return self.request_token
client.py 文件源码 项目:django-twilio-tfa 作者: rtindru 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def query(self, url, method="GET", params=None, headers=None):
        """
        Request a API endpoint at ``url`` with ``params`` being either the
        POST or GET data.

        The call is signed with the access token taken from the session.
        Returns the response body text; raises ``OAuthError`` when the
        response status is not 200.
        """
        # None sentinels replace the former ``dict()`` defaults, which were
        # single dict objects shared by every call.
        if params is None:
            params = {}
        if headers is None:
            headers = {}

        access_token = self._get_at_from_session()
        oauth = OAuth1(
            self.consumer_key,
            client_secret=self.secret_key,
            resource_owner_key=access_token['oauth_token'],
            resource_owner_secret=access_token['oauth_token_secret'])
        # Dispatch to requests.get / requests.post / ... by method name.
        response = getattr(requests, method.lower())(url,
                                                     auth=oauth,
                                                     headers=headers,
                                                     params=params)
        if response.status_code != 200:
            raise OAuthError(
                _('No access to private resources at "%s".')
                % get_token_prefix(self.request_token_url))

        return response.text
linkedin.py 文件源码 项目:python3-linkedin 作者: DEKHTIARJonathan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_request(self, method, url, data=None, params=None, headers=None,
                     timeout=60):
        """Send an HTTP request using the configured authentication.

        Args:
            method: HTTP verb; upper-cased before sending.
            url: Target URL.
            data: Optional request body passed to ``requests``.
            params: Optional query parameters. Not modified.
            headers: Optional extra headers. Not modified; the JSON format
                headers below always override same-named entries.
            timeout: Timeout passed to ``requests`` (default 60).

        Returns:
            The response returned by ``requests.request``.
        """
        # Merge into a fresh dict so the caller's headers are never mutated
        # (the previous code called ``headers.update`` on the argument).
        merged_headers = dict(headers) if headers is not None else {}
        merged_headers.update({'x-li-format': 'json', 'Content-Type': 'application/json'})

        if params is None:
            params = {}
        kw = dict(data=data, params=params,
                  headers=merged_headers, timeout=timeout)

        if isinstance(self.authentication, LinkedInDeveloperAuthentication):
            # Let requests_oauthlib.OAuth1 do *all* of the work here
            auth = OAuth1(self.authentication.consumer_key, self.authentication.consumer_secret,
                          self.authentication.user_token, self.authentication.user_secret)
            kw['auth'] = auth
        else:
            # Add the OAuth2 token on a copy, leaving the caller's params
            # untouched.
            kw['params'] = dict(
                params,
                oauth2_access_token=self.authentication.token.access_token)

        return requests.request(method.upper(), url, **kw)
osm.py 文件源码 项目:kort-core 作者: kort 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_user_info(data) -> object:
    """Fetch OpenStreetMap user details for the token pair in ``data``.

    Returns a dict holding the ``oauth_token`` and ``oauth_token_secret``
    read from ``data`` plus, when present in the XML response, the user's
    ``display_name``, ``id`` and ``img`` (the ``href`` of the ``img`` child).
    """
    oauth_token = data.get('oauth_token')
    oauth_token_secret = data.get('oauth_token_secret')
    userinfo = {
        'oauth_token': oauth_token,
        'oauth_token_secret': oauth_token_secret,
    }

    signer = OAuth1(BaseConfig.OSM_CONSUMER_KEY, BaseConfig.OSM_CONSUMER_SECRET,
                    oauth_token, oauth_token_secret)
    response = requests.get("http://api.openstreetmap.org/api/0.6/user/details",
                            auth=signer)

    # Only direct <user> children of the root, and their direct <img>
    # children, are inspected.
    root = ElementTree.fromstring(response.content)
    for user in root.findall('user'):
        userinfo['display_name'] = user.attrib.get('display_name')
        userinfo['id'] = user.attrib.get('id')
        for image in user.findall('img'):
            userinfo['img'] = image.attrib.get('href')
    return userinfo
xing.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def clean_oauth_auth(self, access_token):
        """Override of oauth_auth since Xing doesn't like callback_uri
        and oauth_verifier on authenticated API calls.

        Returns an ``OAuth1`` signer built from the backend key/secret and
        the token pair in ``access_token``; raises ``AuthTokenError`` when
        either ``oauth_token`` or ``oauth_token_secret`` is missing."""
        key, secret = self.get_key_and_secret()
        owner_key = access_token.get('oauth_token')
        owner_secret = access_token.get('oauth_token_secret')

        required = (
            (owner_key, 'Missing oauth_token'),
            (owner_secret, 'Missing oauth_token_secret'),
        )
        for value, message in required:
            if not value:
                raise AuthTokenError(self, message)

        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes
        if six.PY3:
            decoding = None
        else:
            decoding = 'utf-8'

        return OAuth1(key, secret,
                      resource_owner_key=owner_key,
                      resource_owner_secret=owner_secret,
                      signature_type=SIGNATURE_TYPE_AUTH_HEADER,
                      decoding=decoding)
oauth.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unauthorized_token(self):
        """Return request for unauthorized token (first stage).

        Sends the signed request-token call and returns the response body
        decoded to text, using the response's declared or apparent encoding
        when one is available."""
        params = self.request_token_extra_arguments()
        params.update(self.get_scope_argument())
        key, secret = self.get_key_and_secret()
        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes
        if six.PY3:
            decoding = None
        else:
            decoding = 'utf-8'
        state = self.get_or_create_state()

        signer = OAuth1(key, secret,
                        callback_uri=self.get_redirect_uri(state),
                        decoding=decoding)
        response = self.request(self.REQUEST_TOKEN_URL,
                                params=params,
                                auth=signer,
                                method=self.REQUEST_TOKEN_METHOD)

        raw = response.content
        charset = response.encoding or response.apparent_encoding
        if charset:
            return raw.decode(charset)
        return raw.decode()
oauth.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def oauth_auth(self, token=None, oauth_verifier=None,
                   signature_type=SIGNATURE_TYPE_AUTH_HEADER):
        """Build the ``OAuth1`` signer for this backend.

        ``token``, when given, must carry ``oauth_token`` and
        ``oauth_token_secret`` (``AuthTokenError`` is raised otherwise).
        ``oauth_verifier`` falls back to the value found in ``self.data``.
        """
        key, secret = self.get_key_and_secret()
        oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')

        owner_key = owner_secret = None
        if token:
            owner_key = token.get('oauth_token')
            owner_secret = token.get('oauth_token_secret')
            if not owner_key:
                raise AuthTokenError(self, 'Missing oauth_token')
            if not owner_secret:
                raise AuthTokenError(self, 'Missing oauth_token_secret')

        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes
        if six.PY3:
            decoding = None
        else:
            decoding = 'utf-8'

        state = self.get_or_create_state()
        callback = self.get_redirect_uri(state)
        return OAuth1(key, secret,
                      resource_owner_key=owner_key,
                      resource_owner_secret=owner_secret,
                      callback_uri=callback,
                      verifier=oauth_verifier,
                      signature_type=signature_type,
                      decoding=decoding)
khanacademy.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def unauthorized_token_request(self):
        """Return request for unauthorized token (first stage).

        The result is the request-token URL, including the extra and scope
        arguments, after signing with the OAuth1 client."""
        params = self.request_token_extra_arguments()
        params.update(self.get_scope_argument())
        key, secret = self.get_key_and_secret()
        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes
        if six.PY3:
            decoding = None
        else:
            decoding = 'utf-8'
        state = self.get_or_create_state()

        signer = OAuth1(key, secret,
                        callback_uri=self.get_redirect_uri(state),
                        decoding=decoding,
                        signature_method=SIGNATURE_HMAC,
                        signature_type=SIGNATURE_TYPE_QUERY)

        unsigned_url = self.REQUEST_TOKEN_URL + '?' + urlencode(params)
        # sign() returns three values; only the signed URL is needed.
        signed_url, _headers, _body = signer.client.sign(unsigned_url)
        return signed_url
nk.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def oauth_auth(self, token=None, oauth_verifier=None,
                   signature_type=SIGNATURE_TYPE_AUTH_HEADER):
        """Build the ``OAuth1`` signer for this backend.

        ``token`` is accepted for signature compatibility but not used: the
        signer is always created without resource-owner credentials.
        ``oauth_verifier`` falls back to the value found in ``self.data``.
        """
        key, secret = self.get_key_and_secret()
        oauth_verifier = oauth_verifier or self.data.get('oauth_verifier')
        # The former ``token = token or {}`` rebinding was removed because
        # its result was never read.
        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes
        decoding = None if six.PY3 else 'utf-8'
        state = self.get_or_create_state()
        return OAuth1(key, secret,
                      resource_owner_key=None,
                      resource_owner_secret=None,
                      callback_uri=self.get_redirect_uri(state),
                      verifier=oauth_verifier,
                      signature_type=signature_type,
                      decoding=decoding)
client.py 文件源码 项目:Provo-Housing-Database 作者: marcopete5 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _get_request_token(self):
        """
        Return the temporary request token, fetching it on first use.

        The token is used to authorize an access token and to sign the
        request that obtains the access token. Once fetched it is cached on
        ``self.request_token`` and stored in the session. Raises
        ``OAuthError`` when the provider answers with a status other than
        200 or 201.
        """
        if self.request_token is not None:
            return self.request_token

        query_args = {}
        if self.parameters:
            query_args.update(self.parameters)
        query_args['oauth_callback'] = self.request.build_absolute_uri(
            self.callback_url)

        token_url = self.request_token_url + '?' + urlencode(query_args)
        signer = OAuth1(self.consumer_key,
                        client_secret=self.consumer_secret)
        response = requests.post(url=token_url, auth=signer)

        if response.status_code not in (200, 201):
            raise OAuthError(
                _('Invalid response while obtaining request token from "%s".')
                % get_token_prefix(self.request_token_url))

        self.request_token = dict(parse_qsl(response.text))
        session_key = 'oauth_%s_request_token' % get_token_prefix(
            self.request_token_url)
        self.request.session[session_key] = self.request_token
        return self.request_token
client.py 文件源码 项目:Provo-Housing-Database 作者: marcopete5 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def query(self, url, method="GET", params=None, headers=None):
        """
        Request a API endpoint at ``url`` with ``params`` being either the
        POST or GET data.

        The call is signed with the access token taken from the session.
        Returns the response body text; raises ``OAuthError`` when the
        response status is not 200.
        """
        # None sentinels replace the former ``dict()`` defaults, which were
        # single dict objects shared by every call.
        if params is None:
            params = {}
        if headers is None:
            headers = {}

        access_token = self._get_at_from_session()
        oauth = OAuth1(
            self.consumer_key,
            client_secret=self.secret_key,
            resource_owner_key=access_token['oauth_token'],
            resource_owner_secret=access_token['oauth_token_secret'])
        # Dispatch to requests.get / requests.post / ... by method name.
        response = getattr(requests, method.lower())(url,
                                                     auth=oauth,
                                                     headers=headers,
                                                     params=params)
        if response.status_code != 200:
            raise OAuthError(
                _('No access to private resources at "%s".')
                % get_token_prefix(self.request_token_url))

        return response.text
tweeter.py 文件源码 项目:daysuntilreinvent.com 作者: ranman 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def lambda_handler(event, context):
    """Post a status update to Twitter for each selected account.

    When ``event`` contains ``'Records'`` the accounts come from
    ``dynamo_triggered_new_users(event)`` and ``new_user_status`` is
    tweeted; otherwise every item returned by ``token_table.scan()`` is
    used. One line is printed per account: a success message on HTTP 200,
    the response body otherwise.
    """
    # If being invoked by a dynamodb trigger
    if 'Records' in event:
        tweeters = dynamo_triggered_new_users(event)
        tweet_text = new_user_status
    else:  # If being invoked by the cron we scan the table
        tweeters = token_table.scan()['Items']
        # NOTE(review): the original bound a local ``status`` only in the
        # branch above, so this path raised UnboundLocalError. Presumably a
        # module-level ``status`` holds the scheduled tweet text -- confirm.
        tweet_text = status

    for tweeter in tweeters:
        auth = OAuth1(
            creds['CONSUMER_KEY'],
            creds['CONSUMER_SECRET'],
            tweeter['oauth_token'],
            tweeter['oauth_token_secret']
        )
        resp = requests.post(
            "https://api.twitter.com/1.1/statuses/update.json",
            data={'status': tweet_text},
            auth=auth
        )
        # Check the HTTP status code; the previous code compared the tweet
        # text to 200, so the success branch could never run.
        if resp.status_code == 200:
            print("Tweeted from " + tweeter['screen_name'])
        else:
            print(resp.text)
client.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def bind_auth(self, **kwargs):
        """Return ``kwargs`` with an ``auth`` entry holding an OAuth1 signer.

        The signer combines the Bitbucket consumer key/secret from settings
        with the ``oauth_token`` / ``oauth_token_secret`` stored in
        ``self.auth.tokens``.
        """
        consumer_key = six.text_type(settings.BITBUCKET_CONSUMER_KEY)
        consumer_secret = six.text_type(settings.BITBUCKET_CONSUMER_SECRET)
        tokens = self.auth.tokens
        kwargs['auth'] = OAuth1(consumer_key,
                                consumer_secret,
                                tokens['oauth_token'],
                                tokens['oauth_token_secret'],
                                signature_type='auth_header')
        return kwargs
wyahooapi.py 文件源码 项目:my-weather-indicator 作者: atareao 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self,
                 longitude=-0.418,
                 latitude=39.360,
                 units=weatherservice.Units()):
        """Initialise the base weather service, then the Yahoo specifics.

        Creates the OAuth1 signer from ``API_KEY`` / ``SHARED_SECRET`` and
        looks up the WOEID for the given coordinates.
        """
        # NOTE: the ``units`` default is evaluated once, when the function
        # is defined, so all calls relying on it share one Units instance.
        WeatherService.__init__(self, longitude, latitude, units)
        signer = OAuth1(API_KEY, SHARED_SECRET)
        self.oauth = signer
        woeid = geocodeapi.get_woeid(latitude, longitude)
        self.woeid = woeid
yelpapi.py 文件源码 项目:30-Days-of-Python 作者: codingforentrepreneurs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_search(term='Food', location='San Francisco'):
    """Query the Yelp v2 search endpoint.

    Args:
        term: Search term.
        location: Location to search in.

    Returns:
        A ``(parsed_json, raw_text)`` tuple taken from the response.
    """
    base_url = 'https://api.yelp.com/v2/search'
    auth = OAuth1(consumer_key,
            consumer_secret,
            token,
            token_secret)
    # Let requests build the query string: spaces still become '+', and
    # characters such as '&', '#', '=' or non-ASCII text are now
    # percent-encoded instead of corrupting the hand-built URL.
    query = [('term', term), ('location', location)]
    r = requests.get(base_url, params=query, auth=auth)
    return r.json(), r.text
twitter_api.py 文件源码 项目:pytennessee 作者: lorenanicole 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, consumer_key=None, consumer_secret=None, oauth_token=None, oauth_secret=None):
        """Store the Twitter credentials and build the OAuth1 signer.

        Each argument that is falsy falls back to the corresponding
        module-level constant.
        """
        if not consumer_key:
            consumer_key = TWITTER_CONSUMER_KEY
        self.consumer_key = consumer_key

        if not consumer_secret:
            consumer_secret = TWITTER_CONSUMER_SECRET
        self.consumer_secret = consumer_secret

        if not oauth_token:
            oauth_token = OAUTH_TOKEN
        self.oauth_token = oauth_token

        if not oauth_secret:
            oauth_secret = OAUTH_TOKEN_SECRET
        self.oauth_token_secret = oauth_secret

        self.auth = OAuth1(
            self.consumer_key,
            client_secret=self.consumer_secret,
            resource_owner_key=self.oauth_token,
            resource_owner_secret=self.oauth_token_secret)
httpRequests.py 文件源码 项目:PythonV3InvoiceSampleApp 作者: IntuitDeveloper 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def requestQBO(method, url, context, payload=None):
    """Send a JSON request, authenticated per ``settings.oauth_flag``.

    When the flag equals 2 a bearer token from settings is sent in the
    Authorization header; otherwise the request is signed with OAuth1 using
    the consumer and access credentials on ``context``. Returns the
    ``requests`` response.
    """
    if settings.oauth_flag == 2:
        bearer_headers = {
            'Accept': 'application/json',
            'User-Agent': 'PythonSampleApp2.0',
            'Authorization': 'Bearer '+settings.access_token_oauth2,
        }
        return requests.request(method, url, headers=bearer_headers, json=payload)

    signed_headers = {
        'Accept': 'application/json',
        'content-type': 'application/json; charset=utf-8',
        'User-Agent': 'PythonSampleApp2.0',
    }
    signer = OAuth1(context.consumerToken, context.consumerSecret,
                    context.accessToken, context.accessSecret)
    return requests.request(method, url, auth=signer, headers=signed_headers, json=payload)
__init__.py 文件源码 项目:fallball-connector 作者: ingrammicro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_oauth():
    """Return an OAuth1 signer for the current request, or None.

    None is returned when either the client key derived from the request
    or the configured OAuth secret is missing.
    """
    client_key = get_client_key(request)
    client_secret = Config().oauth_secret
    if client_key and client_secret:
        return OAuth1(client_key=client_key,
                      client_secret=client_secret)
    return None
handler_500px.py 文件源码 项目:open-ledger 作者: creativecommons 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def auth():
    """Build the OAuth1 signer from the 500px key and secret in settings."""
    consumer_key = settings.API_500PX_KEY
    consumer_secret = settings.API_500PX_SECRET
    return OAuth1(consumer_key, client_secret=consumer_secret)
tool_proxy.py 文件源码 项目:lti 作者: pylti 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def register_proxy(self, tool_profile):
        """Build and sign the tool proxy registration request.

        ``tool_profile`` is serialised to JSON and POSTed to the
        registration URL; the prepared request is signed with the
        ``reg_key`` / ``reg_password`` launch params and returned (it is
        not sent here).
        """
        register_url = self.find_registration_url()

        body = json.dumps(tool_profile, indent=4)
        content_type = {'Content-Type':'application/vnd.ims.lti.v2.toolproxy+json'}
        prepared = Request("POST", register_url, data=body,
                           headers=content_type).prepare()

        # Sign via the Authorization header, covering the JSON body.
        signer = OAuth1(self.launch_params['reg_key'],
                        self.launch_params['reg_password'],
                        signature_type=SIGNATURE_TYPE_AUTH_HEADER,
                        force_include_body=True)
        return signer(prepared)
app.py 文件源码 项目:oabot 作者: dissemin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def edit_wiki_page(page_name, content, summary=None):
    """Replace the text of an English Wikipedia page via the web API.

    Uses the OAuth access token stored in the Flask session: first fetches
    a CSRF token, then submits the edit. HTTP error statuses raise through
    ``raise_for_status``. Returns None.
    """
    api_url = 'https://en.wikipedia.org/w/api.php'

    access_token = flask.session.get('access_token', None)
    auth = OAuth1(
        app.config['CONSUMER_KEY'],
        app.config['CONSUMER_SECRET'],
        access_token['key'],
        access_token['secret'])

    # Get token
    token_query = {
        'action': 'query',
        'meta': 'tokens',
        'format': 'json',
    }
    token_response = requests.get(api_url, params=token_query, auth=auth)
    token_response.raise_for_status()
    csrf_token = token_response.json()['query']['tokens']['csrftoken']

    # Submit the edit
    edit_form = {
        'action': 'edit',
        'title': page_name,
        'text': content,
        'summary': summary,
        'format': 'json',
        'token': csrf_token,
        'watchlist': 'nochange',
    }
    edit_response = requests.post(api_url, data=edit_form, auth=auth)
    edit_response.raise_for_status()
check_authentication.py 文件源码 项目:igmonplugins 作者: innogames 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_oauth1(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer.

    ``private_key`` is the path of the key file, read in binary mode and
    imported with ``passphrase``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'rb') as key_file:
        key_material = key_file.read()
    rsa_key = RSA.importKey(key_material, passphrase)

    return OAuth1(client_key=consumer_key,
                  client_secret=consumer_secret,
                  signature_method='RSA-SHA1',
                  rsa_key=rsa_key)
atlassian_expiring_licenses.py 文件源码 项目:igmonplugins 作者: innogames 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def get_oauth1session(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer.

    ``private_key`` is the path of the key file, read in text mode and
    imported with ``passphrase``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'r') as key_file:
        key_material = key_file.read()
    rsa_key = RSA.importKey(key_material, passphrase)

    return OAuth1(client_key=consumer_key,
                  client_secret=consumer_secret,
                  signature_method='RSA-SHA1',
                  rsa_key=rsa_key)
atlassian_plugin_updates.py 文件源码 项目:igmonplugins 作者: innogames 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_oauth1session(consumer_key, consumer_secret, private_key, passphrase):
    """Return an RSA-SHA1 OAuth1 signer.

    ``private_key`` is the path of the key file, read in text mode and
    imported with ``passphrase``.
    """
    from Crypto.PublicKey import RSA
    from requests_oauthlib import OAuth1

    with open(private_key, 'r') as key_file:
        key_material = key_file.read()
    rsa_key = RSA.importKey(key_material, passphrase)

    return OAuth1(client_key=consumer_key,
                  client_secret=consumer_secret,
                  signature_method='RSA-SHA1',
                  rsa_key=rsa_key)


问题


面经


文章

微信
公众号

扫码关注公众号