def test_authorize_401(self, utcnow):
utcnow.return_value = T1_DATE
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b''),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK}, b''),
])
self.jwt.authorize(http)
transport.request(http, self.url)
token_1 = self.jwt.access_token
utcnow.return_value = T2_DATE
response, _ = transport.request(http, self.url)
self.assertEquals(response.status, http_client.OK)
token_2 = self.jwt.access_token
# Check the 401 forced a new token
self.assertNotEqual(token_1, token_2)
# Verify mocks.
certs = {'key': datafile('public_cert.pem')}
self.assertEqual(len(http.requests), 3)
issued_at_vals = (T1, T1, T2)
exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY)
for info, issued_at, exp_val in zip(http.requests, issued_at_vals,
exp_vals):
self.assertEqual(info['uri'], self.url)
self.assertEqual(info['method'], 'GET')
self.assertIsNone(info['body'])
self.assertEqual(len(info['headers']), 1)
bearer, token = info['headers'][b'Authorization'].split()
self.assertEqual(bearer, b'Bearer')
# To parse the token, skip the time check, since this
# test intentionally has stale tokens.
with mock.patch('oauth2client.crypt._verify_time_range',
return_value=True):
payload = crypt.verify_signed_jwt_with_certs(
token, certs, audience=self.url)
self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], issued_at)
self.assertEqual(payload['exp'], exp_val)
self.assertEqual(payload['aud'], self.url)
评论列表
文章目录