def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return client.AccessTokenInfo(
access_token=self.access_token, expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return client.AccessTokenInfo(
access_token=token, expires_in=self._MAX_TOKEN_LIFETIME_SECS)
python类AccessTokenInfo()的实例源码
def test_get_access_token_without_http(self, expires_in, refresh_mock):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
# Make sure access_token_expired returns True
credentials.invalid = True
# Specify a token so we can use it in the response.
credentials.access_token = 'ya29-s3kr3t'
with mock.patch('oauth2client.transport.get_http_object',
return_value=object()) as new_http:
token_info = credentials.get_access_token()
expires_in.assert_called_once_with()
refresh_mock.assert_called_once_with(new_http.return_value)
new_http.assert_called_once_with()
self.assertIsInstance(token_info, client.AccessTokenInfo)
self.assertEqual(token_info.access_token,
credentials.access_token)
self.assertEqual(token_info.expires_in,
expires_in.return_value)
def test_get_access_token_with_http(self, expires_in, refresh_mock):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
# Make sure access_token_expired returns True
credentials.invalid = True
# Specify a token so we can use it in the response.
credentials.access_token = 'ya29-s3kr3t'
http_obj = object()
token_info = credentials.get_access_token(http_obj)
self.assertIsInstance(token_info, client.AccessTokenInfo)
self.assertEqual(token_info.access_token,
credentials.access_token)
self.assertEqual(token_info.expires_in,
expires_in.return_value)
expires_in.assert_called_once_with()
refresh_mock.assert_called_once_with(http_obj)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return client.AccessTokenInfo(
access_token=self.access_token, expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return client.AccessTokenInfo(
access_token=token, expires_in=self._MAX_TOKEN_LIFETIME_SECS)
def test_get_access_token_without_http(self, expires_in, refresh_mock):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
# Make sure access_token_expired returns True
credentials.invalid = True
# Specify a token so we can use it in the response.
credentials.access_token = 'ya29-s3kr3t'
with mock.patch('oauth2client.transport.get_http_object',
return_value=object()) as new_http:
token_info = credentials.get_access_token()
expires_in.assert_called_once_with()
refresh_mock.assert_called_once_with(new_http.return_value)
new_http.assert_called_once_with()
self.assertIsInstance(token_info, client.AccessTokenInfo)
self.assertEqual(token_info.access_token,
credentials.access_token)
self.assertEqual(token_info.expires_in,
expires_in.return_value)
def test_get_access_token_with_http(self, expires_in, refresh_mock):
credentials = client.OAuth2Credentials(None, None, None, None,
None, None, None)
# Make sure access_token_expired returns True
credentials.invalid = True
# Specify a token so we can use it in the response.
credentials.access_token = 'ya29-s3kr3t'
http_obj = object()
token_info = credentials.get_access_token(http_obj)
self.assertIsInstance(token_info, client.AccessTokenInfo)
self.assertEqual(token_info.access_token,
credentials.access_token)
self.assertEqual(token_info.expires_in,
expires_in.return_value)
expires_in.assert_called_once_with()
refresh_mock.assert_called_once_with(http_obj)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return AccessTokenInfo(access_token=self.access_token,
expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return AccessTokenInfo(access_token=token,
expires_in=self._MAX_TOKEN_LIFETIME_SECS)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return AccessTokenInfo(access_token=self.access_token,
expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return AccessTokenInfo(access_token=token,
expires_in=self._MAX_TOKEN_LIFETIME_SECS)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return AccessTokenInfo(access_token=self.access_token,
expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return AccessTokenInfo(access_token=token,
expires_in=self._MAX_TOKEN_LIFETIME_SECS)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return client.AccessTokenInfo(
access_token=self.access_token, expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return client.AccessTokenInfo(
access_token=token, expires_in=self._MAX_TOKEN_LIFETIME_SECS)
def get_access_token(self, http=None, additional_claims=None):
"""Create a signed jwt.
Args:
http: unused
additional_claims: dict, additional claims to add to
the payload of the JWT.
Returns:
An AccessTokenInfo with the signed jwt
"""
if additional_claims is None:
if self.access_token is None or self.access_token_expired:
self.refresh(None)
return client.AccessTokenInfo(
access_token=self.access_token, expires_in=self._expires_in())
else:
# Create a 1 time token
token, unused_expiry = self._create_token(additional_claims)
return client.AccessTokenInfo(
access_token=token, expires_in=self._MAX_TOKEN_LIFETIME_SECS)
test_service_config.py 文件源码
项目:endpoints-management-python
作者: cloudendpoints
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def _set_up_default_credential(self):
default_credential = mock.MagicMock()
ServiceConfigFetchTest._credentials.get_application_default.return_value \
= default_credential
default_credential.create_scoped.return_value = default_credential
token = ServiceConfigFetchTest._ACCESS_TOKEN
access_token = client.AccessTokenInfo(access_token=token, expires_in=None)
default_credential.get_access_token.return_value = access_token