python类OAuth2Credentials()的实例源码

test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_unicode_header_checks(self):
        """Check header clean-up and the rejection of non-ASCII headers.

        Issues a request through an authorized ``HttpMock`` using header
        keys and values of mixed types (text, bytes, int, bool) and checks
        that the headers recorded on the mock are the expected bytes
        objects. Then checks that a header value containing a non-ASCII
        character raises ``client.NonAsciiHeaderError``.
        """
        access_token = u'foo'
        client_id = u'some_client_id'
        client_secret = u'cOuDdkfjxxnv+'
        refresh_token = u'1/0/a.df219fjls0'
        token_expiry = str(datetime.datetime.utcnow())
        token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
        revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
        user_agent = u'refresh_checker/1.0'
        credentials = client.OAuth2Credentials(
            access_token, client_id, client_secret, refresh_token,
            token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)

        # First, test that we correctly encode basic objects, making sure
        # to include a bytes object. Note that oauth2client will normalize
        # everything to bytes, no matter what python version we're in.
        http = credentials.authorize(http_mock.HttpMock())
        headers = {u'foo': 3, b'bar': True, 'baz': b'abc'}
        cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'}
        transport.request(
            http, u'http://example.com', method=u'GET', headers=headers)
        for k, v in cleaned_headers.items():
            # assertIn reports the missing key and the container on failure,
            # unlike assertTrue(k in ...), which only says "False is not true".
            self.assertIn(k, http.headers)
            self.assertEqual(v, http.headers[k])

        # Next, test that we do fail on unicode.
        unicode_str = six.unichr(40960) + 'abcd'
        with self.assertRaises(client.NonAsciiHeaderError):
            transport.request(
                http, u'http://example.com', method=u'GET',
                headers={u'foo': unicode_str})
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_no_unicode_in_request_params(self):
        """Check that request headers become bytes and non-ASCII ones fail.

        A request with text headers is sent through an authorized
        ``HttpMock``; every header key and value recorded on the mock must be
        a ``six.binary_type``. A header value with no ASCII form must raise
        ``client.NonAsciiHeaderError``. Finally ``self.credentials`` is
        round-tripped through JSON and ``token_response`` must survive.
        """
        # Positional constructor arguments, in the order they are passed.
        credential_args = (
            u'foo',                                  # access token
            u'some_client_id',                       # client id
            u'cOuDdkfjxxnv+',                        # client secret
            u'1/0/a.df219fjls0',                     # refresh token
            str(datetime.datetime.utcnow()),         # token expiry
            str(oauth2client.GOOGLE_TOKEN_URI),      # token URI
            u'refresh_checker/1.0',                  # user agent
        )
        credentials = client.OAuth2Credentials(
            *credential_args,
            revoke_uri=str(oauth2client.GOOGLE_REVOKE_URI))

        authorized_http = credentials.authorize(http_mock.HttpMock())
        transport.request(
            authorized_http, u'http://example.com', method=u'GET',
            headers={u'foo': u'bar'})
        for name, value in six.iteritems(authorized_http.headers):
            self.assertIsInstance(name, six.binary_type)
            self.assertIsInstance(value, six.binary_type)

        # Test again with unicode strings that can't simply be converted
        # to ASCII.
        non_ascii_headers = {u'foo': u'\N{COMET}'}
        with self.assertRaises(client.NonAsciiHeaderError):
            transport.request(
                authorized_http, u'http://example.com', method=u'GET',
                headers=non_ascii_headers)

        # token_response must be preserved by a to_json/from_json round trip.
        self.credentials.token_response = 'foobar'
        serialized = self.credentials.to_json()
        restored = client.OAuth2Credentials.from_json(serialized)
        self.assertEqual('foobar', restored.token_response)
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test__expires_in_no_expiry(self):
        """``_expires_in()`` returns ``None`` when no expiry is set."""
        # All seven positional constructor arguments are None.
        all_none = (None,) * 7
        credentials = client.OAuth2Credentials(*all_none)
        self.assertIsNone(credentials.token_expiry)
        self.assertIsNone(credentials._expires_in())
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test__expires_in_expired(self, utcnow):
        """``_expires_in()`` returns 0 when "now" is past the expiry.

        Args:
            utcnow: Mock standing in for the clock; its ``return_value`` is
                set to one second after the token expiry. Presumably injected
                by a patch decorator that is not visible in this block.
        """
        credentials = client.OAuth2Credentials(*((None,) * 7))
        credentials.token_expiry = datetime.datetime.utcnow()
        one_second_later = (
            credentials.token_expiry + datetime.timedelta(seconds=1))
        self.assertLess(credentials.token_expiry, one_second_later)

        utcnow.return_value = one_second_later
        self.assertEqual(credentials._expires_in(), 0)
        utcnow.assert_called_once_with()
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test__expires_in_not_expired(self, utcnow):
        """``_expires_in()`` returns the seconds left before the expiry.

        Args:
            utcnow: Mock standing in for the clock; its ``return_value`` is
                set to 1234 seconds before the token expiry. Presumably
                injected by a patch decorator not visible in this block.
        """
        remaining_seconds = 1234
        credentials = client.OAuth2Credentials(*((None,) * 7))
        credentials.token_expiry = datetime.datetime.utcnow()
        fake_now = credentials.token_expiry - datetime.timedelta(
            seconds=remaining_seconds)
        self.assertLess(fake_now, credentials.token_expiry)

        utcnow.return_value = fake_now
        self.assertEqual(credentials._expires_in(), remaining_seconds)
        utcnow.assert_called_once_with()
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _do_refresh_request_test_helper(self, response, content,
                                    error_msg, logger, gen_body,
                                    gen_headers, store=None):
        """Run ``_do_refresh_request`` and check that it fails as expected.

        Args:
            response: Passed to ``HttpMock`` as ``headers``; its ``status``
                must match the status on the raised exception.
            content: Passed to ``HttpMock`` as ``data``; also expected as the
                argument of the failure log call.
            error_msg: Expected sole element of the exception's ``args``.
            logger: Mock whose ``info`` calls are checked.
            gen_body: Mock whose ``return_value`` must equal the request body.
            gen_headers: Mock whose ``return_value`` must equal the request
                headers.
            store: Optional mock store; when given, ``locked_put`` must have
                been called once with the credentials.
        """
        refresh_uri = 'http://token_uri'
        credentials = client.OAuth2Credentials(
            None, None, None, None, None, refresh_uri, None)
        credentials.store = store
        http = http_mock.HttpMock(headers=response, data=content)

        with self.assertRaises(client.HttpAccessTokenRefreshError) as caught:
            credentials._do_refresh_request(http)

        raised = caught.exception
        self.assertEqual(raised.args, (error_msg,))
        self.assertEqual(raised.status, response.status)

        # Verify mocks.
        self.assertEqual(http.requests, 1)
        self.assertEqual(http.uri, refresh_uri)
        self.assertEqual(http.method, 'POST')
        self.assertEqual(http.body, gen_body.return_value)
        self.assertEqual(http.headers, gen_headers.return_value)

        # Exactly two info messages: the attempt, then the failure.
        expected_log_calls = [
            mock.call('Refreshing access_token'),
            mock.call('Failed to retrieve access token: %s', content),
        ]
        self.assertEqual(logger.info.mock_calls, expected_log_calls)
        if store is not None:
            store.locked_put.assert_called_once_with(credentials)
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _do_revoke_test_helper(self, response, content,
                           error_msg, logger, store=None):
        """Run ``_do_revoke`` against a mocked success or failure response.

        Args:
            response: Passed to ``HttpMock`` as ``headers``; a ``status`` of
                ``http_client.OK`` selects the success checks, anything else
                the failure checks.
            content: Passed to ``HttpMock`` as ``data``.
            error_msg: Expected sole element of the ``TokenRevokeError``
                ``args`` on failure.
            logger: Mock expected to get one ``info('Revoking token')`` call.
            store: Optional mock store; ``delete`` must be called once on
                success and never on failure.
        """
        credentials = client.OAuth2Credentials(
            None, None, None, None, None, None, None,
            revoke_uri=oauth2client.GOOGLE_REVOKE_URI)
        credentials.store = store

        http = http_mock.HttpMock(headers=response, data=content)
        token = u's3kr3tz'

        revoke_should_succeed = response.status == http_client.OK
        # In either scenario the credentials start out valid.
        self.assertFalse(credentials.invalid)
        if revoke_should_succeed:
            self.assertIsNone(credentials._do_revoke(http, token))
            self.assertTrue(credentials.invalid)
            if store is not None:
                store.delete.assert_called_once_with()
        else:
            with self.assertRaises(client.TokenRevokeError) as caught:
                credentials._do_revoke(http, token)
            # Make sure invalid was not flipped on.
            self.assertFalse(credentials.invalid)
            self.assertEqual(caught.exception.args, (error_msg,))
            if store is not None:
                store.delete.assert_not_called()

        expected_uri = oauth2client.GOOGLE_REVOKE_URI + '?token=' + token

        # Verify mocks.
        self.assertEqual(http.requests, 1)
        self.assertEqual(http.uri, expected_uri)
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertIsNone(http.headers)

        logger.info.assert_called_once_with('Revoking token')
test_client.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _do_retrieve_scopes_test_helper(self, response, content,
                                    error_msg, logger, scopes=None):
        """Run ``_do_retrieve_scopes`` against a mocked response.

        Args:
            response: Passed to ``HttpMock`` as ``headers``; a ``status`` of
                ``http_client.OK`` selects the success checks, anything else
                the failure checks.
            content: Passed to ``HttpMock`` as ``data``.
            error_msg: Expected sole element of the ``client.Error`` ``args``
                on failure.
            logger: Mock expected to get one ``info('Refreshing scopes')``
                call.
            scopes: Value ``credentials.scopes`` must equal after a
                successful call.
        """
        credentials = client.OAuth2Credentials(
            None, None, None, None, None, None, None,
            token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
        http = http_mock.HttpMock(headers=response, data=content)
        token = u's3kr3tz'

        retrieval_should_succeed = response.status == http_client.OK
        # In either scenario the credentials start with no scopes.
        self.assertEqual(credentials.scopes, set())
        if retrieval_should_succeed:
            self.assertIsNone(
                credentials._do_retrieve_scopes(http, token))
            self.assertEqual(credentials.scopes, scopes)
        else:
            with self.assertRaises(client.Error) as caught:
                credentials._do_retrieve_scopes(http, token)
            # Make sure scopes were not changed.
            self.assertEqual(credentials.scopes, set())
            self.assertEqual(caught.exception.args, (error_msg,))

        query_params = {'fields': 'scope', 'access_token': token}
        expected_uri = _helpers.update_query_params(
            oauth2client.GOOGLE_TOKEN_INFO_URI, query_params)

        # Verify mocks.
        self.assertEqual(http.requests, 1)
        assertUrisEqual(self, expected_uri, http.uri)
        self.assertEqual(http.method, 'GET')
        self.assertIsNone(http.body)
        self.assertIsNone(http.headers)
        logger.info.assert_called_once_with('Refreshing scopes')
test_file.py 文件源码 项目:REMAP 作者: REMAPApp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _create_test_credentials(self, client_id='some_client_id',
                             expiration=None):
        """Build an ``OAuth2Credentials`` populated with fixed dummy values.

        Args:
            client_id: Client id to put on the credentials.
            expiration: Token expiry to use; when falsy, the current
                ``datetime.datetime.utcnow()`` value is used instead.

        Returns:
            The newly constructed ``client.OAuth2Credentials``.
        """
        if expiration:
            token_expiry = expiration
        else:
            token_expiry = datetime.datetime.utcnow()

        return client.OAuth2Credentials(
            'foo',                                               # access token
            client_id,
            'cOuDdkfjxxnv+',                                     # client secret
            '1/0/a.df219fjls0',                                  # refresh token
            token_expiry,
            'https://www.google.com/accounts/o8/oauth2/token',   # token URI
            'refresh_checker/1.0')                               # user agent
base_repository_test.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_test_credential(self):
        """Return an ``OAuth2Credentials`` filled with fixed dummy values.

        The token expiry is the current ``datetime.datetime.utcnow()`` value
        and the user agent is an empty string.
        """
        positional_args = (
            'foo',                           # access token
            'some_client_id',                # client id
            'cOuDdkfjxxnv+',                 # client secret
            '1/0/a.df219fjls0',              # refresh token
            datetime.datetime.utcnow(),      # token expiry
            oauth2client.GOOGLE_TOKEN_URI,   # token URI
            '',                              # user agent
        )
        keyword_args = {
            'revoke_uri': oauth2client.GOOGLE_REVOKE_URI,
            'scopes': 'foo',
            'token_info_uri': oauth2client.GOOGLE_TOKEN_INFO_URI,
        }
        return client.OAuth2Credentials(*positional_args, **keyword_args)
_base_repository.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create_service_api(credentials, service_name, version, developer_key=None,
                        cache_discovery=False):
    """Construct a cloud API service object via ``discovery.build``.

    Args:
        credentials (OAuth2Credentials): Credentials used to authenticate
            the API calls.
        service_name (str): Name of the API to build.
        version (str): Version of the API to build.
        developer_key (str): API key identifying the project associated
            with the calls; most API services do not need it.
        cache_discovery (bool): Whether the discovery doc should be cached.
            Only forwarded when ``SUPPORT_DISCOVERY_CACHE`` is truthy.

    Returns:
        object: A Resource object with methods for interacting with the
            service.
    """
    # The discovery module logs very noisily in recent versions, so cap
    # its logger at WARNING unless this module is running at debug level.
    debug_enabled = LOGGER.getEffectiveLevel() <= logging.DEBUG
    if not debug_enabled:
        discovery_logger = logging.getLogger(discovery.__name__)
        discovery_logger.setLevel(logging.WARNING)

    build_args = dict(
        serviceName=service_name,
        version=version,
        developerKey=developer_key,
        credentials=credentials)
    if SUPPORT_DISCOVERY_CACHE:
        build_args.update(cache_discovery=cache_discovery)
    return discovery.build(**build_args)
gsheets.py 文件源码 项目:Tobo-Cogs 作者: Tobotimus 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, credentials: client.OAuth2Credentials) -> None:
        """Store the credentials and create a new ``ClientSession``.

        Args:
            credentials: OAuth2 credentials kept on the instance as
                ``self.credentials``.
        """
        self.credentials = credentials
        # NOTE(review): nothing in this method closes the session;
        # presumably the owner closes it elsewhere -- confirm.
        self.session = ClientSession()
test_client.py 文件源码 项目:ddpclient 作者: insiight 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create ``self.client`` from fixed dummy OAuth2 credentials."""
        # Token expiry is a fixed date: 1 January 2038.
        token_expiry = datetime.datetime(2038, 1, 1)
        credentials = OAuth2Credentials(
            'token', 'clientid', 'secret', 'rtoken', token_expiry,
            'uri', 'ua')
        self.client = Client(credentials)
test_user_list_selector.py 文件源码 项目:ddpclient 作者: insiight 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create ``self.client`` and ``self.selector`` for each test."""
        # Token expiry is a fixed date: 1 January 2038.
        token_expiry = datetime.datetime(2038, 1, 1)
        credentials = OAuth2Credentials(
            'token', 'clientid', 'secret', 'rtoken', token_expiry,
            'uri', 'ua')
        self.client = Client(credentials)
        self.selector = UserListSelector()


问题


面经


文章

微信
公众号

扫码关注公众号