python类Client()的实例源码

__init__.py 文件源码 项目:yahoo-fantasy-football-metrics 作者: uberfastman 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_token_and_auth_url(self, callback_url=None):
        """First step is to get the token and then send the request that
        provides the auth URL

        Returns a tuple of token and the authorisation URL.

        """

        client = oauth.Client(self.consumer)

        params = {}
        params['oauth_callback'] = callback_url or 'oob'

        request = oauth.Request(parameters=params)
        url = REQUEST_TOKEN_URL
        resp, content = client.request(url, "POST", request.to_postdata())

        if resp.get('status') == '200':
            token = oauth.Token.from_string(content)
            yql_logger.debug("token: %s", token)
            data = dict(parse_qsl(content))
            yql_logger.debug("data: %s", data)
            return token, data['xoauth_request_auth_url']
        else:
            raise YQLError(resp, content, url)
__init__.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_token_and_auth_url(self, callback_url=None):
        """First step is to get the token and then send the request that
        provides the auth URL

        Returns a tuple of token and the authorisation URL.

        """

        client = oauth.Client(self.consumer)

        params = {}
        params['oauth_callback'] = callback_url or 'oob'

        request = oauth.Request(parameters=params)
        url = REQUEST_TOKEN_URL
        resp, content = client.request(url, "POST", request.to_postdata())

        if resp.get('status') == '200':
            token = oauth.Token.from_string(content)
            yql_logger.debug("token: %s", token)
            data = dict(parse_qsl(content))
            yql_logger.debug("data: %s", data)
            return token, data['xoauth_request_auth_url']
        else:
            raise YQLError, (resp, content, url)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_set_signature_method(self):
        consumer = oauth.Consumer('key', 'secret')
        client = oauth.Client(consumer)

        class Blah:
            pass

        try:
            client.set_signature_method(Blah())
            self.fail("Client.set_signature_method() accepted invalid method.")
        except ValueError:
            pass

        m = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(m)
        self.assertEqual(m, client.method)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_init(self):
        class Blah():
            pass

        try:
            client = oauth.Client(Blah())
            self.fail("Client.__init__() accepted invalid Consumer.")
        except ValueError:
            pass

        consumer = oauth.Consumer('token', 'secret')
        try:
            client = oauth.Client(consumer, Blah())
            self.fail("Client.__init__() accepted invalid Token.")
        except ValueError:
            pass
trading.py 文件源码 项目:trump2cash 作者: maxbbraun 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def make_request(self, url, method="GET", body="", headers=None):
        """Makes a request to the TradeKing API."""

        consumer = Consumer(key=TRADEKING_CONSUMER_KEY,
                            secret=TRADEKING_CONSUMER_SECRET)
        token = Token(key=TRADEKING_ACCESS_TOKEN,
                      secret=TRADEKING_ACCESS_TOKEN_SECRET)
        client = Client(consumer, token)

        self.logs.debug("TradeKing request: %s %s %s %s" %
                        (url, method, body, headers))
        response, content = client.request(url, method=method, body=body,
                                           headers=headers)
        self.logs.debug("TradeKing response: %s %s" % (response, content))

        try:
            return loads(content)
        except ValueError:
            self.logs.error("Failed to decode JSON response: %s" % content)
            return None
twitter.py 文件源码 项目:socialauth 作者: emilyhorsman 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_user_information(self):
        token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
        token.set_verifier(self.oauth_verifier)
        client = oauth2.Client(self.consumer, token)

        url = 'https://api.twitter.com/oauth/access_token'
        resp, content = client.request(url, 'POST')
        if resp.status != 200:
            raise Error('{} from Twitter'.format(resp.status))

        provider_user = dict(parse_qsl(content.decode('utf-8')))
        if 'user_id' not in provider_user:
            raise Error('No user_id from Twitter')

        self.status    = 200
        self.user_id   = provider_user.get('user_id')
        self.user_name = provider_user.get('screen_name', None)
        return (self.user_id, self.user_name,)
views.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_request_token(self):
        """
        Function for generating initial request token for three-legged oauth
        Takes no input, returns a request_token dict with two keys, oauth_token and oauth_token_secret
        """

        callback_url = self.request.build_absolute_uri(reverse('bot-authorize', args=(self.object.pk, )))

        consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
        client = oauth.Client(consumer)
        resp, content = client.request(self.REQUEST_TOKEN_URL, "POST",
                                       body=urllib.parse.urlencode({'oauth_callback': callback_url}))

        if resp['status'] != '200':
            raise Exception("Invalid response %s." % resp['status'])

        request_token = dict(urllib.parse.parse_qsl(content))

        # urllib returns bytes, which will need to be decoded using the string.decode() method before use
        request_token = {key.decode(): value.decode() for key, value in request_token.items()}

        # Return the token dict containing token and secret
        return request_token
views.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
        """
        Func for final leg of three-legged oauth,
        takes token and secret returned in oauth_callback GET dict and
         exchanges them for the permanent access token and secret
        returns an access_token dict with two keys, oauth_token and oauth_token_secret
        """
        token = oauth.Token(oauth_token, oauth_token_secret)
        token.set_verifier(oauth_verifier)
        consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)
        client = oauth.Client(consumer, token)

        # Now we fire the request at access token url instead of request token
        resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")

        if resp['status'] != '200':
            raise Exception("Invalid response %s." % resp['status'])

        access_token = dict(urllib.parse.parse_qsl(content))

        # urllib returns bytes, which will need to be decoded using the string.decode() method before use
        access_token = {key.decode(): value.decode() for key, value in access_token.items()}

        # Return the token dict containing token and secret
        return access_token
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_set_signature_method(self):
        consumer = oauth.Consumer('key', 'secret')
        client = oauth.Client(consumer)

        class Blah:
            pass

        try:
            client.set_signature_method(Blah())
            self.fail("Client.set_signature_method() accepted invalid method.")
        except ValueError:
            pass

        m = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(m)
        self.assertEqual(m, client.method)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_init(self):
        class Blah():
            pass

        try:
            client = oauth.Client(Blah())
            self.fail("Client.__init__() accepted invalid Consumer.")
        except ValueError:
            pass

        consumer = oauth.Consumer('token', 'secret')
        try:
            client = oauth.Client(consumer, Blah())
            self.fail("Client.__init__() accepted invalid Token.")
        except ValueError:
            pass
request.py 文件源码 项目:TumblrVideos 作者: moedje 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self, url, params):
        """
        Issues a GET request against the API, properly formatting the params

        :param url: a string, the url you are requesting
        :param params: a dict, the key-value of all the paramaters needed
                       in the request
        :returns: a dict parsed of the JSON response
        """
        url = self.host + url
        if params:
            url = url + "?" + urllib.urlencode(params)

        client = oauth.Client(self.consumer, self.token, proxy_info=self.proxy_info)
        client.disable_ssl_certificate_validation = True
        try:
            client.follow_redirects = False
            resp, content = client.request(url, method="GET", redirections=False)
        except RedirectLimit, e:
            resp, content = e.args

        return self.json_parse(content)
request.py 文件源码 项目:TumblrVideos 作者: moedje 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def post(self, url, params={}, files=[]):
        """
        Issues a POST request against the API, allows for multipart data uploads

        :param url: a string, the url you are requesting
        :param params: a dict, the key-value of all the parameters needed
                       in the request
        :param files: a list, the list of tuples of files

        :returns: a dict parsed of the JSON response
        """
        url = self.host + url
        try:
            if files:
                return self.post_multipart(url, params, files)
            else:
                client = oauth.Client(self.consumer, self.token, proxy_info=self.proxy_info)
                client.disable_ssl_certificate_validation = True
                resp, content = client.request(url, method="POST", body=urllib.urlencode(params))
                return self.json_parse(content)
        except urllib2.HTTPError, e:
            return self.json_parse(e.read())
auth_setup.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def twitter_oauth2(oauth_token, oauth_verifier):
    auth_tokens = db.OAuthToken.query(db.OAuthToken.temp_oauth_token == oauth_token, db.OAuthToken.application == db.APP_TWITTER).fetch(1)
    if not auth_tokens:
        return None
    auth_token = auth_tokens[0]
    # Step 3: Once the consumer has redirected the user back to the oauth_callback
    # URL you can request the access token the user has approved. You use the
    # request token to sign this request. After this is done you throw away the
    # request token and use the access token returned. You should store this
    # access token somewhere safe, like a database, for future use.
    token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    token.set_verifier(oauth_verifier)
    consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
    client = oauth.Client(consumer, token)

    resp, content = client.request(access_token_url, "POST")
    access_token = dict(urlparse.parse_qsl(content))
    auth_token.oauth_token = access_token['oauth_token']
    auth_token.oauth_token_secret = access_token['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # every 5 minutes
    auth_token.put()
    return auth_token
__init__.py 文件源码 项目:splitwise 作者: namaggarwal 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getAuthorizeURL(self):

        client = oauth.Client(self.consumer)

        #Get the request token
        resp, content = client.request(Splitwise.REQUEST_TOKEN_URL, "POST")

        if Splitwise.isDebug():
            print(resp, content)

        #Check if the response is correct
        if resp['status'] != '200':
            raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])

        request_token = dict(parse_qsl(content.decode("utf-8")))

        return "%s?oauth_token=%s" % (Splitwise.AUTHORIZE_URL, request_token['oauth_token']), request_token['oauth_token_secret']
__init__.py 文件源码 项目:splitwise 作者: namaggarwal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier):

        token = oauth.Token(oauth_token,oauth_token_secret)
        token.set_verifier(oauth_verifier)
        client = oauth.Client(self.consumer, token)

        resp, content = client.request(Splitwise.ACCESS_TOKEN_URL, "POST")

        if Splitwise.isDebug():
            print(resp, content)

        #Check if the response is correct
        if resp['status'] != '200':
            raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])

        access_token = dict(parse_qsl(content.decode("utf-8")))

        return access_token
twitter_api.py 文件源码 项目:Build-a-Bot 作者: ShanTulshi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_tweets(twitter_handle):
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    endpoint = "https://api.twitter.com/1.1/search/tweets.json?q=from%3A" + twitter_handle + "&result_type=all&count=100"
    response, data = client.request(endpoint)

    tweet_data = json.loads(data.decode('utf-8'))
    tweets = []
    for tweet in tweet_data['statuses']:
        tweets.append(re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n'))
    return tweets

#example
# print(get_tweets("Cmdr_Hadfield"))
oauth10a_account.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the access token generated by the authenticating server.

        If token is already in the session that one will be used.
        Otherwise the token is fetched from the auth server.

        """

        if self.session.access_token:
            # return the token (TODO: does it expire?)

            return self.session.access_token
        if self.session.request_token:
            # Exchange the request token with an authorization token.
            token = self.session.request_token
            self.session.request_token = None

            # Build an authorized client
            # OAuth1.0a put the verifier!
            token.set_verifier(self.request.vars.oauth_verifier)
            client = oauth.Client(self.consumer, token)

            resp, content = client.request(self.access_token_url, "POST")
            if str(resp['status']) != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            self.session.access_token = oauth.Token.from_string(content)

            return self.session.access_token

        self.session.access_token = None
        return None
oauth10a_account.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        '''This method redirects the user to the authenticating form
        on authentication server if the authentication code
        and the authentication token are not available to the
        application yet.

        Once the authentication code has been received this method is
        called to set the access token into the session by calling
        accessToken()
        '''

        if not self.accessToken():
            # setup the client
            client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
            # Get a request token.
            # oauth_callback *is REQUIRED* for OAuth1.0a
            # putting it in the body seems to work.
            callback_url = self.__redirect_uri(next)
            data = urlencode(dict(oauth_callback=callback_url))
            resp, content = client.request(self.token_url, "POST", body=data)
            if resp['status'] != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            # Store the request token in session.
            request_token = self.session.request_token = oauth.Token.from_string(content)

            # Redirect the user to the authentication URL and pass the callback url.
            data = urlencode(dict(oauth_token=request_token.key,
                                  oauth_callback=callback_url))
            auth_request_url = self.auth_url + '?' + data

            HTTP = self.globals['HTTP']

            raise HTTP(302,
                       "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                       Location=auth_request_url)

        return None
send_grades.py 文件源码 项目:sga-lti 作者: mitodl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _post_patched_request(lti_key, secret, body, url, method, content_type):  # pylint: disable=too-many-arguments
    """
    Authorization header needs to be capitalized for some LTI clients
    this function ensures that header is capitalized

    :param body: body of the call
    :param client: OAuth Client
    :param url: outcome url
    :return: response
    """

    consumer = oauth2.Consumer(key=lti_key, secret=secret)
    client = oauth2.Client(consumer)

    import httplib2

    http = httplib2.Http
    # pylint: disable=protected-access
    normalize = http._normalize_headers

    def my_normalize(self, headers):
        """ This function patches Authorization header """
        ret = normalize(self, headers)
        if 'authorization' in ret:
            ret['Authorization'] = ret.pop('authorization')
        return ret

    http._normalize_headers = my_normalize
    monkey_patch_function = normalize
    response, content = client.request(
        url,
        method,
        body=body.encode("utf8"),
        headers={'Content-Type': content_type})

    http = httplib2.Http
    # pylint: disable=protected-access
    http._normalize_headers = monkey_patch_function

    return response, content
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_init_passes_kwargs_to_httplib2(self):
        class Blah():
            pass

        consumer = oauth.Consumer('token', 'secret')

        # httplib2 options
        client = oauth.Client(consumer, None, cache='.cache', timeout=3, disable_ssl_certificate_validation=True)
        self.assertNotEqual(client.cache, None)
        self.assertEqual(client.timeout, 3)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_access_token_get(self):
        """Test getting an access token via GET."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "GET")

        self.assertEqual(int(resp['status']), 200)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_access_token_post(self):
        """Test getting an access token via POST."""
        client = oauth.Client(self.consumer, None)
        resp, content = client.request(self._uri('request_token'), "POST")

        self.assertEqual(int(resp['status']), 200)

        res = dict(parse_qsl(content))
        self.assertTrue(b'oauth_token' in res)
        self.assertTrue(b'oauth_token_secret' in res)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
        random_result = random.randint(1,100)

        data = {
            'rand-%d'%random.randint(1,100):random.randint(1,100),
        }
        content_type, body = self.create_simple_multipart_data(data)

        client = oauth.Client(self.consumer, None)
        uri = self._uri('two_legged')

        def mockrequest(cl, ur, **kw):
            self.assertTrue(cl is client)
            self.assertTrue(ur is uri)
            self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers']))
            self.assertEqual(kw['body'], body)
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'POST')
            self.assertEqual(kw['redirections'],
                                 httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            return random_result

        mockHttpRequest.side_effect = mockrequest

        result = client.request(uri, 'POST',
                                headers={'Content-Type':content_type},
                                body=body)
        self.assertEqual(result, random_result)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_url_with_query_string(self, mockHttpRequest):
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        def mockrequest(cl, ur, **kw):
            self.assertTrue(cl is client)
            self.assertEqual(frozenset(kw.keys()),
                                 frozenset(['method', 'body', 'redirections',
                                            'connection_type', 'headers']))
            self.assertEqual(kw['body'], b'')
            self.assertEqual(kw['connection_type'], None)
            self.assertEqual(kw['method'], 'GET')
            self.assertEqual(kw['redirections'],
                                 httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertTrue(isinstance(kw['headers'], dict))

            req = oauth.Request.from_consumer_and_token(self.consumer, None,
                    http_method='GET', http_url=uri, parameters={})
            req.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                             self.consumer, None)
            expected = parse_qsl(
                            urlparse(req.to_url()).query)
            actual = parse_qsl(urlparse(ur).query)
            self.assertEqual(len(expected), len(actual))
            actual = dict(actual)
            for key, value in expected:
                if key not in ('oauth_signature',
                               'oauth_nonce', 'oauth_timestamp'):
                    self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = mockrequest

        client.request(uri, 'GET')
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
        client = oauth.Client(self.consumer, None)

        request = oauth.Request("GET", "http://example.com/fetch.php", parameters={'multi': ['1', '2']})
        mockReqConstructor.return_value = request

        client.request('http://whatever', 'POST', body='multi=1&multi=2')

        self.assertEqual(mockReqConstructor.call_count, 1)
        self.assertEqual(mockReqConstructor.call_args[1]['parameters'], {'multi': ['1', '2']})

        self.assertTrue('multi=1' in mockHttpRequest.call_args[1]['body'])
        self.assertTrue('multi=2' in mockHttpRequest.call_args[1]['body'])
oauth10a_account.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the access token generated by the authenticating server.

        If token is already in the session that one will be used.
        Otherwise the token is fetched from the auth server.

        """

        if self.session.access_token:
            # return the token (TODO: does it expire?)

            return self.session.access_token
        if self.session.request_token:
            # Exchange the request token with an authorization token.
            token = self.session.request_token
            self.session.request_token = None

            # Build an authorized client
            # OAuth1.0a put the verifier!
            token.set_verifier(self.request.vars.oauth_verifier)
            client = oauth.Client(self.consumer, token)

            resp, content = client.request(self.access_token_url, "POST")
            if str(resp['status']) != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            self.session.access_token = oauth.Token.from_string(content)

            return self.session.access_token

        self.session.access_token = None
        return None
oauth10a_account.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        '''This method redirects the user to the authenticating form
        on authentication server if the authentication code
        and the authentication token are not available to the
        application yet.

        Once the authentication code has been received this method is
        called to set the access token into the session by calling
        accessToken()
        '''

        if not self.accessToken():
            # setup the client
            client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
            # Get a request token.
            # oauth_callback *is REQUIRED* for OAuth1.0a
            # putting it in the body seems to work.
            callback_url = self.__redirect_uri(next)
            data = urlencode(dict(oauth_callback=callback_url))
            resp, content = client.request(self.token_url, "POST", body=data)
            if resp['status'] != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            # Store the request token in session.
            request_token = self.session.request_token = oauth.Token.from_string(content)

            # Redirect the user to the authentication URL and pass the callback url.
            data = urlencode(dict(oauth_token=request_token.key,
                                  oauth_callback=callback_url))
            auth_request_url = self.auth_url + '?' + data

            HTTP = self.globals['HTTP']

            raise HTTP(302,
                       "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                       Location=auth_request_url)

        return None
oauth10a_account.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the access token generated by the authenticating server.

        If token is already in the session that one will be used.
        Otherwise the token is fetched from the auth server.

        """

        if self.session.access_token:
            # return the token (TODO: does it expire?)

            return self.session.access_token
        if self.session.request_token:
            # Exchange the request token with an authorization token.
            token = self.session.request_token
            self.session.request_token = None

            # Build an authorized client
            # OAuth1.0a put the verifier!
            token.set_verifier(self.request.vars.oauth_verifier)
            client = oauth.Client(self.consumer, token)

            resp, content = client.request(self.access_token_url, "POST")
            if str(resp['status']) != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            self.session.access_token = oauth.Token.from_string(content)

            return self.session.access_token

        self.session.access_token = None
        return None
oauth10a_account.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        '''This method redirects the user to the authenticating form
        on authentication server if the authentication code
        and the authentication token are not available to the
        application yet.

        Once the authentication code has been received this method is
        called to set the access token into the session by calling
        accessToken()
        '''

        if not self.accessToken():
            # setup the client
            client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)
            # Get a request token.
            # oauth_callback *is REQUIRED* for OAuth1.0a
            # putting it in the body seems to work.
            callback_url = self.__redirect_uri(next)
            data = urlencode(dict(oauth_callback=callback_url))
            resp, content = client.request(self.token_url, "POST", body=data)
            if resp['status'] != '200':
                self.session.request_token = None
                self.globals['redirect'](self.globals[
                                         'URL'](f='user', args='logout'))

            # Store the request token in session.
            request_token = self.session.request_token = oauth.Token.from_string(content)

            # Redirect the user to the authentication URL and pass the callback url.
            data = urlencode(dict(oauth_token=request_token.key,
                                  oauth_callback=callback_url))
            auth_request_url = self.auth_url + '?' + data

            HTTP = self.globals['HTTP']

            raise HTTP(302,
                       "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                       Location=auth_request_url)

        return None
twitter.py 文件源码 项目:socialauth 作者: emilyhorsman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_initial_oauth_tokens(self):
        url = 'https://api.twitter.com/oauth/request_token'
        client = oauth2.Client(self.consumer)
        resp, content = client.request(url, 'GET')
        if resp.status != 200:
            raise Error('{} from Twitter'.format(resp.status))

        oauth_values = dict(parse_qsl(content.decode('utf-8')))
        self.oauth_token        = oauth_values.get('oauth_token')
        self.oauth_token_secret = oauth_values.get('oauth_token_secret')
        if not self.oauth_token or not self.oauth_token_secret:
            raise Error('No oauth_token or oauth_token_secret from Twitter')

        return (self.oauth_token, self.oauth_token_secret,)


问题


面经


文章

微信
公众号

扫码关注公众号