python类Consumer()的实例源码

imapclient.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def oauth_login(self, url, oauth_token, oauth_token_secret,
                consumer_key='anonymous', consumer_secret='anonymous'):
        """Authenticate with the XOAUTH mechanism.

        Builds an OAuth token and consumer from the supplied credentials and
        passes a callable producing the XOAUTH string to
        ``self._command_and_check('authenticate', 'XOAUTH', ...)``, whose
        result is returned.

        This only works with IMAP servers that support OAUTH (e.g. Gmail).

        Raises ``self.Error`` when ``oauth_module`` is falsy.
        """
        if not oauth_module:
            raise self.Error(
                'The optional oauth2 package is needed for OAUTH authentication')

        access_token = oauth_module.Token(oauth_token, oauth_token_secret)
        oauth_consumer = oauth_module.Consumer(consumer_key, consumer_secret)

        def build_xoauth(_challenge):
            # The argument handed to the callback is ignored: the XOAUTH
            # string depends only on the URL and the credentials.
            return oauth_module.build_xoauth_string(url, oauth_consumer,
                                                    access_token)

        return self._command_and_check('authenticate', 'XOAUTH',
                                       build_xoauth, unpack=True)
__init__.py 文件源码 项目:yahoo-fantasy-football-metrics 作者: uberfastman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __two_legged_request(self, parameters=None, method=None):
        """Build and sign a request for two-legged authentication.

        ``parameters``, when given, are merged over the base parameters.
        ``method`` falls back to ``"GET"`` when falsy. The request targets
        ``self.endpoint`` and the signed ``oauth.Request`` is returned.
        """
        request_params = self.get_base_params()
        if parameters:
            request_params.update(parameters)
        endpoint_url = self.endpoint
        yql_logger.debug("params: %s", request_params)
        yql_logger.debug("endpoint_url: %s", endpoint_url)
        http_method = method or "GET"

        oauth_consumer = oauth.Consumer(self.api_key, self.secret)
        signed_request = oauth.Request(method=http_method, url=endpoint_url,
                                       parameters=request_params)
        signature = self.get_signature(endpoint_url)
        yql_logger.debug("signature: %s", signature)
        # No token is passed when signing, hence the trailing None.
        signed_request.sign_request(signature, oauth_consumer, None)
        return signed_request
__init__.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __two_legged_request(self, resource_url, parameters=None, method=None):
        """Build and sign a request to ``resource_url`` for two-legged auth.

        ``parameters``, when given, are merged over the base parameters.
        ``method`` falls back to ``"GET"`` when falsy. The request is signed
        with ``self.hmac_sha1_signature`` and returned.
        """
        request_params = self.get_base_params()
        if parameters:
            request_params.update(parameters)

        yql_logger.debug("params: %s", request_params)
        yql_logger.debug("resource_url: %s", resource_url)
        http_method = method or "GET"

        oauth_consumer = oauth.Consumer(self.api_key, self.secret)
        signed_request = oauth.Request(
            method=http_method, url=resource_url, parameters=request_params)
        # No token is passed when signing, hence the trailing None.
        signed_request.sign_request(
            self.hmac_sha1_signature, oauth_consumer, None)
        return signed_request
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_set_signature_method(self):
        """An invalid signature method raises ValueError; a valid one is stored.

        The stored method is read back from ``client.method``.
        """
        client = oauth.Client(oauth.Consumer('key', 'secret'))

        class NotASignatureMethod:
            pass

        try:
            client.set_signature_method(NotASignatureMethod())
        except ValueError:
            pass
        else:
            # Reached only when no exception was raised at all.
            self.fail("Client.set_signature_method() accepted invalid method.")

        hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(hmac_sha1)
        self.assertEqual(hmac_sha1, client.method)
trading.py 文件源码 项目:trump2cash 作者: maxbbraun 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def make_request(self, url, method="GET", body="", headers=None):
        """Send an OAuth-signed request to the TradeKing API.

        The request and the raw response are logged at debug level. Returns
        whatever ``loads`` produces from the response content, or ``None``
        (after logging an error) when ``loads`` raises ``ValueError``.
        """
        oauth_client = Client(
            Consumer(key=TRADEKING_CONSUMER_KEY,
                     secret=TRADEKING_CONSUMER_SECRET),
            Token(key=TRADEKING_ACCESS_TOKEN,
                  secret=TRADEKING_ACCESS_TOKEN_SECRET))

        request_summary = (url, method, body, headers)
        self.logs.debug("TradeKing request: %s %s %s %s" % request_summary)
        response, content = oauth_client.request(
            url, method=method, body=body, headers=headers)
        self.logs.debug("TradeKing response: %s %s" % (response, content))

        try:
            decoded = loads(content)
        except ValueError:
            self.logs.error("Failed to decode JSON response: %s" % content)
            return None
        return decoded
oauth_credentials.py 文件源码 项目:fluiddb 作者: fluidinfo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def verifySignature(self, secret):
        """See L{IOAuthCredentials#verifySignature}.

        Returns the result of checking C{self.signature} against the request
        rebuilt from this object's method, URL, headers and arguments.

        @raise NotImplementedError: if C{self.signatureMethod} is not
            C{'HMAC-SHA1'}.
        """
        oauthConsumer = Consumer(key=self.consumerKey, secret=secret)
        signedRequest = Request.from_request(
            self.method, self.url,
            headers=self.headers, query_string=self.arguments)

        # Only HMAC-SHA1 signatures are supported; any other method is
        # rejected outright.
        if self.signatureMethod == 'HMAC-SHA1':
            verifier = SignatureMethod_HMAC_SHA1()
            # No token is supplied to the check, hence the None.
            return verifier.check(signedRequest, oauthConsumer, None,
                                  self.signature)
        raise NotImplementedError(
            'Unknown signature method: %s' % self.signatureMethod)
views.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_request_token(self):
        """
        Fetch the initial request token for three-legged OAuth.

        POSTs the callback URL (the 'bot-authorize' route for this object) to
        self.REQUEST_TOKEN_URL and returns the parsed response as a dict of
        str keys and values. Per the original documentation the dict carries
        the keys oauth_token and oauth_token_secret.

        Raises Exception when the response status is not '200'.
        """
        callback_url = self.request.build_absolute_uri(
            reverse('bot-authorize', args=(self.object.pk, )))

        oauth_client = oauth.Client(oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET))
        request_body = urllib.parse.urlencode({'oauth_callback': callback_url})
        resp, content = oauth_client.request(self.REQUEST_TOKEN_URL, "POST",
                                             body=request_body)

        status = resp['status']
        if status != '200':
            raise Exception("Invalid response %s." % status)

        # The parsed pairs are bytes, so both halves of every pair are
        # decoded while the token dict is built.
        return {key.decode(): value.decode()
                for key, value in urllib.parse.parse_qsl(content)}
views.py 文件源码 项目:ewe_ebooks 作者: jaymcgrath 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
        """
        Final leg of three-legged OAuth.

        Builds a token from the given token/secret, attaches the verifier and
        POSTs to self.ACCESS_TOKEN_URL. Returns the parsed response as a dict
        of str keys and values; per the original documentation it carries the
        keys oauth_token and oauth_token_secret of the permanent access token.

        Raises Exception when the response status is not '200'.
        """
        request_token = oauth.Token(oauth_token, oauth_token_secret)
        request_token.set_verifier(oauth_verifier)
        oauth_client = oauth.Client(
            oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET), request_token)

        # This request goes to the access-token URL, not the request-token one.
        resp, content = oauth_client.request(self.ACCESS_TOKEN_URL, "POST")

        status = resp['status']
        if status != '200':
            raise Exception("Invalid response %s." % status)

        # The parsed pairs are bytes, so both halves of every pair are
        # decoded while the token dict is built.
        return {key.decode(): value.decode()
                for key, value in urllib.parse.parse_qsl(content)}
test_oauth.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_set_signature_method(self):
        """set_signature_method must reject junk and remember a real method.

        Passing an arbitrary object is expected to raise ValueError; passing
        an HMAC-SHA1 method must leave it available as ``client.method``.
        """
        consumer = oauth.Consumer('key', 'secret')
        client = oauth.Client(consumer)

        class Bogus:
            pass

        rejected = False
        try:
            client.set_signature_method(Bogus())
        except ValueError:
            rejected = True
        if not rejected:
            self.fail("Client.set_signature_method() accepted invalid method.")

        signature_method = oauth.SignatureMethod_HMAC_SHA1()
        client.set_signature_method(signature_method)
        self.assertEqual(signature_method, client.method)
twitter_helper.py 文件源码 项目:oasis 作者: mhfowler 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret):
        """Store the four credentials and build the OAuth and handler objects.

        Creates an OAuth token and consumer from the stored credentials, an
        HMAC-SHA1 signature method, and HTTP/HTTPS handlers with debug
        output switched off (debuglevel 0).
        """
        # Raw credentials.
        self.access_token_key = access_token_key
        self.access_token_secret = access_token_secret
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

        handler_debug_level = 0

        # OAuth objects derived from the credentials stored above.
        self.oauth_token = oauth.Token(
            key=self.access_token_key, secret=self.access_token_secret)
        self.oauth_consumer = oauth.Consumer(
            key=self.consumer_key, secret=self.consumer_secret)
        self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

        self.http_handler = urllib.HTTPHandler(debuglevel=handler_debug_level)
        self.https_handler = urllib.HTTPSHandler(debuglevel=handler_debug_level)
request.py 文件源码 项目:TumblrVideos 作者: moedje 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, consumer_key, consumer_secret="", oauth_token="", oauth_secret="",
             host="https://api.tumblr.com", proxy_url=None):
        """Set up the OAuth consumer/token and the proxy configuration.

        Proxy selection:
          * a truthy ``proxy_url`` is turned into proxy info via
            ``httplib2.proxy_info_from_url`` (with "https://" prepended) and
            ``proxy_rdns`` is enabled on it;
          * otherwise the ``HTTPS_PROXY`` environment variable is consulted
            and, when set, parsed into a ``ProxyInfo``;
          * otherwise ``self.proxy_info`` is ``None``.
        """
        self.host = host
        self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
        self.token = oauth.Token(key=oauth_token, secret=oauth_secret)
        self.proxy_url = proxy_url

        if not proxy_url:
            print("Generating proxy from ENV")
            env_proxy = os.environ.get('HTTPS_PROXY', None)
            if not env_proxy:
                self.proxy_info = None
            else:
                parsed = urlparse(env_proxy)
                self.proxy_info = ProxyInfo(
                    socks.PROXY_TYPE_HTTP, parsed.hostname, parsed.port,
                    proxy_rdns=True)
        else:
            print("Generating Proxy From proxy_url")
            self.proxy_info = httplib2.proxy_info_from_url(
                "https://" + proxy_url, 'http')
            self.proxy_info.proxy_rdns = True
auth_setup.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def twitter_oauth2(oauth_token, oauth_verifier):
    """Exchange an approved request token for an access token and store it.

    Looks up the stored Twitter OAuthToken whose temporary token equals
    ``oauth_token``. Returns ``None`` when there is no such record. Otherwise
    the record is updated with the access token and secret returned by the
    provider, marked valid, given a five-minute posting interval, saved and
    returned.

    NOTE(review): the HTTP response status is not inspected; a failed
    exchange presumably surfaces as a KeyError on the missing token keys.
    """
    token_query = db.OAuthToken.query(
        db.OAuthToken.temp_oauth_token == oauth_token,
        db.OAuthToken.application == db.APP_TWITTER)
    matches = token_query.fetch(1)
    if not matches:
        return None
    auth_token = matches[0]

    # Step 3 of the flow: the user has been redirected back to the callback
    # URL, so the approved request token (plus verifier) signs the request
    # for the access token. The request token is discarded afterwards and the
    # access token is kept on the stored record for future use.
    request_token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    client = oauth.Client(
        oauth.Consumer(auth.consumer_key, auth.consumer_secret), request_token)

    _resp, content = client.request(access_token_url, "POST")
    granted = dict(urlparse.parse_qsl(content))

    auth_token.oauth_token = granted['oauth_token']
    auth_token.oauth_token_secret = granted['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # seconds, i.e. every 5 minutes
    auth_token.put()
    return auth_token
__init__.py 文件源码 项目:splitwise 作者: namaggarwal 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, consumer_key, consumer_secret, access_token=None):
        """Create the OAuth consumer and optionally install an access token.

        Args:
            consumer_key (str): Consumer Key provided by Splitwise
            consumer_secret (str): Consumer Secret provided by Splitwise
            access_token (:obj:`dict`, optional): combination of oauth_token
                and oauth_token_secret; handed to ``setAccessToken`` when
                truthy.
        """
        self.consumer = oauth.Consumer(consumer_key, consumer_secret)

        # The access token is optional at construction time.
        if not access_token:
            return
        self.setAccessToken(access_token)
twitter_api.py 文件源码 项目:Build-a-Bot 作者: ShanTulshi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_tweets(twitter_handle):
    """Return the text of recent tweets posted by ``twitter_handle``.

    Queries the Twitter 1.1 search endpoint with ``from:<handle>`` (up to 100
    results) and returns a list of strings: each tweet's text followed by
    ``' \\n'``, with ``https://t.co/`` links (10-character suffix) stripped.

    The handle is percent-encoded before being placed in the query string so
    that characters such as ``&``, ``#`` or spaces cannot alter the request.
    """
    # Function-scope import so the block stays self-contained; falls back to
    # the Python 2 location of ``quote``.
    try:
        from urllib.parse import quote
    except ImportError:  # Python 2
        from urllib import quote

    # NOTE(security): these API credentials are hard-coded in source. They
    # should be revoked and loaded from configuration or the environment.
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    endpoint = ("https://api.twitter.com/1.1/search/tweets.json?q=from%3A"
                + quote(twitter_handle, safe='')
                + "&result_type=all&count=100")
    response, data = client.request(endpoint)

    tweet_data = json.loads(data.decode('utf-8'))
    return [
        re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n')
        for tweet in tweet_data['statuses']
    ]

#example
# print(get_tweets("Cmdr_Hadfield"))
__init__.py 文件源码 项目:yahoo-fantasy-football-metrics 作者: uberfastman 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, api_key, shared_secret, httplib2_inst=None):
        """Run the parent initialiser, then attach an OAuth consumer.

        The consumer is built from ``self.api_key`` and ``self.secret``,
        presumably set by the parent initialiser (not visible here).
        """
        parent = super(ThreeLegged, self)
        parent.__init__(api_key, shared_secret, httplib2_inst)
        self.consumer = oauth.Consumer(self.api_key, self.secret)
oauth10a_account.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
        """Record the OAuth 1.0a client configuration and build the consumer.

        Stores the globals object, the client credentials, the three
        endpoint URLs and the socket timeout. Takes ``request`` and
        ``session`` from ``current``. ``self.code`` starts as ``None``.
        """
        self.globals = g
        self.client_id, self.client_secret = client_id, client_secret
        self.code = None

        # Request/session objects taken from ``current``.
        self.request, self.session = current.request, current.session

        # Endpoint URLs and network settings.
        self.auth_url = auth_url
        self.token_url = token_url
        self.access_token_url = access_token_url
        self.socket_timeout = socket_timeout

        # OAuth consumer built from the stored client credentials.
        self.consumer = oauth.Consumer(self.client_id, self.client_secret)
send_grades.py 文件源码 项目:sga-lti 作者: mitodl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _post_patched_request(lti_key, secret, body, url, method, content_type):  # pylint: disable=too-many-arguments
    """
    Send an OAuth-signed request with a capitalized ``Authorization`` header.

    The Authorization header needs to be capitalized for some LTI clients.
    While the request is in flight, ``httplib2.Http._normalize_headers`` is
    temporarily replaced by a wrapper that renames the ``authorization`` key
    to ``Authorization``. The original function is always restored
    afterwards, even when the request raises.

    :param lti_key: OAuth consumer key
    :param secret: OAuth consumer secret
    :param body: body of the call (text; encoded as UTF-8 before sending)
    :param url: outcome url
    :param method: HTTP method passed to the client
    :param content_type: value of the Content-Type header
    :return: ``(response, content)`` as returned by the OAuth client
    """

    consumer = oauth2.Consumer(key=lti_key, secret=secret)
    client = oauth2.Client(consumer)

    import httplib2

    http = httplib2.Http
    # pylint: disable=protected-access
    original_normalize = http._normalize_headers

    def my_normalize(self, headers):
        """ This function patches Authorization header """
        ret = original_normalize(self, headers)
        if 'authorization' in ret:
            ret['Authorization'] = ret.pop('authorization')
        return ret

    http._normalize_headers = my_normalize
    try:
        response, content = client.request(
            url,
            method,
            body=body.encode("utf8"),
            headers={'Content-Type': content_type})
    finally:
        # The patch is class-wide, so undo it regardless of the outcome to
        # avoid leaking it into unrelated httplib2 users.
        http._normalize_headers = original_normalize

    return response, content
smtp.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        """Send an ``AUTH XOAUTH`` command built from the given credentials.

        ``consumer`` and ``token`` may each be ``None``. Otherwise they must
        be ``oauth2.Consumer`` / ``oauth2.Token`` instances, or ``ValueError``
        is raised. The XOAUTH string is base64-encoded before it is sent.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        xoauth_string = oauth2.build_xoauth_string(url, consumer, token)
        self.docmd('AUTH', 'XOAUTH %s' % base64.b64encode(xoauth_string))
imap.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        """Authenticate over IMAP with the XOAUTH mechanism.

        ``consumer`` and ``token`` may each be ``None``. Otherwise they must
        be ``oauth2.Consumer`` / ``oauth2.Token`` instances, or ``ValueError``
        is raised. The XOAUTH string is produced by a callback handed to
        ``imaplib.IMAP4_SSL.authenticate``.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        def xoauth_response(_challenge):
            # The argument passed to the callback is not used.
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
__init__.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, api_key, shared_secret, httplib2_inst=None):
        """Run the parent initialiser, then set scheme, endpoint and consumer.

        The scheme and endpoint come from ``HTTP_SCHEME`` and
        ``PRIVATE_ENDPOINT``. The consumer is built from ``self.api_key`` and
        ``self.secret``, presumably set by the parent initialiser (not
        visible here).
        """
        parent = super(ThreeLegged, self)
        parent.__init__(api_key, shared_secret, httplib2_inst)

        self.scheme, self.endpoint = HTTP_SCHEME, PRIVATE_ENDPOINT
        self.consumer = oauth.Consumer(self.api_key, self.secret)
smtp.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        """Issue ``AUTH XOAUTH <payload>`` for the given URL and credentials.

        Raises ``ValueError`` when ``consumer`` is neither ``None`` nor an
        ``oauth2.Consumer``, or when ``token`` is neither ``None`` nor an
        ``oauth2.Token``. The payload is the base64-encoded XOAUTH string.
        """
        consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
        if not consumer_ok:
            raise ValueError("Invalid consumer.")

        token_ok = token is None or isinstance(token, oauth2.Token)
        if not token_ok:
            raise ValueError("Invalid token.")

        payload = base64.b64encode(
            oauth2.build_xoauth_string(url, consumer, token))
        self.docmd('AUTH', 'XOAUTH %s' % payload)
imap.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        """Run XOAUTH authentication through ``imaplib.IMAP4_SSL``.

        Raises ``ValueError`` when ``consumer`` is neither ``None`` nor an
        ``oauth2.Consumer``, or when ``token`` is neither ``None`` nor an
        ``oauth2.Token``.
        """
        consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
        if not consumer_ok:
            raise ValueError("Invalid consumer.")

        token_ok = token is None or isinstance(token, oauth2.Token)
        if not token_ok:
            raise ValueError("Invalid token.")

        def make_xoauth_string(_unused):
            # The callback's argument is ignored; only url/consumer/token matter.
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', make_xoauth_string)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create the Consumer fixture and keep the key/secret it was built from."""
        self.key, self.secret = 'my-key', 'my-secret'
        self.consumer = oauth.Consumer(key=self.key, secret=self.secret)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_basic(self):
        """Consumer must raise ValueError when the key, the secret or both are None."""
        invalid_credentials = ((None, None), ('asf', None), (None, 'dasf'))
        for key, secret in invalid_credentials:
            self.assertRaises(ValueError, oauth.Consumer, key, secret)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_no_url_set(self):
        """Signing a Request that was created without a URL raises ValueError."""
        consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
        token = oauth.Token('my_key', 'my_secret')
        urlless_request = oauth.Request()
        signer = oauth.SignatureMethod_HMAC_SHA1()
        with self.assertRaises(ValueError):
            urlless_request.sign_request(signer, consumer, token)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_signature_base_unicode_nonascii(self):
        """Sign a text URL whose query contains the non-ASCII ``_U2766`` value.

        Checks the normalized URL (no port, no query string) and the exact
        HMAC-SHA1 signature.
        """
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        address_url = u('http://api.simplegeo.com:80/1.0/places/address.json'
                        '?q=monkeys&category=animal'
                        '&address=41+Decatur+St,+San+Francisc') + _U2766 + u(',+CA')
        request = oauth.Request("GET", address_url)

        expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
        self.assertReallyEqual(request.normalized_url, expected_url)

        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='
        self.assertReallyEqual(request['oauth_signature'], expected_signature)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_signature_base_string_bytes_nonascii_nonutf8(self):
        """Sign a bytes URL whose query contains the ``_B2766`` byte sequence.

        Checks the normalized URL (no port, no query string) and the exact
        HMAC-SHA1 signature.
        """
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        address_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                       b'?q=monkeys&category=animal'
                       b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA'
        request = oauth.Request("GET", address_url)

        expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
        self.assertReallyEqual(request.normalized_url, expected_url)

        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        # XXX marker carried over from the original assertion; the reason
        # for it is not stated in the source.
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='
        self.assertReallyEqual(request['oauth_signature'], expected_signature)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
        """Sign a bytes URL whose query carries a percent-encoded ``%E2%9D%A6``.

        Checks the normalized URL (no port, no query string) and the exact
        HMAC-SHA1 signature.
        """
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        address_url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                       b'?q=monkeys&category=animal'
                       b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
        request = oauth.Request("GET", address_url)

        expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
        self.assertReallyEqual(request.normalized_url, expected_url)

        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='
        self.assertReallyEqual(request['oauth_signature'], expected_signature)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_signature_base_unicode_nonascii_nonutf8_url_encoded(self):
        """Sign a text URL whose query carries a percent-encoded ``%E2%9D%A6``.

        Checks the normalized URL (no port, no query string) and the exact
        HMAC-SHA1 signature.
        """
        consumer = oauth.Consumer('consumer_token', 'consumer_secret')

        address_url = u('http://api.simplegeo.com:80/1.0/places/address.json'
                        '?q=monkeys&category=animal'
                        '&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')
        request = oauth.Request("GET", address_url)

        expected_url = u('http://api.simplegeo.com/1.0/places/address.json')
        self.assertReallyEqual(request.normalized_url, expected_url)

        request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
        expected_signature = b'WhufgeZKyYpKsI70GZaiDaYwl6g='
        self.assertReallyEqual(request['oauth_signature'], expected_signature)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_from_consumer_and_token(self):
        """Request.from_consumer_and_token copies token, consumer key and verifier.

        The resulting request must expose the token key as ``oauth_token``,
        the consumer key as ``oauth_consumer_key`` and the token's verifier
        as ``oauth_verifier``.
        """
        target_url = "http://sp.example.com/"

        token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
        token.set_verifier('this_is_a_test_verifier')
        consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")
        request = oauth.Request.from_consumer_and_token(
            consumer, token=token, http_method="GET", http_url=target_url)

        self.assertEqual(request['oauth_token'], token.key)
        self.assertEqual(request['oauth_consumer_key'], consumer.key)
        self.assertEqual(token.verifier, request['oauth_verifier'])


问题


面经


文章

微信
公众号

扫码关注公众号