Python 类 Consumer() 的实例源码

test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_no_version(self):
        """Sign and verify a request that carries no ``oauth_version``.

        The consumer, token and request are kept on ``self``.  There is no
        explicit assertion: the test passes when ``verify_request`` returns
        without raising.
        """
        endpoint = "http://sp.example.com/"

        self.consumer = oauth.Consumer(
            key="consumer-key", secret="consumer-secret")
        self.token = oauth.Token(key="token-key", secret="token-secret")

        # Note the deliberate absence of an 'oauth_version' entry.
        request_params = {
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
        self.request = oauth.Request(
            method="GET", url=endpoint, parameters=request_params)

        signer = oauth.SignatureMethod_HMAC_SHA1()
        self.request.sign_request(signer, self.consumer, self.token)

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        # The returned parameters are not inspected by this test.
        verifier.verify_request(self.request, self.consumer, self.token)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_invalid_version(self):
        """Expect ``oauth.Error`` when the request claims version '222.9922'."""
        endpoint = "http://sp.example.com/"

        client_consumer = oauth.Consumer(
            key="consumer-key", secret="consumer-secret")
        access_token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': '222.9922',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['foo', 'bar'],
            'foo': 59,
            'oauth_token': access_token.key,
            'oauth_consumer_key': client_consumer.key,
        }
        signed_request = oauth.Request(
            method="GET", url=endpoint, parameters=request_params)
        signed_request.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), client_consumer, access_token)

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(
            oauth.Error,
            verifier.verify_request,
            signed_request, client_consumer, access_token)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_invalid_signature_method(self):
        """Expect ``oauth.Error`` for a request signed with ``SignatureMethod_Bad``.

        The server is only given HMAC-SHA1, so the method used for signing
        is not one it has registered.
        """
        endpoint = "http://sp.example.com/"

        client_consumer = oauth.Consumer(
            key="consumer-key", secret="consumer-secret")
        access_token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': access_token.key,
            'oauth_consumer_key': client_consumer.key,
        }
        signed_request = oauth.Request(
            method="GET", url=endpoint, parameters=request_params)

        # Sign with the method the server does not register below.
        signed_request.sign_request(
            SignatureMethod_Bad(), client_consumer, access_token)

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(
            oauth.Error,
            verifier.verify_request,
            signed_request, client_consumer, access_token)
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_missing_signature(self):
        """Expect ``oauth.MissingSignature`` once 'oauth_signature' is removed."""
        endpoint = "http://sp.example.com/"

        client_consumer = oauth.Consumer(
            key="consumer-key", secret="consumer-secret")
        access_token = oauth.Token(key="token-key", secret="token-secret")

        request_params = {
            'oauth_version': '1.0',
            'oauth_nonce': "4572616e48616d6d65724c61686176",
            'oauth_timestamp': int(time.time()),
            'bar': 'blerg',
            'multi': ['FOO', 'BAR'],
            'foo': 59,
            'oauth_token': access_token.key,
            'oauth_consumer_key': client_consumer.key,
        }
        stripped_request = oauth.Request(
            method="GET", url=endpoint, parameters=request_params)

        # Sign properly first, then strip the signature from the request.
        stripped_request.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), client_consumer, access_token)
        del stripped_request['oauth_signature']

        verifier = oauth.Server()
        verifier.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

        self.assertRaises(
            oauth.MissingSignature,
            verifier.verify_request,
            stripped_request, client_consumer, access_token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create the shared consumer and request-body fixtures.

        Reads ``self.consumer_key`` and ``self.consumer_secret``, which are
        defined outside this method.
        """
        self.consumer = oauth.Consumer(
            key=self.consumer_key,
            secret=self.consumer_secret,
        )

        # Mixed value types: plain strings, a multi-valued list and an int.
        self.body = dict(
            foo='bar',
            bar='foo',
            multi=['FOO', 'BAR'],
            blah=599999,
        )
test_oauth.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_passes_kwargs_to_httplib2(self):
        """Check that extra keyword arguments given to ``oauth.Client`` take effect.

        ``cache`` and ``timeout`` are asserted on the resulting client;
        ``disable_ssl_certificate_validation`` is passed but not asserted.
        """
        consumer = oauth.Consumer('token', 'secret')

        # httplib2 options
        client = oauth.Client(
            consumer,
            None,
            cache='.cache',
            timeout=3,
            disable_ssl_certificate_validation=True,
        )
        self.assertNotEqual(client.cache, None)
        self.assertEqual(client.timeout, 3)
oauth1_authenticator.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, consumer_key, consumer_secret, token, token_secret):
        """Wrap the raw key/secret pairs in oauth2 objects.

        ``self.consumer`` is built from the consumer pair and ``self.token``
        from the token pair.
        """
        consumer_credentials = (consumer_key, consumer_secret)
        token_credentials = (token, token_secret)

        self.consumer = oauth2.Consumer(*consumer_credentials)
        self.token = oauth2.Token(*token_credentials)
oauth10a_account.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
        """Store the OAuth endpoints and credentials and build the consumer.

        ``g`` is kept as ``self.globals``.  The request and session are taken
        from ``current`` (presumably web2py's thread-local object -- confirm
        against the module imports).
        """
        self.globals = g

        # Client credentials; no authorization code is known yet.
        self.client_id, self.client_secret = client_id, client_secret
        self.code = None

        # Per-request context.
        self.request, self.session = current.request, current.session

        # Provider endpoints and network settings.
        self.auth_url = auth_url
        self.token_url = token_url
        self.access_token_url = access_token_url
        self.socket_timeout = socket_timeout

        # consumer init
        self.consumer = oauth.Consumer(self.client_id, self.client_secret)
oauth10a_account.py 文件源码 项目:spc 作者: whbrewer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
        """Store the OAuth endpoints and credentials and build the consumer.

        ``g`` is kept as ``self.globals``.  The request and session are taken
        from ``current`` (presumably web2py's thread-local object -- confirm
        against the module imports).
        """
        self.globals = g

        # Client credentials; no authorization code is known yet.
        self.client_id, self.client_secret = client_id, client_secret
        self.code = None

        # Per-request context.
        self.request, self.session = current.request, current.session

        # Provider endpoints and network settings.
        self.auth_url = auth_url
        self.token_url = token_url
        self.access_token_url = access_token_url
        self.socket_timeout = socket_timeout

        # consumer init
        self.consumer = oauth.Consumer(self.client_id, self.client_secret)
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def post_replace_result(
        self,
        score: str,
        result_data: t.Optional[t.Mapping[str, str]] = None
    ) -> 'OutcomeResponse':
        '''
        POSTs the given score to the Tool Consumer with a replaceResult.

        The operation, score and result data are stored on the instance
        before validation, so they are set even when a ValueError is raised.

        :param str score: The score to send.
        :param result_data: Optional mapping with exactly one entry whose
            key is ``'text'``, ``'url'`` or ``'launchUrl'``. Only one of
            these may be present at a time, due to the Canvas specification.
        :returns: Whatever ``self.post_outcome_request()`` returns.
        :raises ValueError: If ``result_data`` has more than one entry, or
            contains none of the accepted keys (this includes an empty
            mapping).
        '''
        self.operation = REPLACE_REQUEST
        self.score = score
        self.result_data = result_data

        if result_data is not None:
            if len(result_data) > 1:
                raise ValueError(
                    'Dictionary result_data can only have one entry. '
                    '{0} entries were found.'.format(len(result_data))
                )
            if not any(
                key in result_data for key in ('url', 'text', 'launchUrl')
            ):
                # The message lists every key the check above accepts.
                raise ValueError(
                    'Dictionary result_data can only have the key '
                    '"text", the key "url" or the key "launchUrl".'
                )

        return self.post_outcome_request()
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def post_delete_result(self) -> 'OutcomeResponse':
        '''
        Mark the pending operation as a delete and POST it to the Tool
        Consumer.

        :returns: The result of ``self.post_outcome_request()``.
        '''
        self.operation = DELETE_REQUEST
        outcome_response = self.post_outcome_request()
        return outcome_response
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post_read_result(self) -> 'OutcomeResponse':
        '''
        Mark the pending operation as a read and POST it to the Tool
        Consumer.

        :returns: The result of ``self.post_outcome_request()``.
        '''
        self.operation = READ_REQUEST
        outcome_response = self.post_outcome_request()
        return outcome_response
auth.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, key: str, secret: str) -> None:
        """Store the consumer credentials and set up the oauth2 helpers.

        Creates an ``oauth2.Server`` with HMAC-SHA1 registered as a signature
        method, and an ``oauth2.Consumer`` for the given key and secret.

        :param key: The consumer key.
        :param secret: The consumer secret.
        """
        super(RequestValidatorMixin, self).__init__()

        self.consumer_key, self.consumer_secret = key, secret

        # Server side: HMAC-SHA1 is the only method registered here.
        self.oauth_server = oauth2.Server()
        self.oauth_server.add_signature_method(
            oauth2.SignatureMethod_HMAC_SHA1()
        )

        self.oauth_consumer = oauth2.Consumer(
            self.consumer_key, self.consumer_secret
        )
twitter.py 文件源码 项目:socialauth 作者: emilyhorsman 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, request_url, params, token_secret, token_cookie):
        """Initialise the flow state and immediately dispatch on ``params``.

        The consumer key and secret are read from the ``TWITTER_CONSUMER_KEY``
        and ``TWITTER_CONSUMER_SECRET`` environment variables; ``Error`` is
        raised when either is missing or empty.  Afterwards ``start()`` runs
        when ``params['login'] == 'start'``, ``finish()`` runs when both
        ``oauth_verifier`` and ``oauth_token`` are present, and any other
        request raises ``InvalidUsage``.
        """
        # Caller-supplied inputs.
        self.request_url = request_url
        self.params = params
        self.token_secret = token_secret
        self.token_cookie = token_cookie

        # Result fields, initialised to "nothing yet".
        self.status = False
        self.user_id = None
        self.user_name = None

        self.set_token_cookie = None
        self.oauth_verifier = params.get('oauth_verifier', None)
        self.oauth_token = params.get('oauth_token', None)
        self.oauth_token_secret = None

        key_from_env = os.environ.get('TWITTER_CONSUMER_KEY')
        self.consumer_key = key_from_env
        if not key_from_env:
            raise Error('No TWITTER_CONSUMER_KEY environment value')

        secret_from_env = os.environ.get('TWITTER_CONSUMER_SECRET')
        self.consumer_secret = secret_from_env
        if not secret_from_env:
            raise Error('No TWITTER_CONSUMER_SECRET environment value')

        self.consumer = oauth2.Consumer(
            key=self.consumer_key, secret=self.consumer_secret)

        if params.get('login') == 'start':
            self.start()
            return
        if self.oauth_verifier and self.oauth_token:
            self.finish()
            return
        raise InvalidUsage('Invalid request')
oauth10a_account.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60):
        """Store the OAuth endpoints and credentials and build the consumer.

        ``g`` is kept as ``self.globals``.  The request and session are taken
        from ``current`` (presumably web2py's thread-local object -- confirm
        against the module imports).
        """
        self.globals = g

        # Client credentials; no authorization code is known yet.
        self.client_id, self.client_secret = client_id, client_secret
        self.code = None

        # Per-request context.
        self.request, self.session = current.request, current.session

        # Provider endpoints and network settings.
        self.auth_url = auth_url
        self.token_url = token_url
        self.access_token_url = access_token_url
        self.socket_timeout = socket_timeout

        # consumer init
        self.consumer = oauth.Consumer(self.client_id, self.client_secret)
twitter.py 文件源码 项目:TwitterCompetitionBot 作者: BenAllenUK 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def SetCredentials(self,
                       consumer_key,
                       consumer_secret,
                       access_token_key=None,
                       access_token_secret=None):
        '''Record the OAuth credentials for this instance.

        All four values are stored unconditionally and ``_oauth_consumer``
        is reset to None.  Only when none of the four is None are the
        signature-method, token and consumer objects created.

        Args:
          consumer_key:
            The consumer_key of the twitter account.
          consumer_secret:
            The consumer_secret for the twitter account.
          access_token_key:
            The oAuth access token key value you retrieved
            from running get_access_token.py.
          access_token_secret:
            The oAuth access token's secret, also retrieved
            from the get_access_token.py run.
        '''
        self._consumer_key = consumer_key
        self._consumer_secret = consumer_secret
        self._access_token_key = access_token_key
        self._access_token_secret = access_token_secret
        self._oauth_consumer = None

        supplied = (consumer_key, consumer_secret,
                    access_token_key, access_token_secret)
        if any(value is None for value in supplied):
            # Incomplete credentials: no OAuth objects are built.
            return

        self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
        self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

        self._oauth_token = oauth.Token(
            key=access_token_key, secret=access_token_secret)
        self._oauth_consumer = oauth.Consumer(
            key=consumer_key, secret=consumer_secret)
twitter.py 文件源码 项目:gif-disco 作者: futurice 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def SetCredentials(self,
                     consumer_key,
                     consumer_secret,
                     access_token_key=None,
                     access_token_secret=None):
    '''Record the OAuth credentials for this instance.

    All four values are stored unconditionally and ``_oauth_consumer``
    is reset to None.  Only when none of the four is None are the
    signature-method, token and consumer objects created.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value you retrieved
        from running get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved
        from the get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    supplied = (consumer_key, consumer_secret,
                access_token_key, access_token_secret)
    if any(value is None for value in supplied):
      # Incomplete credentials: no OAuth objects are built.
      return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    self._oauth_token = oauth.Token(
        key=access_token_key, secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(
        key=consumer_key, secret=consumer_secret)
smtp.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        """Issue an ``AUTH XOAUTH`` command built from the given credentials.

        ``consumer`` and ``token`` may each be None; otherwise they must be
        instances of ``oauth2.Consumer`` and ``oauth2.Token`` respectively,
        or ValueError is raised.  The XOAUTH string is base64-encoded and
        sent through ``self.docmd``; nothing is returned.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        xoauth_string = oauth2.build_xoauth_string(url, consumer, token)
        encoded_credentials = base64.b64encode(xoauth_string)
        self.docmd('AUTH', 'XOAUTH %s' % encoded_credentials)


问题


面经


文章

微信
公众号

扫码关注公众号