def test_callback_view(self):
self.oauth2.storage = mock.Mock()
with self.app.test_client() as client:
with mock.patch(
'oauth2client.transport.get_http_object') as new_http:
# Set-up mock.
http = http_mock.HttpMock(data=DEFAULT_RESP)
new_http.return_value = http
# Run tests.
state = self._setup_callback_state(client)
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('/return_url', response.headers['Location'])
self.assertIn(self.oauth2.client_secret, http.body)
self.assertIn('codez', http.body)
self.assertTrue(self.oauth2.storage.put.called)
# Check the mocks were called.
new_http.assert_called_once_with()
python类FOUND的实例源码
def test_incremental_auth_exchange(self):
self._create_incremental_auth_app()
with mock.patch('oauth2client.transport.get_http_object') as new_http:
# Set-up mock.
new_http.return_value = http_mock.HttpMock(data=DEFAULT_RESP)
# Run tests.
with self.app.test_client() as client:
state = self._setup_callback_state(
client,
return_url='/return_url',
# Incremental auth scopes.
scopes=['one', 'two'])
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
credentials = self.oauth2.credentials
self.assertTrue(
credentials.has_scopes(['email', 'one', 'two']))
# Check the mocks were called.
new_http.assert_called_once_with()
def test_callback_view(self):
self.oauth2.storage = mock.Mock()
with self.app.test_client() as client:
with mock.patch(
'oauth2client.transport.get_http_object') as new_http:
# Set-up mock.
http = http_mock.HttpMock(data=DEFAULT_RESP)
new_http.return_value = http
# Run tests.
state = self._setup_callback_state(client)
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('/return_url', response.headers['Location'])
self.assertIn(self.oauth2.client_secret, http.body)
self.assertIn('codez', http.body)
self.assertTrue(self.oauth2.storage.put.called)
# Check the mocks were called.
new_http.assert_called_once_with()
def test_incremental_auth_exchange(self):
self._create_incremental_auth_app()
with mock.patch('oauth2client.transport.get_http_object') as new_http:
# Set-up mock.
new_http.return_value = http_mock.HttpMock(data=DEFAULT_RESP)
# Run tests.
with self.app.test_client() as client:
state = self._setup_callback_state(
client,
return_url='/return_url',
# Incremental auth scopes.
scopes=['one', 'two'])
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
credentials = self.oauth2.credentials
self.assertTrue(
credentials.has_scopes(['email', 'one', 'two']))
# Check the mocks were called.
new_http.assert_called_once_with()
def test_incremental_auth_exchange(self):
self._create_incremental_auth_app()
with Http2Mock():
with self.app.test_client() as client:
state = self._setup_callback_state(
client,
return_url='/return_url',
# Incremental auth scopes.
scopes=['one', 'two'])
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
credentials = self.oauth2.credentials
self.assertTrue(
credentials.has_scopes(['email', 'one', 'two']))
def _handle_error(url, response):
"""Handle response status codes."""
handlers = {
http_client.NOT_FOUND: NotFoundError(
'Resource not found: {}'.format(url)
),
http_client.FOUND: AlreadyExistsError(
'Resource already exists: {}'.format(url)
),
http_client.FAILED_DEPENDENCY: ValidationError(response),
http_client.UNAUTHORIZED: NotAuthorizedError(response),
http_client.BAD_REQUEST: BadRequestError(response),
}
if response.status_code in handlers:
raise handlers[response.status_code]
def init(api, _cors, impl):
"""Configures REST handlers for allocation resource."""
namespace = webutils.namespace(
api, __name__, 'Local nodeinfo redirect API.'
)
@namespace.route('/<hostname>/<path:path>')
class _NodeRedirect(restplus.Resource):
"""Redirects to local nodeinfo endpoint."""
def get(self, hostname, path):
"""Returns list of local instances."""
hostport = impl.get(hostname)
if not hostport:
return 'Host not found.', http_client.NOT_FOUND
url = utils.encode_uri_parts(path)
return flask.redirect('http://%s/%s' % (hostport, url),
code=http_client.FOUND)
def _http_request(self, conn_url, method, **kwargs):
"""Send an http request with the specified characteristics
Wrapper around request.Session.request to handle tasks such as
setting headers and error handling.
"""
kwargs['headers'] = kwargs.get('headers', {})
kwargs['headers'].setdefault('User-agent', USER_AGENT)
self.log_curl_request(method, conn_url, kwargs)
body = kwargs.pop('body', None)
headers = kwargs.pop('headers')
conn_url = self._make_connection_url(conn_url)
try:
resp = self.session.request(method, conn_url, headers=headers,
data=body, json=kwargs)
except requests.exceptions.RequestException as e:
msg = (_("Error has occured while handling request for "
"%(url)s: %(e)s") % dict(url=conn_url, e=e))
if isinstance(e, ValueError):
raise exc.ValidationError(msg)
raise exc.ConnectionRefuse(msg)
self.log_http_response(resp, resp.text)
body_iter = six.StringIO(resp.text)
if resp.status_code >= http_client.BAD_REQUEST:
error_json = _extract_error_json(resp.text)
raise exc.from_response(resp, error_json, method, conn_url)
elif resp.status_code in (http_client.FOUND,
http_client.USE_PROXY):
return self._http_request(resp['location'], method, **kwargs)
return resp, body_iter
def test_callback_view(self):
self.oauth2.storage = mock.Mock()
with self.app.test_client() as client:
with Http2Mock() as http:
state = self._setup_callback_state(client)
response = client.get(
'/oauth2callback?state={0}&code=codez'.format(state))
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('/return_url', response.headers['Location'])
self.assertIn(self.oauth2.client_secret, http.body)
self.assertIn('codez', http.body)
self.assertTrue(self.oauth2.storage.put.called)
def test_get_302(self, resp_mock):
"""Test treadmill.restclient.get FOUND (302)"""
resp_mock.return_value.status_code = http_client.FOUND
with self.assertRaises(restclient.AlreadyExistsError):
restclient.get('http://foo.com', '/')
def test_get_version_list_302(self, mock_get_client):
req = webob.Request.blank('/v1')
req.accept = "application/json"
res = req.get_response(self.wsgi_app)
self.assertEqual(http.FOUND, res.status_int)
redirect_req = webob.Request.blank('/v1/')
self.assertEqual(redirect_req.url, res.location)
test_federated_authentication.py 文件源码
项目:keystone-tempest-plugin
作者: openstack
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def _request_unscoped_token(self):
resp = self.saml2_client.send_service_provider_request(
self.keystone_v3_endpoint, self.idp_id, self.protocol_id)
self.assertEqual(http_client.OK, resp.status_code)
saml2_authn_request = etree.XML(resp.content)
relay_state = self._str_from_xml(
saml2_authn_request, self.ECP_RELAY_STATE)
sp_consumer_url = self._str_from_xml(
saml2_authn_request, self.ECP_SERVICE_PROVIDER_CONSUMER_URL)
# Perform the authn request to the identity provider
resp = self.saml2_client.send_identity_provider_authn_request(
saml2_authn_request, self.idp_url, self.username, self.password)
self.assertEqual(http_client.OK, resp.status_code)
saml2_idp_authn_response = etree.XML(resp.content)
idp_consumer_url = self._str_from_xml(
saml2_idp_authn_response, self.ECP_IDP_CONSUMER_URL)
# Assert that both saml2_authn_request and saml2_idp_authn_response
# have the same consumer URL.
self.assertEqual(sp_consumer_url, idp_consumer_url)
# Present the identity provider authn response to the service provider.
resp = self.saml2_client.send_service_provider_saml2_authn_response(
saml2_idp_authn_response, relay_state, idp_consumer_url)
# Must receive a redirect from service provider to the URL where the
# unscoped token can be retrieved.
self.assertIn(resp.status_code,
[http_client.FOUND, http_client.SEE_OTHER])
# We can receive multiple types of errors here, the response depends on
# the mapping and the username used to authenticate in the Identity
# Provider and also in the Identity Provider remote ID validation.
# If everything works well, we receive an unscoped token.
sp_url = resp.headers['location']
resp = (
self.saml2_client.send_service_provider_unscoped_token_request(
sp_url))
self.assertEqual(http_client.CREATED, resp.status_code)
self.assertIn('X-Subject-Token', resp.headers)
self.assertNotEmpty(resp.json())
return resp
def test_required(self):
@self.app.route('/protected')
@self.oauth2.required
def index():
return 'Hello'
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
credentials = self._generate_credentials(scopes=self.oauth2.scopes)
# With credentials, should allow
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials with refresh token, should allow.
credentials.token_expiry = datetime.datetime(1990, 5, 28)
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials without a refresh token, should redirect.
credentials.refresh_token = None
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
def test_incremental_auth(self):
self._create_incremental_auth_app()
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/one')
self.assertIn('one', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Credentials for one. /one should allow, /two should redirect.
credentials = self._generate_credentials(scopes=['email', 'one'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/one')
self.assertEqual(response.status_code, httplib.OK)
response = client.get('/two')
self.assertIn('two', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Starting the authorization flow should include the
# include_granted_scopes parameter as well as the scopes.
response = client.get(response.headers['Location'][17:])
q = urlparse.parse_qs(
response.headers['Location'].split('?', 1)[1])
self.assertIn('include_granted_scopes', q)
self.assertEqual(
set(q['scope'][0].split(' ')),
set(['one', 'email', 'two', 'three']))
# Actually call two() without a redirect.
credentials2 = self._generate_credentials(
scopes=['email', 'two', 'three'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials2.to_json()
response = client.get('/two')
self.assertEqual(response.status_code, httplib.OK)
def test_required(self):
@self.app.route('/protected')
@self.oauth2.required
def index():
return 'Hello'
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
credentials = self._generate_credentials(scopes=self.oauth2.scopes)
# With credentials, should allow
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials with refresh token, should allow.
credentials.token_expiry = datetime.datetime(1990, 5, 28)
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials without a refresh token, should redirect.
credentials.refresh_token = None
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
def test_incremental_auth(self):
self._create_incremental_auth_app()
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/one')
self.assertIn('one', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Credentials for one. /one should allow, /two should redirect.
credentials = self._generate_credentials(scopes=['email', 'one'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/one')
self.assertEqual(response.status_code, httplib.OK)
response = client.get('/two')
self.assertIn('two', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Starting the authorization flow should include the
# include_granted_scopes parameter as well as the scopes.
response = client.get(response.headers['Location'][17:])
q = urlparse.parse_qs(
response.headers['Location'].split('?', 1)[1])
self.assertIn('include_granted_scopes', q)
self.assertEqual(
set(q['scope'][0].split(' ')),
set(['one', 'email', 'two', 'three']))
# Actually call two() without a redirect.
credentials2 = self._generate_credentials(
scopes=['email', 'two', 'three'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials2.to_json()
response = client.get('/two')
self.assertEqual(response.status_code, httplib.OK)
def test_required(self):
@self.app.route('/protected')
@self.oauth2.required
def index():
return 'Hello'
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
credentials = self._generate_credentials(scopes=self.oauth2.scopes)
# With credentials, should allow
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials with refresh token, should allow.
credentials.token_expiry = datetime.datetime(1990, 5, 28)
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.OK)
self.assertIn('Hello', response.data.decode('utf-8'))
# Expired credentials without a refresh token, should redirect.
credentials.refresh_token = None
with mock.patch('oauth2client.client._UTCNOW') as utcnow:
utcnow.return_value = datetime.datetime(1990, 5, 29)
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = (
credentials.to_json())
response = client.get('/protected')
self.assertEqual(response.status_code, httplib.FOUND)
self.assertIn('oauth2authorize', response.headers['Location'])
self.assertIn('protected', response.headers['Location'])
def test_incremental_auth(self):
self._create_incremental_auth_app()
# No credentials, should redirect
with self.app.test_client() as client:
response = client.get('/one')
self.assertIn('one', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Credentials for one. /one should allow, /two should redirect.
credentials = self._generate_credentials(scopes=['email', 'one'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials.to_json()
response = client.get('/one')
self.assertEqual(response.status_code, httplib.OK)
response = client.get('/two')
self.assertIn('two', response.headers['Location'])
self.assertEqual(response.status_code, httplib.FOUND)
# Starting the authorization flow should include the
# include_granted_scopes parameter as well as the scopes.
response = client.get(response.headers['Location'][17:])
q = urlparse.parse_qs(response.headers['Location'].split('?', 1)[1])
self.assertIn('include_granted_scopes', q)
self.assertEqual(
set(q['scope'][0].split(' ')),
set(['one', 'email', 'two', 'three']))
# Actually call two() without a redirect.
credentials2 = self._generate_credentials(
scopes=['email', 'two', 'three'])
with self.app.test_client() as client:
with client.session_transaction() as session:
session['google_oauth2_credentials'] = credentials2.to_json()
response = client.get('/two')
self.assertEqual(response.status_code, httplib.OK)
def _http_request(self, url, method, **kwargs):
kwargs.setdefault('user_agent', USER_AGENT)
kwargs.setdefault('auth', self.auth)
if isinstance(self.endpoint_override, six.string_types):
kwargs.setdefault(
'endpoint_override',
_trim_endpoint_api_version(self.endpoint_override)
)
if getattr(self, 'os_iotronic_api_version', None):
kwargs['headers'].setdefault('X-OpenStack-Iotronic-API-Version',
self.os_iotronic_api_version)
endpoint_filter = kwargs.setdefault('endpoint_filter', {})
endpoint_filter.setdefault('interface', self.interface)
endpoint_filter.setdefault('service_type', self.service_type)
endpoint_filter.setdefault('region_name', self.region_name)
resp = self.session.request(url, method,
raise_exc=False, **kwargs)
if resp.status_code == http_client.NOT_ACCEPTABLE:
negotiated_ver = self.negotiate_version(self.session, resp)
kwargs['headers']['X-OpenStack-Iotronic-API-Version'] = (
negotiated_ver)
return self._http_request(url, method, **kwargs)
if resp.status_code >= http_client.BAD_REQUEST:
error_json = _extract_error_json(resp.content)
# NOTE(vdrok): exceptions from iotronic controllers' _lookup
# methods
# are constructed directly by pecan instead of wsme, and contain
# only description field
raise exc.from_response(resp, (error_json.get('faultstring') or
error_json.get('description')),
error_json.get('debuginfo'), method, url)
elif resp.status_code in (http_client.MOVED_PERMANENTLY,
http_client.FOUND, http_client.USE_PROXY):
# Redirected. Reissue the request to the new location.
location = resp.headers.get('location')
resp = self._http_request(location, method, **kwargs)
elif resp.status_code == http_client.MULTIPLE_CHOICES:
raise exc.from_response(resp, method=method, url=url)
return resp