def authorize(action):
    """Build a decorator that enforces a policy action before a handler runs.

    The decorated callable is only invoked after ``_do_enforce_rbac`` has
    been called with ``action`` and the context taken from the falcon
    request.

    :param action: The policy action to enforce.
    :returns: A decorator. The decorated handler returns whatever the
        wrapped callable returns.
    :raises: Any exception raised by ``_do_enforce_rbac`` propagates to the
        caller (the original documentation names ``falcon.HTTPForbidden``).
    """
    def decorator(func):
        @functools.wraps(func)
        def handler(*args, **kwargs):
            # The falcon Request object is expected at positional index 1
            # (index 0 being the resource instance).
            request = args[1]
            _do_enforce_rbac(action, request.context)
            return func(*args, **kwargs)
        return handler
    return decorator
python类HTTPForbidden()的实例源码
def check_auth(ba_ctx, req):
    """Check request authentication based on boot action context.

    Raise the proper Falcon exception if authentication fails, otherwise
    silently return.

    :param ba_ctx: Boot Action context from database. ``identity_key`` and
        ``node_name`` are read from it.
    :param req: The falcon request object of the API call
    :raises falcon.HTTPUnauthorized: if the ``X-Bootaction-Key`` header is
        missing or empty.
    :raises falcon.HTTPForbidden: if the header value is not valid hex or
        does not match ``ba_ctx['identity_key']``.
    """
    identity_key = req.get_header('X-Bootaction-Key', default='')

    if identity_key == '':
        raise falcon.HTTPUnauthorized(
            title='Unauthorized',
            description='No X-Bootaction-Key',
            challenges=['Bootaction-Key'])

    try:
        provided_key = bytes.fromhex(identity_key)
    except ValueError:
        # A header that is not valid hex is a client error, not a server
        # fault: treat it exactly like a wrong key instead of letting the
        # ValueError escape.
        provided_key = None

    if provided_key is None or ba_ctx['identity_key'] != provided_key:
        # Deliberately log only the node name: the stored and supplied keys
        # are credentials and must not end up in log files.
        logger.warning(
            "Forbidding boot action access - node: %s", ba_ctx['node_name'])
        raise falcon.HTTPForbidden(
            title='Unauthorized', description='Invalid X-Bootaction-Key')
def guarded_session():
    '''
    Context manager that will automatically close session on exceptions

    Yields a freshly created ``Session``. If the body using the session
    raises, the session is closed before the exception leaves this function:

    * ``IrisValidationException`` is converted to ``HTTPBadRequest``.
    * ``HTTPForbidden``, ``HTTPUnauthorized``, ``HTTPNotFound`` and
      ``HTTPBadRequest`` are re-raised unchanged.
    * Any other ``Exception`` is logged and re-raised.

    The session is NOT closed here when the body completes normally.

    NOTE(review): this is a generator function; it presumably relies on a
    ``contextlib.contextmanager`` decorator that is not visible in this
    excerpt - confirm.
    '''
    # Create the session before entering the try block: if Session() itself
    # fails there is nothing to close, and the handlers below must not touch
    # an unbound ``session`` name (which would mask the real error).
    session = Session()
    try:
        yield session
    except IrisValidationException as e:
        session.close()
        raise HTTPBadRequest('Validation error', str(e))
    except (HTTPForbidden, HTTPUnauthorized, HTTPNotFound, HTTPBadRequest):
        session.close()
        raise
    except Exception:
        session.close()
        logger.exception('SERVER ERROR')
        raise
def on_post(self, req, resp):
    """Link the caller's plex account with every server the token can see.

    The ``X-TOKEN`` request header is used to query the plex web API for the
    account id and for the list of servers; each (server, account) pair is
    then registered in both directions.

    :raises falcon.HTTPForbidden: if the header is missing, or if the account
        lookup answers with status 401 or 422.
    """
    if 'X-TOKEN' not in req.headers:
        raise falcon.HTTPForbidden('Permission Denied', 'Missing header X-token')
    plex_token = req.headers['X-TOKEN']

    account_reply = plex_get_request('https://plex.tv/users/account.json', plex_token)
    if account_reply.status_code in (401, 422):
        raise falcon.HTTPForbidden('Permission Denied', 'Provided token is not valid')
    account_id = account_reply.json()['user']['id']

    servers_reply = plex_get_request('https://plex.tv/pms/servers', plex_token)
    container = ElementTree.fromstring(servers_reply.content)
    # A set removes duplicate machine identifiers.
    server_ids = {child.attrib['machineIdentifier'] for child in container}

    # Both registrations are idempotent, so repeating them is harmless.
    for server_id in server_ids:
        server.Actions.add_account(server_id, account_id)
        account.Actions.add_server(account_id, server_id)

    resp.status = falcon.HTTP_200
def on_get(self, req, resp):
    """Validate a one-click token and forward the request data to iris-api.

    ``token`` and every key listed in ``self.data_keys`` are read from the
    request parameters. When the token validates, the collected data is
    posted to the configured ``gmail_one_click`` hook.

    A 204 answer from the API is mirrored to the client. Every other outcome
    (a ``MaxRetryError`` or an unexpected status) is logged and ends in an
    internal server error.

    :raises falcon.HTTPForbidden: if ``self.validate_token`` rejects the
        token for the collected values.
    :raises falcon.HTTPInternalServerError: if the API call fails or does not
        answer with 204.
    """
    token = req.get_param('token', True)
    data = {key: req.get_param(key, True) for key in self.data_keys}

    if not self.validate_token(token, data):
        raise falcon.HTTPForbidden('Invalid token for these given values', '')

    endpoint = self.config['iris']['hook']['gmail_one_click']

    try:
        result = self.iclient.post(endpoint, data)
    except MaxRetryError:
        logger.exception('Hitting iris-api failed for gmail oneclick')
    else:
        if result.status == 204:
            resp.status = falcon.HTTP_204
            return
        logger.error('Unexpected status code from api %s for gmail oneclick', result.status)

    # Reached on a transport failure as well as on an unexpected status.
    raise falcon.HTTPInternalServerError('Internal Server Error', 'Invalid response from API')
test_authenticator_httpbasicauth.py 文件源码
项目:commissaire-mvp
作者: projectatomic
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_authenticate_with_invalid_password(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization header, and the wrong password.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        # Fake Etcd result whose value is the content of the user config.
        etcd_result = mock.MagicMock(etcd.EtcdResult)
        with open(self.user_config, 'r') as users_file:
            etcd_result.value = users_file.read()

        store_manager = mock.MagicMock(StoreHandlerManager)
        store_manager.get.return_value = etcd_result
        _publish.return_value = [store_manager]

        # Reload with the data from the mock'd Etcd
        http_basic_auth = httpbasicauth.HTTPBasicAuth()

        # 'YTpiCg==' is the base64 encoding of "a:b\n".
        environ = create_environ(headers={'Authorization': 'basic YTpiCg=='})
        req = falcon.Request(environ)
        resp = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden, http_basic_auth.authenticate, req, resp)
def authenticate(self, req, resp):
    """
    Authenticate a request from the client certificate data in the
    request environment.

    Access is granted (the method returns) as soon as a ``commonName`` entry
    is found in the certificate subject and either ``self.cn`` is unset or
    the entry equals ``self.cn``.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    client_cert = req.env.get(SSL_CLIENT_VERIFY, {})
    subject = client_cert.get('subject', ()) if client_cert else ()

    for entry in subject:
        for key, value in entry:
            if key != 'commonName':
                continue
            if not self.cn or value == self.cn:
                return

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def authenticate(self, req, resp):
    """
    Authenticate a request from the credentials decoded by
    ``self._decode_basic_auth``.

    The method returns only when both a user and a password were decoded,
    the user exists in ``self._data`` and ``self.check_authentication``
    accepts the pair.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    user, passwd = self._decode_basic_auth(req)
    have_credentials = user is not None and passwd is not None

    if have_credentials and user in self._data.keys():
        self.logger.debug('User {0} found in datastore.'.format(user))
        if self.check_authentication(user, passwd):
            # Authentication is good
            return

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def test_enforce_disallowed_action(self):
    """Enforcing a disallowed action raises HTTPForbidden with the expected
    description.
    """
    action = "deckhand:list_cleartext_documents"
    error_re = "Policy doesn't allow %s to be performed." % action
    # NOTE(review): relies on ``assertRaises`` returning the raised
    # exception (testtools-style), as the original code did - confirm the
    # base TestCase.
    e = self.assertRaises(
        falcon.HTTPForbidden, self._enforce_policy, action)
    # Signature is (text, regex): the description is the text under test
    # and ``error_re`` is the pattern, not the other way round.
    self.assertRegexpMatches(e.description, error_re)
def test_enforce_nonexistent_action(self):
    """Enforcing an unregistered action raises HTTPForbidden with the
    expected description.
    """
    action = "example:undefined"
    error_re = "Policy %s has not been registered" % action
    # NOTE(review): relies on ``assertRaises`` returning the raised
    # exception (testtools-style), as the original code did - confirm the
    # base TestCase.
    e = self.assertRaises(
        falcon.HTTPForbidden, self._enforce_policy, action)
    # Signature is (text, regex): the description is the text under test
    # and ``error_re`` is the pattern, not the other way round.
    self.assertRegexpMatches(e.description, error_re)
def test_falcon_exception_formatting(self):
    """Verify formatting for an exception class that inherits from
    :class:`falcon.HTTPError`.

    ``policy._do_enforce_rbac`` is patched to raise ``falcon.HTTPForbidden``
    and the body of the resulting PUT response is compared with the expected
    status document.
    """
    expected_msg = (
        'deckhand:create_cleartext_documents is disallowed by policy')
    forbidden = falcon.HTTPForbidden(description=expected_msg)

    with mock.patch.object(
            policy, '_do_enforce_rbac',
            spec_set=policy._do_enforce_rbac) as enforce_mock:
        enforce_mock.side_effect = forbidden
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/test/documents',
            headers={'Content-Type': 'application/x-yaml'}, body=None)

    message_entry = {'message': expected_msg, 'error': True}
    expected_body = {
        'status': 'Failure',
        'kind': 'status',
        'code': '403 Forbidden',
        'apiVersion': 'v1.0',
        'reason': '403 Forbidden',
        'retry': False,
        'details': {
            'errorType': 'HTTPForbidden',
            'errorCount': 1,
            'messageList': [message_entry],
        },
        'message': expected_msg,
        'metadata': {},
    }
    actual_body = yaml.safe_load(resp.text)

    self.assertEqual(403, resp.status_code)
    self.assertEqual(expected_body, actual_body)
def _do_enforce_rbac(action, context, do_raise=True):
    """Authorize ``action`` for ``context`` through the module enforcer.

    :param action: The policy action to authorize.
    :param context: Request context providing ``project_id``, ``user_id``,
        ``to_policy_values()`` and ``to_dict()``.
    :param do_raise: Forwarded unchanged to ``_ENFORCER.authorize``.
    :returns: Whatever ``_ENFORCER.authorize`` returns.
    :raises falcon.HTTPForbidden: if the action is not registered or if any
        other exception is raised during authorization. The description is
        the text of the underlying exception.
    """
    init()
    credentials = context.to_policy_values()
    target = dict(project_id=context.project_id, user_id=context.user_id)
    exc = errors.PolicyNotAuthorized

    try:
        # ``authorize`` is used rather than ``enforce`` because it is the
        # stricter of the two ``oslo.policy`` entry points: it raises when
        # the action is missing from the registered rules, so anything not
        # found in ``deckhand.policies`` fails with a 'Policy not
        # registered' message.
        return _ENFORCER.authorize(
            action, target, context.to_dict(),
            do_raise=do_raise, exc=exc, action=action)
    except policy.PolicyNotRegistered as err:
        LOG.exception('Policy not registered.')
        raise falcon.HTTPForbidden(description=six.text_type(err))
    except Exception as err:
        log_values = {'action': action, 'credentials': credentials}
        LOG.debug(
            'Policy check for %(action)s failed with credentials '
            '%(credentials)s',
            log_values)
        raise falcon.HTTPForbidden(description=six.text_type(err))
def __init__(self, title=None, description=None, **kwargs):
    """Initialise the error, falling back to the class-level defaults.

    :param title: Error title. ``self.title`` is used when falsy.
    :param description: Error description. ``self.description`` is used when
        falsy.
    :param kwargs: Forwarded unchanged to the parent initialiser.
    """
    effective_title = title or self.title
    effective_description = description or self.description
    super(HTTPForbidden, self).__init__(
        effective_title, effective_description, **kwargs)
def on_post(self, req, resp):
    """Reject every POST by raising ``falcon.HTTPForbidden``."""
    error = falcon.HTTPForbidden(falcon.HTTP_403, 'Setec Astronomy')
    raise error
def on_post(self, req, resp):
    """Reject every POST by raising ``falcon.HTTPForbidden`` with a help
    link.
    """
    error = falcon.HTTPForbidden(
        'Request denied',
        'You do not have write permissions for this queue.',
        href='http://example.com/api/rbac')
    raise error
def process_request(self, req, resp):
    """Reject requests whose host is not listed in ``self.hosts``.

    :raises falcon.HTTPForbidden: if ``req.host`` is not in ``self.hosts``.
    """
    # Falcon hands us req.host without the port the browser sent, even for
    # ports other than 80 (probably a Falcon bug). __init__ compensates by
    # stripping ports from self.hosts.
    if req.host in self.hosts:
        return

    print("Attempted request with Host header '%s' denied" % req.host)
    raise falcon.HTTPForbidden("Bad Host header", "Cannot connect via the provided hostname")
# the gunicorn application
def on_get(self, req, resp, app_name):
    """Return the key of application ``app_name`` as a JSON document.

    A logged-in user is required. Non-admin users must additionally be
    listed as an owner of the application.

    :raises HTTPUnauthorized: if the request context carries no username.
    :raises HTTPForbidden: if a non-admin user does not own the application.
    :raises HTTPBadRequest: if no key is found for the application.
    """
    username = req.context['username']
    if not username:
        raise HTTPUnauthorized('You must be a logged in user to view this app\'s key')

    with db.guarded_session() as session:
        if not req.context['is_admin']:
            ownership_query = '''SELECT 1
FROM `application_owner`
JOIN `target` on `target`.`id` = `application_owner`.`user_id`
JOIN `application` on `application`.`id` = `application_owner`.`application_id`
WHERE `target`.`name` = :username
AND `application`.`name` = :app_name'''
            ownership_params = {'app_name': app_name, 'username': username}
            has_permission = session.execute(
                ownership_query, ownership_params).scalar()
            if not has_permission:
                raise HTTPForbidden('You don\'t have permissions to view this app\'s key.')

        key_query = 'SELECT `key` FROM `application` WHERE `name` = :app_name LIMIT 1'
        key = session.execute(key_query, {'app_name': app_name}).scalar()
        if not key:
            raise HTTPBadRequest('Key for this application not found')

        # Success path: the session is closed explicitly here.
        session.close()

    resp.body = ujson.dumps({'key': key})
def validate_account_server_access(req, resp, resource, params):
    """Ensure the calling server is allowed to access the requested account.

    The server id is taken from the ``X-SERVER-ID`` request header and the
    account id from ``params['account_id']``. Access is granted when the
    server id is among the servers returned by ``Actions.get_servers`` for
    that account.

    :param req: The falcon request.
    :param resp: The falcon response (unused).
    :param resource: The target resource (unused).
    :param params: Route parameters; must contain ``account_id``.
    :raises falcon.HTTPForbidden: if the header is missing or the server has
        no access to the account.
    """
    if 'X-SERVER-ID' not in req.headers:
        raise falcon.HTTPForbidden('Permission Denied', 'Missing header X-server-id')

    server_id = req.headers['X-SERVER-ID']
    account_id = params['account_id']

    # Plain membership test instead of list.index() plus ValueError handling.
    if server_id not in Actions.get_servers(account_id):
        # str.format keeps the message intact even if account_id is not a
        # string (concatenation would raise TypeError).
        raise falcon.HTTPForbidden(
            'Permission Denied',
            'Server {0} has no access to account id {1}'.format(
                server_id, account_id))
def process_request(self, req, resp):
    """Reject requests whose host is not listed in ``self.hosts``.

    :raises falcon.HTTPForbidden: if ``req.host`` is not in ``self.hosts``.
    """
    # Falcon hands us req.host without the port the browser sent, even for
    # ports other than 80 (probably a Falcon bug). __init__ compensates by
    # stripping ports from self.hosts.
    if req.host in self.hosts:
        return

    print("Attempted request with Host header '%s' denied" % req.host)
    raise falcon.HTTPForbidden("Bad Host header", "Cannot connect via the provided hostname")
# the gunicorn application
def test_authenticator_process_request(self):
    """
    Verify Authenticator's process_request calls authenticate.

    The check is indirect: calling ``process_request`` is expected to raise
    ``falcon.HTTPForbidden``.
    """
    request = falcon.Request(create_environ())
    response = falcon.Response()
    self.assertRaises(
        falcon.HTTPForbidden,
        self.authenticator.process_request,
        request, response)
test_authenticator_httpbasicauth.py 文件源码
项目:commissaire-mvp
作者: projectatomic
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_authenticate_with_invalid_user(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization header, and no matching user.
    """
    self.http_basic_auth = httpbasicauth.HTTPBasicAuth(self.user_config)

    # 'Yjpi' is the base64 encoding of "b:b".
    environ = create_environ(headers={'Authorization': 'basic Yjpi'})
    req = falcon.Request(environ)
    resp = falcon.Response()

    self.assertRaises(
        falcon.HTTPForbidden, self.http_basic_auth.authenticate, req, resp)
test_authenticator_httpbasicauth.py 文件源码
项目:commissaire-mvp
作者: projectatomic
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_authenticate_with_invalid_password(self):
    """
    Verify authenticate denies with a proper JSON file, Authorization header, and the wrong password.
    """
    self.http_basic_auth = httpbasicauth.HTTPBasicAuth(self.user_config)

    # 'YTpiCg==' is the base64 encoding of "a:b\n".
    environ = create_environ(headers={'Authorization': 'basic YTpiCg=='})
    req = falcon.Request(environ)
    resp = falcon.Response()

    self.assertRaises(
        falcon.HTTPForbidden, self.http_basic_auth.authenticate, req, resp)
test_authenticator_httpbasicauth.py 文件源码
项目:commissaire-mvp
作者: projectatomic
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_authenticate_with_invalid_user(self):
    """
    Verify authenticate denies with a proper JSON in Etcd, Authorization header, and no matching user.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        # Mock the return of the Etcd get result
        etcd_result = mock.MagicMock(etcd.EtcdResult)
        with open(self.user_config, 'r') as users_file:
            etcd_result.value = users_file.read()

        store_manager = mock.MagicMock(StoreHandlerManager)
        store_manager.get.return_value = etcd_result
        _publish.return_value = [store_manager]

        # Reload with the data from the mock'd Etcd
        http_basic_auth = httpbasicauth.HTTPBasicAuth()

        # Test the call ('Yjpi' is the base64 encoding of "b:b").
        environ = create_environ(headers={'Authorization': 'basic Yjpi'})
        req = falcon.Request(environ)
        resp = falcon.Response()
        self.assertRaises(
            falcon.HTTPForbidden, http_basic_auth.authenticate, req, resp)
def authenticate(self, req, resp):
    """
    Implements the authentication logic.

    The bearer token decoded from the request is presented to the Kubernetes
    endpoint ``self._kubernetes.base_uri + self.resource_check``. A 200
    answer accepts the request; any other status ends in
    ``falcon.HTTPForbidden``.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden if no token is present or Kubernetes does
        not answer with 200. Any exception raised while contacting
        Kubernetes is logged and re-raised unchanged.
    """
    token = self._decode_bearer_auth(req)
    if token is not None:
        # The token is a credential: record that one was found, but never
        # write its value to the log.
        self.logger.debug('Token found.')
        try:
            # NOTE: We are assuming that if the user has access to
            # the resource they should be granted access to commissaire
            endpoint = self._kubernetes.base_uri + self.resource_check
            self.logger.debug('Checking against {0}.'.format(endpoint))
            # Bearer tokens are carried in the ``Authorization`` header
            # (RFC 6750). A distinct local name keeps the falcon ``resp``
            # parameter from being shadowed.
            k8s_response = requests.get(
                endpoint, headers={'Authorization': 'Bearer ' + token})
            self.logger.debug('Kubernetes response: {0}'.format(
                k8s_response.json()))
            # If we get a 200 then the user is valid. Anything else is
            # a failure
            if k8s_response.status_code == 200:
                self.logger.info(
                    'Accepted Kubernetes token for {0}'.format(
                        req.remote_addr))
                return
            self.logger.debug('Rejecting Kubernetes token for {0}'.format(
                req.remote_addr))
        except Exception as error:
            self.logger.warning(
                'Encountered {0} while attempting to '
                'authenticate. {1}'.format(type(error), error))
            # Bare raise preserves the original traceback.
            raise

    # Forbid by default
    raise falcon.HTTPForbidden('Forbidden', 'Forbidden')
def process_request(self, req, resp):
    """
    Falcon hook that runs authentication before the request is processed.

    Delegates to ``self.authenticate`` with the same arguments; the return
    value of that call is discarded.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    self.authenticate(req, resp)
def authenticate(self, req, resp):
    """
    Default authentication: always forbid.

    This method should be overridden with a specific authentication call
    for an implementation.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :raises: falcon.HTTPForbidden
    """
    forbidden = falcon.HTTPForbidden('Forbidden', 'Forbidden')
    raise forbidden
def do_log_history(req, resp):
    """Enforce per-path access rules, then record the request.

    For every entry of ``advance_path`` whose pattern matches ``req.path``,
    the value of the ``LOGIN-USER`` header must be contained in the entry's
    groups. Afterwards, if a ``REQUEST-ID`` header is present, one
    ``OperatorLog`` row describing the request is inserted.

    :param req: The falcon request.
    :param resp: The falcon response (unused).
    :raises falcon.HTTPForbidden: if the ``LOGIN-USER`` header value is not
        in the groups of a matching ``advance_path`` entry.
    """
    for item in advance_path:
        # Each entry is treated as a single-pair mapping
        # {path_pattern: groups}. Taking the first item works on Python 2
        # and Python 3, unlike indexing keys()/values().
        path, groups = next(iter(item.items()))
        if not re.search(path, req.path):
            continue
        user_group = req.get_header('LOGIN-USER')
        if user_group not in groups:
            raise falcon.HTTPForbidden(
                title='You not allow access it',
                description='Please connect the manager')

    request_id = req.get_header(name='REQUEST-ID')
    if request_id is None:
        # Nothing to log without a request id.
        return
    # Only the last 20 characters of the id are stored.
    request_id = request_id[-20:]

    # A real list is needed: len() fails on Python 3's lazy filter object.
    paths = [segment for segment in req.path.split('/') if segment != '']

    # Default both names so a short or empty path cannot leave them unbound.
    ctrl_name = None
    func_name = None
    if len(paths) >= 2:
        ctrl_name, func_name = paths[0], paths[1]
    elif len(paths) == 1:
        func_name = paths[0]

    create_timestamp = datetime.now()
    OperatorLog.insert(
        request=request_id,
        controller=ctrl_name,
        exec_path=req.get_header(name='EXEC-PATH'),
        func=func_name,
        login_gid=req.get_header(name='LOGIN-GID'),
        login_uid=req.get_header(name='LOGIN-UID'),
        login_user=req.get_header(name='LOGIN-USER'),
        post_data=json.dumps(req._params),
        server_ip=req.get_header(name='SERVER-IP'),
        create_timestamp=create_timestamp).execute()
#????