python类Client()的实例源码

oauth10a_account.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the OAuth access token, exchanging a request token if needed.

        Lookup order:

        1. A truthy ``self.session.access_token`` is returned as is.
        2. Otherwise, a truthy ``self.session.request_token`` is removed from
           the session, given the ``oauth_verifier`` request variable and
           used to sign a POST to ``self.access_token_url``. The reply body
           is parsed with ``oauth.Token.from_string``, stored in the session
           and returned.
        3. With neither token available, ``self.session.access_token`` is set
           to ``None`` and ``None`` is returned.

        When the exchange reply status is not ``'200'`` the ``redirect``
        callable from ``self.globals`` is invoked with the URL built from
        ``f='user', args='logout'``.
        """
        if self.session.access_token:
            # Already authenticated (token expiry is not checked here).
            return self.session.access_token

        if not self.session.request_token:
            # Nothing cached and nothing to exchange.
            self.session.access_token = None
            return None

        # Take the request token out of the session before exchanging it.
        request_token = self.session.request_token
        self.session.request_token = None

        # OAuth 1.0a: the verifier must accompany the request token.
        request_token.set_verifier(self.request.vars.oauth_verifier)
        signed_client = oauth.Client(self.consumer, request_token)

        response, body = signed_client.request(self.access_token_url, "POST")
        if str(response['status']) != '200':
            self.session.request_token = None
            redirect = self.globals['redirect']
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            redirect(self.globals['URL'](f='user', args='logout'))

        self.session.access_token = oauth.Token.from_string(body)
        return self.session.access_token
oauth10a_account.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        """Start the OAuth 1.0a authorization flow unless a token is available.

        ``accessToken()`` is consulted first; when it yields a token nothing
        else happens. Otherwise a request token is obtained from
        ``self.token_url``, stored in ``self.session.request_token`` and the
        user is sent to ``self.auth_url``.

        Args:
            next: passed to ``self.__redirect_uri`` to build the callback URL.

        Returns:
            ``None`` when ``accessToken()`` returned a truthy token.

        Raises:
            The ``HTTP`` class taken from ``self.globals``, built with status
            302 and a ``Location`` pointing at the authorization URL.
        """
        if self.accessToken():
            return None

        # setup the client
        client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

        # Get a request token.
        # oauth_callback *is REQUIRED* for OAuth1.0a; putting it in the body
        # seems to work.
        callback_url = self.__redirect_uri(next)
        data = urlencode(dict(oauth_callback=callback_url))
        resp, content = client.request(self.token_url, "POST", body=data)

        # Normalise the status to text before comparing, as accessToken()
        # does, so an integer 200 is not mistaken for a failure.
        if str(resp['status']) != '200':
            self.session.request_token = None
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            self.globals['redirect'](self.globals[
                                     'URL'](f='user', args='logout'))

        # Store the request token in session.
        request_token = self.session.request_token = oauth.Token.from_string(content)

        # Redirect the user to the authentication URL and pass the callback url.
        data = urlencode(dict(oauth_token=request_token.key,
                              oauth_callback=callback_url))
        auth_request_url = self.auth_url + '?' + data

        HTTP = self.globals['HTTP']

        raise HTTP(302,
                   "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                   Location=auth_request_url)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_passes_kwargs_to_httplib2(self):
        """Keyword options given to ``oauth.Client`` end up on the client.

        Builds a client with ``cache``, ``timeout`` and
        ``disable_ssl_certificate_validation`` options and checks that the
        resulting object exposes a non-``None`` ``cache`` and the requested
        ``timeout``.
        """
        consumer = oauth.Consumer('token', 'secret')

        # httplib2 options
        client = oauth.Client(
            consumer,
            None,
            cache='.cache',
            timeout=3,
            disable_ssl_certificate_validation=True,
        )
        self.assertNotEqual(client.cache, None)
        self.assertEqual(client.timeout, 3)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_access_token_get(self):
        """A GET to the ``request_token`` URI must answer with status 200."""
        http = oauth.Client(self.consumer, None)
        endpoint = self._uri('request_token')
        response, _body = http.request(endpoint, "GET")

        status_code = int(response['status'])
        self.assertEqual(status_code, 200)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_access_token_post(self):
        """A POST to the ``request_token`` URI returns 200 and token fields.

        The reply body is parsed as a query string and must contain both the
        ``oauth_token`` and ``oauth_token_secret`` keys (as bytes).
        """
        http = oauth.Client(self.consumer, None)
        endpoint = self._uri('request_token')
        response, body = http.request(endpoint, "POST")

        status_code = int(response['status'])
        self.assertEqual(status_code, 200)

        fields = dict(parse_qsl(body))
        self.assertIn(b'oauth_token', fields)
        self.assertIn(b'oauth_token_secret', fields)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
        """A multipart POST body must be handed on unchanged.

        ``mockHttpRequest`` is a mock whose ``side_effect`` is set here to a
        checker that inspects the call made by ``client.request``.
        NOTE(review): presumably it patches the underlying httplib2 request
        call; the patch decorator is not visible in this snippet - confirm.
        """
        random_result = random.randint(1,100)

        payload = {
            'rand-%d'%random.randint(1,100):random.randint(1,100),
        }
        content_type, body = self.create_simple_multipart_data(payload)

        client = oauth.Client(self.consumer, None)
        uri = self._uri('two_legged')
        expected_keywords = frozenset(
            ['method', 'body', 'redirections', 'connection_type', 'headers'])

        def fake_request(called_client, called_uri, **kwargs):
            # The very same client and URI objects must be passed through.
            self.assertIs(called_client, client)
            self.assertIs(called_uri, uri)
            self.assertEqual(frozenset(kwargs.keys()), expected_keywords)
            # The multipart body is the point of the test: it must be intact.
            self.assertEqual(kwargs['body'], body)
            self.assertEqual(kwargs['connection_type'], None)
            self.assertEqual(kwargs['method'], 'POST')
            self.assertEqual(kwargs['redirections'],
                             httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertIsInstance(kwargs['headers'], dict)

            return random_result

        mockHttpRequest.side_effect = fake_request

        outcome = client.request(uri, 'POST',
                                 headers={'Content-Type':content_type},
                                 body=body)
        self.assertEqual(outcome, random_result)
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_url_with_query_string(self, mockHttpRequest):
        """A GET to a URI with a query string keeps its parameters when signed.

        ``mockHttpRequest`` receives a checker as ``side_effect``. The checker
        signs an equivalent request independently and compares its query
        parameters with those of the URI actually requested, ignoring the
        signature, nonce and timestamp values.
        """
        uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
        client = oauth.Client(self.consumer, None)
        random_result = random.randint(1,100)

        expected_keywords = frozenset(['method', 'body', 'redirections',
                                       'connection_type', 'headers'])
        # Values of these parameters are skipped in the comparison below.
        volatile = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

        def fake_request(called_client, called_uri, **kwargs):
            self.assertIs(called_client, client)
            self.assertEqual(frozenset(kwargs.keys()), expected_keywords)
            self.assertEqual(kwargs['body'], b'')
            self.assertEqual(kwargs['connection_type'], None)
            self.assertEqual(kwargs['method'], 'GET')
            self.assertEqual(kwargs['redirections'],
                             httplib2.DEFAULT_MAX_REDIRECTS)
            self.assertIsInstance(kwargs['headers'], dict)

            # Build and sign the reference request for the same URI.
            reference = oauth.Request.from_consumer_and_token(
                self.consumer, None,
                http_method='GET', http_url=uri, parameters={})
            reference.sign_request(oauth.SignatureMethod_HMAC_SHA1(),
                                   self.consumer, None)

            expected = parse_qsl(urlparse(reference.to_url()).query)
            actual_pairs = parse_qsl(urlparse(called_uri).query)
            self.assertEqual(len(expected), len(actual_pairs))

            actual = dict(actual_pairs)
            for key, value in expected:
                if key in volatile:
                    continue
                self.assertEqual(actual[key], value)

            return random_result

        mockHttpRequest.side_effect = fake_request

        client.request(uri, 'GET')
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
        """Repeated form keys in the body survive as a list of values.

        ``mockReqConstructor`` is made to return a prepared ``oauth.Request``;
        the test then checks the parameters it was called with and the body
        given to ``mockHttpRequest``.
        """
        client = oauth.Client(self.consumer, None)

        prepared = oauth.Request(
            "GET", "http://example.com/fetch.php",
            parameters={'multi': ['1', '2']})
        mockReqConstructor.return_value = prepared

        client.request('http://whatever', 'POST', body='multi=1&multi=2')

        # The request constructor is hit exactly once, with both values kept.
        self.assertEqual(mockReqConstructor.call_count, 1)
        constructor_kwargs = mockReqConstructor.call_args[1]
        self.assertEqual(constructor_kwargs['parameters'], {'multi': ['1', '2']})

        # Both values must also appear in the body that was sent.
        for fragment in ('multi=1', 'multi=2'):
            self.assertIn(fragment, mockHttpRequest.call_args[1]['body'])
oauth10a_account.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the OAuth access token, exchanging a request token if needed.

        Lookup order:

        1. A truthy ``self.session.access_token`` is returned as is.
        2. Otherwise, a truthy ``self.session.request_token`` is removed from
           the session, given the ``oauth_verifier`` request variable and
           used to sign a POST to ``self.access_token_url``. The reply body
           is parsed with ``oauth.Token.from_string``, stored in the session
           and returned.
        3. With neither token available, ``self.session.access_token`` is set
           to ``None`` and ``None`` is returned.

        When the exchange reply status is not ``'200'`` the ``redirect``
        callable from ``self.globals`` is invoked with the URL built from
        ``f='user', args='logout'``.
        """
        if self.session.access_token:
            # Already authenticated (token expiry is not checked here).
            return self.session.access_token

        if not self.session.request_token:
            # Nothing cached and nothing to exchange.
            self.session.access_token = None
            return None

        # Take the request token out of the session before exchanging it.
        request_token = self.session.request_token
        self.session.request_token = None

        # OAuth 1.0a: the verifier must accompany the request token.
        request_token.set_verifier(self.request.vars.oauth_verifier)
        signed_client = oauth.Client(self.consumer, request_token)

        response, body = signed_client.request(self.access_token_url, "POST")
        if str(response['status']) != '200':
            self.session.request_token = None
            redirect = self.globals['redirect']
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            redirect(self.globals['URL'](f='user', args='logout'))

        self.session.access_token = oauth.Token.from_string(body)
        return self.session.access_token
oauth10a_account.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        """Start the OAuth 1.0a authorization flow unless a token is available.

        ``accessToken()`` is consulted first; when it yields a token nothing
        else happens. Otherwise a request token is obtained from
        ``self.token_url``, stored in ``self.session.request_token`` and the
        user is sent to ``self.auth_url``.

        Args:
            next: passed to ``self.__redirect_uri`` to build the callback URL.

        Returns:
            ``None`` when ``accessToken()`` returned a truthy token.

        Raises:
            The ``HTTP`` class taken from ``self.globals``, built with status
            302 and a ``Location`` pointing at the authorization URL.
        """
        if self.accessToken():
            return None

        # setup the client
        client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

        # Get a request token.
        # oauth_callback *is REQUIRED* for OAuth1.0a; putting it in the body
        # seems to work.
        callback_url = self.__redirect_uri(next)
        data = urlencode(dict(oauth_callback=callback_url))
        resp, content = client.request(self.token_url, "POST", body=data)

        # Normalise the status to text before comparing, as accessToken()
        # does, so an integer 200 is not mistaken for a failure.
        if str(resp['status']) != '200':
            self.session.request_token = None
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            self.globals['redirect'](self.globals[
                                     'URL'](f='user', args='logout'))

        # Store the request token in session.
        request_token = self.session.request_token = oauth.Token.from_string(content)

        # Redirect the user to the authentication URL and pass the callback url.
        data = urlencode(dict(oauth_token=request_token.key,
                              oauth_callback=callback_url))
        auth_request_url = self.auth_url + '?' + data

        HTTP = self.globals['HTTP']

        raise HTTP(302,
                   "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                   Location=auth_request_url)
connect.py 文件源码 项目:social-vuln-scanner 作者: Betawolf 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self,
             consumer_key,
             consumer_secret,
             user_token,
             user_secret,
             logger):
    """Create the signed OAuth client used by this connection.

    Args:
        consumer_key: OAuth consumer (application) key.
        consumer_secret: OAuth consumer secret.
        user_token: key of the user's access token.
        user_secret: secret of the user's access token.
        logger: logger used here and handed to the parent initialiser.

    The client is stored in ``self.client`` and uses
    ``MediaConnection.timeout`` as its timeout.
    """
    # Credentials must not end up in log files: keep the message layout but
    # replace both secrets with a placeholder.
    logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(consumer_key, "<redacted>", user_token, "<redacted>"))
    consumer = oauth.Consumer(consumer_key, consumer_secret)
    access_token = oauth.Token(key=user_token,
                               secret=user_secret)
    # A single client, signed with both consumer and user token, is enough;
    # the token-less client built previously was discarded immediately.
    self.client = oauth.Client(consumer, access_token, timeout=MediaConnection.timeout)
    super().__init__(logger)
oauth10a_account.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def accessToken(self):
        """Return the OAuth access token, exchanging a request token if needed.

        Lookup order:

        1. A truthy ``self.session.access_token`` is returned as is.
        2. Otherwise, a truthy ``self.session.request_token`` is removed from
           the session, given the ``oauth_verifier`` request variable and
           used to sign a POST to ``self.access_token_url``. The reply body
           is parsed with ``oauth.Token.from_string``, stored in the session
           and returned.
        3. With neither token available, ``self.session.access_token`` is set
           to ``None`` and ``None`` is returned.

        When the exchange reply status is not ``'200'`` the ``redirect``
        callable from ``self.globals`` is invoked with the URL built from
        ``f='user', args='logout'``.
        """
        if self.session.access_token:
            # Already authenticated (token expiry is not checked here).
            return self.session.access_token

        if not self.session.request_token:
            # Nothing cached and nothing to exchange.
            self.session.access_token = None
            return None

        # Take the request token out of the session before exchanging it.
        request_token = self.session.request_token
        self.session.request_token = None

        # OAuth 1.0a: the verifier must accompany the request token.
        request_token.set_verifier(self.request.vars.oauth_verifier)
        signed_client = oauth.Client(self.consumer, request_token)

        response, body = signed_client.request(self.access_token_url, "POST")
        if str(response['status']) != '200':
            self.session.request_token = None
            redirect = self.globals['redirect']
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            redirect(self.globals['URL'](f='user', args='logout'))

        self.session.access_token = oauth.Token.from_string(body)
        return self.session.access_token
oauth10a_account.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __oauth_login(self, next):
        """Start the OAuth 1.0a authorization flow unless a token is available.

        ``accessToken()`` is consulted first; when it yields a token nothing
        else happens. Otherwise a request token is obtained from
        ``self.token_url``, stored in ``self.session.request_token`` and the
        user is sent to ``self.auth_url``.

        Args:
            next: passed to ``self.__redirect_uri`` to build the callback URL.

        Returns:
            ``None`` when ``accessToken()`` returned a truthy token.

        Raises:
            The ``HTTP`` class taken from ``self.globals``, built with status
            302 and a ``Location`` pointing at the authorization URL.
        """
        if self.accessToken():
            return None

        # setup the client
        client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

        # Get a request token.
        # oauth_callback *is REQUIRED* for OAuth1.0a; putting it in the body
        # seems to work.
        callback_url = self.__redirect_uri(next)
        data = urlencode(dict(oauth_callback=callback_url))
        resp, content = client.request(self.token_url, "POST", body=data)

        # Normalise the status to text before comparing, as accessToken()
        # does, so an integer 200 is not mistaken for a failure.
        if str(resp['status']) != '200':
            self.session.request_token = None
            # NOTE(review): presumably redirect() raises and never returns - confirm.
            self.globals['redirect'](self.globals[
                                     'URL'](f='user', args='logout'))

        # Store the request token in session.
        request_token = self.session.request_token = oauth.Token.from_string(content)

        # Redirect the user to the authentication URL and pass the callback url.
        data = urlencode(dict(oauth_token=request_token.key,
                              oauth_callback=callback_url))
        auth_request_url = self.auth_url + '?' + data

        HTTP = self.globals['HTTP']

        raise HTTP(302,
                   "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
                   Location=auth_request_url)
auth_setup.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def twitter_oauth1(user_id, token_nickname, country_filter):
    """Begin the Twitter OAuth 1 flow and return the authorization URL.

    A request token is fetched from ``request_token_url`` and saved on the
    ``db.OAuthToken`` record matching ``user_id`` / ``token_nickname`` for the
    Twitter application; a new record is created when none is found.

    Args:
        user_id: stored on the token record and used to look it up.
        token_nickname: stored on the token record and used to look it up.
        country_filter: when truthy, its upper-cased form is added to the
            record's ``country_filters``.

    Returns:
        ``authorize_url`` with the temporary ``oauth_token`` as query string.

    Raises:
        Exception: when the request-token call does not answer ``'200'``.
    """
    client = oauth.Client(oauth.Consumer(auth.consumer_key, auth.consumer_secret))

    # Step 1: obtain the temporary request token. It lets the user authorize
    # an access token and signs the later request that fetches that token.
    resp, content = client.request(request_token_url, "GET")
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)

    request_token = dict(urlparse.parse_qsl(content))

    # Reuse the stored record for this user/nickname when there is one.
    matches = db.OAuthToken.query(
        db.OAuthToken.user_id == user_id,
        db.OAuthToken.token_nickname == token_nickname,
        db.OAuthToken.application == db.APP_TWITTER,
    ).fetch(1)
    auth_token = matches[0] if matches else db.OAuthToken()

    auth_token.user_id = user_id
    auth_token.token_nickname = token_nickname
    auth_token.application = db.APP_TWITTER
    auth_token.temp_oauth_token = request_token['oauth_token']
    auth_token.temp_oauth_token_secret = request_token['oauth_token_secret']
    if country_filter:
        # NOTE(review): if country_filters is a list, += with a str appends
        # one entry per character - confirm the property type.
        auth_token.country_filters += country_filter.upper()
    auth_token.put()

    # Step 2: the caller is expected to send the user to this URL; no
    # redirect is performed here.
    return "%s?oauth_token=%s" % (authorize_url, request_token['oauth_token'])


# user comes to:
# /sign-in-with-twitter/?
#        oauth_token=NPcudxy0yU5T3tBzho7iCotZ3cnetKwcTIRlX0iwRl0&
#        oauth_verifier=uw7NjWHT6OJ1MpJOXsHfNxoAhPKpgI8BlYDhxEjIBY
oauth7digital.py 文件源码 项目:MusicGenreClassification 作者: mlachmish 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def request_2legged(url, http_method="GET"):
    """Send a consumer-signed request (no user token) to ``url``.

    The request is form-encoded and always carries a ``country`` field taken
    from ``api_settings.country``.

    Returns:
        The ``(response, content)`` pair produced by the client.
    """
    signed_client = oauth.Client(_consumer())
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    country_field = "country=%s" % api_settings.country

    response, content = signed_client.request(
        url,
        method=http_method,
        headers=form_headers,
        body=country_field,
    )
    return response, content
oauth7digital.py 文件源码 项目:MusicGenreClassification 作者: mlachmish 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def request_access_token(token):
    """Exchange ``token`` for an access token at ``ACCESS_TOKEN_URL``.

    The reply body is handed to ``_token_from_response_content`` and its
    result is returned. The response object itself is not inspected here.
    """
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    signed_client = oauth.Client(_consumer(), token=token)

    _response, content = signed_client.request(ACCESS_TOKEN_URL, headers=form_headers)
    return _token_from_response_content(content)
oauth7digital.py 文件源码 项目:MusicGenreClassification 作者: mlachmish 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def request_3legged(url, access_token, http_method="GET", body=''):
    """Perform a request on a customer's behalf with their access token.

    Args:
        url: address to call.
        access_token: token previously authorized by the customer.
        http_method: HTTP verb to use.
        body: form-encoded request body.

    Returns:
        Whatever ``client.request`` returns, passed through untouched.
    """
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    signed_client = oauth.Client(_consumer(), token=access_token)

    return signed_client.request(
        url,
        headers=form_headers,
        method=http_method,
        body=body,
    )
oauth7digital.py 文件源码 项目:MusicGenreClassification 作者: mlachmish 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def request_2legged(url, http_method="GET"):
    """Send a consumer-signed request (no user token) to ``url``.

    The request is form-encoded and always carries a ``country`` field taken
    from ``api_settings.country``.

    Returns:
        The ``(response, content)`` pair produced by the client.
    """
    signed_client = oauth.Client(_consumer())
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    country_field = "country=%s" % api_settings.country

    response, content = signed_client.request(
        url,
        method=http_method,
        headers=form_headers,
        body=country_field,
    )
    return response, content
oauth7digital.py 文件源码 项目:MusicGenreClassification 作者: mlachmish 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def request_access_token(token):
    """Exchange ``token`` for an access token at ``ACCESS_TOKEN_URL``.

    The reply body is handed to ``_token_from_response_content`` and its
    result is returned. The response object itself is not inspected here.
    """
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    signed_client = oauth.Client(_consumer(), token=token)

    _response, content = signed_client.request(ACCESS_TOKEN_URL, headers=form_headers)
    return _token_from_response_content(content)
Naive_Bayes.py 文件源码 项目:Twitter-Sentiment-Analyzer 作者: ishpreet-singh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def oauthReq(self, url, http_method="GET", post_body=None, http_headers=None):
        """Send an OAuth-signed request to ``url`` and return the reply body.

        Credentials are read from ``self.parseConfig()`` using the keys
        ``consumer_key``, ``consumer_secret``, ``access_token`` and
        ``access_token_secret``.

        Args:
            url: address to call.
            http_method: HTTP verb to use.
            post_body: request body; any falsy value is sent as ``''``.
            http_headers: headers passed through to the client.

        Returns:
            The content part of the client's reply; the response part is
            discarded.
        """
        settings = self.parseConfig()
        consumer = oauth.Consumer(
            key=settings.get('consumer_key'),
            secret=settings.get('consumer_secret'))
        token = oauth.Token(
            key=settings.get('access_token'),
            secret=settings.get('access_token_secret'))
        signed_client = oauth.Client(consumer, token)

        payload = post_body or ''
        _response, content = signed_client.request(
            url,
            method=http_method,
            body=payload,
            headers=http_headers,
        )
        return content


问题


面经


文章

微信
公众号

扫码关注公众号