def _handle_error(url, response):
"""Handle response status codes."""
handlers = {
http_client.NOT_FOUND: NotFoundError(
'Resource not found: {}'.format(url)
),
http_client.FOUND: AlreadyExistsError(
'Resource already exists: {}'.format(url)
),
http_client.FAILED_DEPENDENCY: ValidationError(response),
http_client.UNAUTHORIZED: NotAuthorizedError(response),
http_client.BAD_REQUEST: BadRequestError(response),
}
if response.status_code in handlers:
raise handlers[response.status_code]
python类UNAUTHORIZED的实例源码
test_requests.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_request_refresh(self):
credentials = mock.Mock(wraps=CredentialsStub())
final_response = make_response(status=http_client.OK)
# First request will 401, second request will succeed.
adapter = AdapterStub([
make_response(status=http_client.UNAUTHORIZED),
final_response])
authed_session = google.auth.transport.requests.AuthorizedSession(
credentials)
authed_session.mount(self.TEST_URL, adapter)
result = authed_session.request('GET', self.TEST_URL)
assert result == final_response
assert credentials.before_request.call_count == 2
assert credentials.refresh.called
assert len(adapter.requests) == 2
assert adapter.requests[0].url == self.TEST_URL
assert adapter.requests[0].headers['authorization'] == 'token'
assert adapter.requests[1].url == self.TEST_URL
assert adapter.requests[1].headers['authorization'] == 'token1'
test_urllib3.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_urlopen_refresh(self):
credentials = mock.Mock(wraps=CredentialsStub())
final_response = ResponseStub(status=http_client.OK)
# First request will 401, second request will succeed.
http = HttpStub([
ResponseStub(status=http_client.UNAUTHORIZED),
final_response])
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
credentials, http=http)
authed_http = authed_http.urlopen('GET', 'http://example.com')
assert authed_http == final_response
assert credentials.before_request.call_count == 2
assert credentials.refresh.called
assert http.requests == [
('GET', self.TEST_URL, None, {'authorization': 'token'}, {}),
('GET', self.TEST_URL, None, {'authorization': 'token1'}, {})]
def test_get_new_token_should_throw_ecsclientexception_401(self):
self.requests_mock.register_uri('GET', 'https://127.0.0.1:4443/login',
status_code=http_client.UNAUTHORIZED)
with super(testtools.TestCase, self).assertRaises(ECSClientException) as error:
self.token_request.get_new_token()
exception = error.exception
self.assertEqual(exception.http_status, http_client.UNAUTHORIZED)
def test_token_validation_401(self, mock_get_existing_token, mock_get_new_token):
self.requests_mock.register_uri('GET', 'https://127.0.0.1:4443/user/whoami',
status_code=http_client.UNAUTHORIZED)
mock_get_new_token.return_value = 'NEW-TOKEN-123'
mock_get_existing_token.return_value = 'EXISTING-TOKEN-123'
token = self.token_request.get_token()
self.assertEqual(token, 'NEW-TOKEN-123')
def raise_for_response(method, url, response):
"""Raise a correct error class, if needed."""
if response.status_code < http_client.BAD_REQUEST:
return
elif response.status_code == http_client.NOT_FOUND:
raise ResourceNotFoundError(method, url, response)
elif response.status_code == http_client.BAD_REQUEST:
raise BadRequestError(method, url, response)
elif response.status_code in (http_client.UNAUTHORIZED,
http_client.FORBIDDEN):
raise AccessError(method, url, response)
elif response.status_code >= http_client.INTERNAL_SERVER_ERROR:
raise ServerSideError(method, url, response)
else:
raise HTTPError(method, url, response)
def test_token_refresh_store_expires_soon(self):
# Tests the case where an access token that is valid when it is read
# from the store expires before the original request succeeds.
expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
storage = file_module.Storage(FILENAME)
storage.put(credentials)
credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
storage.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
http = http_mock.HttpMockSequence([
({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
({'status': http_client.OK},
b'Valid response to original request')
])
credentials.authorize(http)
transport.request(http, 'https://example.com')
self.assertEqual(credentials.access_token, access_token)
def test_token_refresh_stream_body(self):
expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
storage = file_module.Storage(FILENAME)
storage.put(credentials)
credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
storage.put(new_cred)
valid_access_token = '1/3w'
token_response = {'access_token': valid_access_token,
'expires_in': 3600}
http = http_mock.HttpMockSequence([
({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
({'status': http_client.OK}, 'echo_request_body')
])
body = six.StringIO('streaming body')
credentials.authorize(http)
_, content = transport.request(http, 'https://example.com', body=body)
self.assertEqual(content, 'streaming body')
self.assertEqual(credentials.access_token, valid_access_token)
def _credentials_refresh(self, credentials):
http = http_mock.HttpMockSequence([
({'status': http_client.OK},
b'{"access_token":"1/3w","expires_in":3600}'),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK},
b'{"access_token":"3/3w","expires_in":3600}'),
({'status': http_client.OK}, 'echo_request_headers'),
])
http = credentials.authorize(http)
_, content = transport.request(http, 'http://example.org')
return content
def test_token_refresh_store_expires_soon(self):
# Tests the case where an access token that is valid when it is read
# from the store expires before the original request succeeds.
expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
storage = file_module.Storage(FILENAME)
storage.put(credentials)
credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
storage.put(new_cred)
access_token = '1/3w'
token_response = {'access_token': access_token, 'expires_in': 3600}
http = http_mock.HttpMockSequence([
({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
({'status': http_client.OK},
b'Valid response to original request')
])
credentials.authorize(http)
transport.request(http, 'https://example.com')
self.assertEqual(credentials.access_token, access_token)
def test_token_refresh_stream_body(self):
expiration = (datetime.datetime.utcnow() +
datetime.timedelta(minutes=15))
credentials = self._create_test_credentials(expiration=expiration)
storage = file_module.Storage(FILENAME)
storage.put(credentials)
credentials = storage.get()
new_cred = copy.copy(credentials)
new_cred.access_token = 'bar'
storage.put(new_cred)
valid_access_token = '1/3w'
token_response = {'access_token': valid_access_token,
'expires_in': 3600}
http = http_mock.HttpMockSequence([
({'status': http_client.UNAUTHORIZED},
b'Initial token expired'),
({'status': http_client.UNAUTHORIZED},
b'Store token expired'),
({'status': http_client.OK},
json.dumps(token_response).encode('utf-8')),
({'status': http_client.OK}, 'echo_request_body')
])
body = six.StringIO('streaming body')
credentials.authorize(http)
_, content = transport.request(http, 'https://example.com', body=body)
self.assertEqual(content, 'streaming body')
self.assertEqual(credentials.access_token, valid_access_token)
def _credentials_refresh(self, credentials):
http = http_mock.HttpMockSequence([
({'status': http_client.OK},
b'{"access_token":"1/3w","expires_in":3600}'),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK},
b'{"access_token":"3/3w","expires_in":3600}'),
({'status': http_client.OK}, 'echo_request_headers'),
])
http = credentials.authorize(http)
_, content = transport.request(http, 'http://example.org')
return content
def test_get_401(self, resp_mock):
"""Test treadmill.restclient.get UNAUTHORIZED (401)"""
resp_mock.return_value.status_code = http_client.UNAUTHORIZED
resp_mock.return_value.json.return_value = {}
with self.assertRaises(restclient.NotAuthorizedError):
restclient.get('http://foo.com', '/')
test_iam.py 文件源码
项目:google-auth-library-python
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_sign_bytes_failure(self):
request = make_request(http_client.UNAUTHORIZED)
credentials = make_credentials()
signer = iam.Signer(
request, credentials, mock.sentinel.service_account_email)
with pytest.raises(exceptions.TransportError):
signer.sign('123')
def test_no_user_or_user_id(self):
response = self.request.get_response(self.middleware)
self.assertEqual(response.status_int, http.UNAUTHORIZED)
def get_versions(self, header=None, ccancel=None):
"""Query the repo for versions information.
Returns a fileobject. If server returns 401 (Unauthorized)
check for presence of X-IPkg-Error header and decode."""
requesturl = self.__get_request_url("versions/0/")
fobj = self._fetch_url(requesturl, header, ccancel=ccancel,
failonerror=False)
try:
# Bogus request to trigger
# StreamingFileObj.__fill_buffer(), otherwise the
# TransportProtoError won't be raised here. We can't
# use .read() since this will empty the data buffer.
fobj.getheader("octopus", None)
except tx.TransportProtoError as e:
if e.code == http_client.UNAUTHORIZED:
exc_type, exc_value, exc_tb = sys.exc_info()
try:
e.details = self._analyze_server_error(
fobj.getheader("X-IPkg-Error",
None))
except:
# If analysis fails, raise original
# exception.
if six.PY2:
six.reraise(exc_value, None,
exc_tb)
else:
raise exc_value
raise
return fobj
def check_resp_status_and_retry(resp, image_id, url):
# Note(Jesse): This branch sorts errors into those that are permanent,
# those that are ephemeral, and those that are unexpected.
if resp.status in (httplib.BAD_REQUEST, # 400
httplib.UNAUTHORIZED, # 401
httplib.PAYMENT_REQUIRED, # 402
httplib.FORBIDDEN, # 403
httplib.METHOD_NOT_ALLOWED, # 405
httplib.NOT_ACCEPTABLE, # 406
httplib.PROXY_AUTHENTICATION_REQUIRED, # 407
httplib.CONFLICT, # 409
httplib.GONE, # 410
httplib.LENGTH_REQUIRED, # 411
httplib.PRECONDITION_FAILED, # 412
httplib.REQUEST_ENTITY_TOO_LARGE, # 413
httplib.REQUEST_URI_TOO_LONG, # 414
httplib.UNSUPPORTED_MEDIA_TYPE, # 415
httplib.REQUESTED_RANGE_NOT_SATISFIABLE, # 416
httplib.EXPECTATION_FAILED, # 417
httplib.UNPROCESSABLE_ENTITY, # 422
httplib.LOCKED, # 423
httplib.FAILED_DEPENDENCY, # 424
httplib.UPGRADE_REQUIRED, # 426
httplib.NOT_IMPLEMENTED, # 501
httplib.HTTP_VERSION_NOT_SUPPORTED, # 505
httplib.NOT_EXTENDED, # 510
):
raise PluginError("Got Permanent Error response [%i] while "
"uploading image [%s] to glance [%s]"
% (resp.status, image_id, url))
# Nova service would process the exception
elif resp.status == httplib.NOT_FOUND: # 404
exc = XenAPI.Failure('ImageNotFound')
raise exc
# NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We
# optimistically retry on 500 errors below.
elif resp.status in (httplib.REQUEST_TIMEOUT, # 408
httplib.INTERNAL_SERVER_ERROR, # 500
httplib.BAD_GATEWAY, # 502
httplib.SERVICE_UNAVAILABLE, # 503
httplib.GATEWAY_TIMEOUT, # 504
httplib.INSUFFICIENT_STORAGE, # 507
):
raise RetryableError("Got Ephemeral Error response [%i] while "
"uploading image [%s] to glance [%s]"
% (resp.status, image_id, url))
else:
# Note(Jesse): Assume unexpected errors are retryable. If you are
# seeing this error message, the error should probably be added
# to either the ephemeral or permanent error list.
raise RetryableError("Got Unexpected Error response [%i] while "
"uploading image [%s] to glance [%s]"
% (resp.status, image_id, url))
test_service_account.py 文件源码
项目:deb-python-oauth2client
作者: openstack
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_authorize_401(self, utcnow):
utcnow.return_value = T1_DATE
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b''),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK}, b''),
])
self.jwt.authorize(http)
transport.request(http, self.url)
token_1 = self.jwt.access_token
utcnow.return_value = T2_DATE
response, _ = transport.request(http, self.url)
self.assertEquals(response.status, http_client.OK)
token_2 = self.jwt.access_token
# Check the 401 forced a new token
self.assertNotEqual(token_1, token_2)
# Verify mocks.
certs = {'key': datafile('public_cert.pem')}
self.assertEqual(len(http.requests), 3)
issued_at_vals = (T1, T1, T2)
exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY)
for info, issued_at, exp_val in zip(http.requests, issued_at_vals,
exp_vals):
self.assertEqual(info['uri'], self.url)
self.assertEqual(info['method'], 'GET')
self.assertIsNone(info['body'])
self.assertEqual(len(info['headers']), 1)
bearer, token = info['headers'][b'Authorization'].split()
self.assertEqual(bearer, b'Bearer')
# To parse the token, skip the time check, since this
# test intentionally has stale tokens.
with mock.patch('oauth2client.crypt._verify_time_range',
return_value=True):
payload = crypt.verify_signed_jwt_with_certs(
token, certs, audience=self.url)
self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], issued_at)
self.assertEqual(payload['exp'], exp_val)
self.assertEqual(payload['aud'], self.url)
def test_authorize_401(self, utcnow):
utcnow.return_value = T1_DATE
http = http_mock.HttpMockSequence([
({'status': http_client.OK}, b''),
({'status': http_client.UNAUTHORIZED}, b''),
({'status': http_client.OK}, b''),
])
self.jwt.authorize(http)
transport.request(http, self.url)
token_1 = self.jwt.access_token
utcnow.return_value = T2_DATE
response, _ = transport.request(http, self.url)
self.assertEquals(response.status, http_client.OK)
token_2 = self.jwt.access_token
# Check the 401 forced a new token
self.assertNotEqual(token_1, token_2)
# Verify mocks.
certs = {'key': datafile('public_cert.pem')}
self.assertEqual(len(http.requests), 3)
issued_at_vals = (T1, T1, T2)
exp_vals = (T1_EXPIRY, T1_EXPIRY, T2_EXPIRY)
for info, issued_at, exp_val in zip(http.requests, issued_at_vals,
exp_vals):
self.assertEqual(info['uri'], self.url)
self.assertEqual(info['method'], 'GET')
self.assertIsNone(info['body'])
self.assertEqual(len(info['headers']), 1)
bearer, token = info['headers'][b'Authorization'].split()
self.assertEqual(bearer, b'Bearer')
# To parse the token, skip the time check, since this
# test intentionally has stale tokens.
with mock.patch('oauth2client.crypt._verify_time_range',
return_value=True):
payload = crypt.verify_signed_jwt_with_certs(
token, certs, audience=self.url)
self.assertEqual(len(payload), 5)
self.assertEqual(payload['iss'], self.service_account_email)
self.assertEqual(payload['sub'], self.service_account_email)
self.assertEqual(payload['iat'], issued_at)
self.assertEqual(payload['exp'], exp_val)
self.assertEqual(payload['aud'], self.url)