def test_unicode_header_checks(self):
access_token = u'foo'
client_id = u'some_client_id'
client_secret = u'cOuDdkfjxxnv+'
refresh_token = u'1/0/a.df219fjls0'
token_expiry = str(datetime.datetime.utcnow())
token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
user_agent = u'refresh_checker/1.0'
credentials = client.OAuth2Credentials(
access_token, client_id, client_secret, refresh_token,
token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)
# First, test that we correctly encode basic objects, making sure
# to include a bytes object. Note that oauth2client will normalize
# everything to bytes, no matter what python version we're in.
http = credentials.authorize(http_mock.HttpMock())
headers = {u'foo': 3, b'bar': True, 'baz': b'abc'}
cleaned_headers = {b'foo': b'3', b'bar': b'True', b'baz': b'abc'}
transport.request(
http, u'http://example.com', method=u'GET', headers=headers)
for k, v in cleaned_headers.items():
self.assertTrue(k in http.headers)
self.assertEqual(v, http.headers[k])
# Next, test that we do fail on unicode.
unicode_str = six.unichr(40960) + 'abcd'
with self.assertRaises(client.NonAsciiHeaderError):
transport.request(
http, u'http://example.com', method=u'GET',
headers={u'foo': unicode_str})
python类OAuth2Credentials()的实例源码
def test_no_unicode_in_request_params(self):
access_token = u'foo'
client_id = u'some_client_id'
client_secret = u'cOuDdkfjxxnv+'
refresh_token = u'1/0/a.df219fjls0'
token_expiry = str(datetime.datetime.utcnow())
token_uri = str(oauth2client.GOOGLE_TOKEN_URI)
revoke_uri = str(oauth2client.GOOGLE_REVOKE_URI)
user_agent = u'refresh_checker/1.0'
credentials = client.OAuth2Credentials(
access_token, client_id, client_secret, refresh_token,
token_expiry, token_uri, user_agent, revoke_uri=revoke_uri)
http = http_mock.HttpMock()
http = credentials.authorize(http)
transport.request(
http, u'http://example.com', method=u'GET',
headers={u'foo': u'bar'})
for k, v in six.iteritems(http.headers):
self.assertIsInstance(k, six.binary_type)
self.assertIsInstance(v, six.binary_type)
# Test again with unicode strings that can't simply be converted
# to ASCII.
with self.assertRaises(client.NonAsciiHeaderError):
transport.request(
http, u'http://example.com', method=u'GET',
headers={u'foo': u'\N{COMET}'})
self.credentials.token_response = 'foobar'
instance = client.OAuth2Credentials.from_json(
self.credentials.to_json())
self.assertEqual('foobar', instance.token_response)
def test__expires_in_no_expiry(self):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
self.assertIsNone(credentials.token_expiry)
self.assertIsNone(credentials._expires_in())
def test__expires_in_expired(self, utcnow):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
credentials.token_expiry = datetime.datetime.utcnow()
now = credentials.token_expiry + datetime.timedelta(seconds=1)
self.assertLess(credentials.token_expiry, now)
utcnow.return_value = now
self.assertEqual(credentials._expires_in(), 0)
utcnow.assert_called_once_with()
def test__expires_in_not_expired(self, utcnow):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
credentials.token_expiry = datetime.datetime.utcnow()
seconds = 1234
now = credentials.token_expiry - datetime.timedelta(seconds=seconds)
self.assertLess(now, credentials.token_expiry)
utcnow.return_value = now
self.assertEqual(credentials._expires_in(), seconds)
utcnow.assert_called_once_with()
def _do_refresh_request_test_helper(self, response, content,
error_msg, logger, gen_body,
gen_headers, store=None):
token_uri = 'http://token_uri'
credentials = client.OAuth2Credentials(None, None, None, None,
None, token_uri, None)
credentials.store = store
http = http_mock.HttpMock(headers=response, data=content)
with self.assertRaises(
client.HttpAccessTokenRefreshError) as exc_manager:
credentials._do_refresh_request(http)
self.assertEqual(exc_manager.exception.args, (error_msg,))
self.assertEqual(exc_manager.exception.status, response.status)
# Verify mocks.
self.assertEqual(http.requests, 1)
self.assertEqual(http.uri, token_uri)
self.assertEqual(http.method, 'POST')
self.assertEqual(http.body, gen_body.return_value)
self.assertEqual(http.headers, gen_headers.return_value)
call1 = mock.call('Refreshing access_token')
failure_template = 'Failed to retrieve access token: %s'
call2 = mock.call(failure_template, content)
self.assertEqual(logger.info.mock_calls, [call1, call2])
if store is not None:
store.locked_put.assert_called_once_with(credentials)
def _do_revoke_test_helper(self, response, content,
error_msg, logger, store=None):
credentials = client.OAuth2Credentials(
None, None, None, None, None, None, None,
revoke_uri=oauth2client.GOOGLE_REVOKE_URI)
credentials.store = store
http = http_mock.HttpMock(headers=response, data=content)
token = u's3kr3tz'
if response.status == http_client.OK:
self.assertFalse(credentials.invalid)
self.assertIsNone(credentials._do_revoke(http, token))
self.assertTrue(credentials.invalid)
if store is not None:
store.delete.assert_called_once_with()
else:
self.assertFalse(credentials.invalid)
with self.assertRaises(client.TokenRevokeError) as exc_manager:
credentials._do_revoke(http, token)
# Make sure invalid was not flipped on.
self.assertFalse(credentials.invalid)
self.assertEqual(exc_manager.exception.args, (error_msg,))
if store is not None:
store.delete.assert_not_called()
revoke_uri = oauth2client.GOOGLE_REVOKE_URI + '?token=' + token
# Verify mocks.
self.assertEqual(http.requests, 1)
self.assertEqual(http.uri, revoke_uri)
self.assertEqual(http.method, 'GET')
self.assertIsNone(http.body)
self.assertIsNone(http.headers)
logger.info.assert_called_once_with('Revoking token')
def _do_retrieve_scopes_test_helper(self, response, content,
error_msg, logger, scopes=None):
credentials = client.OAuth2Credentials(
None, None, None, None, None, None, None,
token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
http = http_mock.HttpMock(headers=response, data=content)
token = u's3kr3tz'
if response.status == http_client.OK:
self.assertEqual(credentials.scopes, set())
self.assertIsNone(
credentials._do_retrieve_scopes(http, token))
self.assertEqual(credentials.scopes, scopes)
else:
self.assertEqual(credentials.scopes, set())
with self.assertRaises(client.Error) as exc_manager:
credentials._do_retrieve_scopes(http, token)
# Make sure scopes were not changed.
self.assertEqual(credentials.scopes, set())
self.assertEqual(exc_manager.exception.args, (error_msg,))
token_uri = _helpers.update_query_params(
oauth2client.GOOGLE_TOKEN_INFO_URI,
{'fields': 'scope', 'access_token': token})
# Verify mocks.
self.assertEqual(http.requests, 1)
assertUrisEqual(self, token_uri, http.uri)
self.assertEqual(http.method, 'GET')
self.assertIsNone(http.body)
self.assertIsNone(http.headers)
logger.info.assert_called_once_with('Refreshing scopes')
def _create_test_credentials(self, client_id='some_client_id',
expiration=None):
access_token = 'foo'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = expiration or datetime.datetime.utcnow()
token_uri = 'https://www.google.com/accounts/o8/oauth2/token'
user_agent = 'refresh_checker/1.0'
credentials = client.OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, token_uri,
user_agent)
return credentials
base_repository_test.py 文件源码
项目:forseti-security
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def get_test_credential(self):
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
user_agent = ''
return client.OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, oauth2client.GOOGLE_TOKEN_URI,
user_agent, revoke_uri=oauth2client.GOOGLE_REVOKE_URI,
scopes='foo', token_info_uri=oauth2client.GOOGLE_TOKEN_INFO_URI)
def _create_service_api(credentials, service_name, version, developer_key=None,
cache_discovery=False):
"""Builds and returns a cloud API service object.
Args:
credentials (OAuth2Credentials): Credentials that will be used to
authenticate the API calls.
service_name (str): The name of the API.
version (str): The version of the API to use.
developer_key (str): The api key to use to determine the project
associated with the API call, most API services do not require
this to be set.
cache_discovery (bool): Whether or not to cache the discovery doc.
Returns:
object: A Resource object with methods for interacting with the service.
"""
# The default logging of the discovery obj is very noisy in recent versions.
# Lower the default logging level of just this module to WARNING unless
# debug is enabled.
if LOGGER.getEffectiveLevel() > logging.DEBUG:
logging.getLogger(discovery.__name__).setLevel(logging.WARNING)
discovery_kwargs = {
'serviceName': service_name,
'version': version,
'developerKey': developer_key,
'credentials': credentials}
if SUPPORT_DISCOVERY_CACHE:
discovery_kwargs['cache_discovery'] = cache_discovery
return discovery.build(**discovery_kwargs)
def __init__(self, credentials: client.OAuth2Credentials):
self.credentials = credentials
self.session = ClientSession()
def setUp(self):
self.client = Client(OAuth2Credentials('token', 'clientid', 'secret',
'rtoken', datetime.datetime(
2038, 1, 1), 'uri', 'ua'))
def setUp(self):
self.client = Client(OAuth2Credentials('token', 'clientid', 'secret',
'rtoken', datetime.datetime(
2038, 1, 1), 'uri', 'ua'))
self.selector = UserListSelector()