def has_view_permissions(self, path, method, view):
    """
    Return `True` if the incoming request has the correct view permissions.

    `path` and `method` are accepted but not consulted here; only the view
    and the request attached to it are used. A view with no request
    attached is always treated as permitted.
    """
    if view.request is None:
        # Nothing to check permissions against.
        return True

    try:
        view.check_permissions(view.request)
    except (exceptions.APIException, Http404, PermissionDenied):
        # The permission check rejected the request.
        return False
    return True
# Example source code using the Python class PermissionDenied()
def get(self, request, *args, **kwargs):
    """
    Return a response containing the schema generated for `request`.

    The schema comes from `self.schema_generator.get_schema`, called with
    the request and `self.public`. Raises `exceptions.PermissionDenied`
    when the generator returns `None`.
    """
    schema = self.schema_generator.get_schema(request, self.public)
    if schema is None:
        raise exceptions.PermissionDenied()
    return Response(schema)
def exception_handler(exc, context):
    """
    Returns the response that should be used for any given exception.

    By default we handle the REST framework `APIException`, and also
    Django's built-in `Http404` and `PermissionDenied` exceptions.

    Any unhandled exceptions may return `None`, which will cause a 500 error
    to be raised.
    """
    if isinstance(exc, exceptions.APIException):
        # Optional attributes on the exception become response headers.
        headers = {}
        if getattr(exc, 'auth_header', None):
            headers['WWW-Authenticate'] = exc.auth_header
        if getattr(exc, 'wait', None):
            headers['Retry-After'] = '%d' % exc.wait

        # Structured details are sent as-is; anything else is wrapped
        # under a 'detail' key.
        if isinstance(exc.detail, (list, dict)):
            data = exc.detail
        else:
            data = {'detail': exc.detail}

        set_rollback()
        return Response(data, status=exc.status_code, headers=headers)

    elif isinstance(exc, Http404):
        msg = _('Not found.')
        data = {'detail': six.text_type(msg)}

        set_rollback()
        return Response(data, status=status.HTTP_404_NOT_FOUND)

    elif isinstance(exc, PermissionDenied):
        msg = _('Permission denied.')
        data = {'detail': six.text_type(msg)}

        set_rollback()
        return Response(data, status=status.HTTP_403_FORBIDDEN)

    # Not an exception type handled here.
    return None
def permission_denied(self, request, message=None):
    """
    If request is not permitted, determine what kind of exception to raise.

    Raises `exceptions.NotAuthenticated` when the request has authenticators
    configured but none of them succeeded; otherwise raises
    `exceptions.PermissionDenied` carrying `message` as its detail.
    This method never returns normally.
    """
    if request.authenticators and not request.successful_authenticator:
        raise exceptions.NotAuthenticated()
    raise exceptions.PermissionDenied(detail=message)
def enforce_csrf(self, request):
    """
    Enforce CSRF validation for session based authentication.

    Runs `CSRFCheck().process_view` against the request and raises
    `exceptions.PermissionDenied` if it returns a truthy failure reason.
    """
    reason = CSRFCheck().process_view(request, None, (), {})
    if reason:
        # CSRF failed, bail with explicit error message
        raise exceptions.PermissionDenied('CSRF Failed: %s' % reason)
def check_blacklist(repo):
    """Check a Docker repository name for collision with deis/* components.

    Raises `PermissionDenied` when `repo` contains "deis/<component>" for
    any blacklisted component. The check is a substring match, so names
    with a registry prefix (e.g. "localhost:5000/deis/controller") are
    rejected as well. Returns `None` when the name is allowed.
    """
    blacklisted = [  # NOTE: keep this list up to date!
        'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
        'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
        'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
        'mesos-marathon', 'mesos-slave', 'zookeeper',
    ]
    if any("deis/{}".format(c) in repo for c in blacklisted):
        raise PermissionDenied("Repository name {} is not allowed".format(repo))
def test_pull(self, mock_client):
    """Pulling forwards to the docker client and rejects blacklisted names."""
    # NOTE(review): mock_client is presumably injected by a patch decorator
    # that is not visible here - confirm.
    self.client = DockerClient()
    self.client.pull('alpine', '3.2')
    docker_pull = self.client.client.pull
    docker_pull.assert_called_once_with(
        'alpine', tag='3.2', insecure_registry=True, stream=True)

    # Test that blacklisted image names can't be pulled
    with self.assertRaises(PermissionDenied):
        self.client.pull('deis/controller', 'v1.11.1')
    with self.assertRaises(PermissionDenied):
        self.client.pull('localhost:5000/deis/controller', 'v1.11.1')
def test_tag(self, mock_client):
    """Tagging forwards to the docker client and rejects blacklisted names."""
    # NOTE(review): mock_client is presumably injected by a patch decorator
    # that is not visible here - confirm.
    self.client = DockerClient()
    self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
    docker_tag = self.client.client.tag
    docker_tag.assert_called_once_with(
        'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)

    # Test that blacklisted image names can't be tagged
    with self.assertRaises(PermissionDenied):
        self.client.tag('deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
    with self.assertRaises(PermissionDenied):
        self.client.tag('localhost:5000/deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
def update(self, request, **kwargs):
    """
    Update the app returned by `self.get_object()`, optionally changing owner.

    When the request data carries a truthy 'owner', only the current owner
    or a superuser may proceed; anyone else gets `PermissionDenied`. The
    new owner is looked up by username via `get_object_or_404`.
    Responds with HTTP 200 on success.
    """
    app = self.get_object()

    if request.data.get('owner'):
        if self.request.user != app.owner and not self.request.user.is_superuser:
            raise PermissionDenied()
        new_owner = get_object_or_404(User, username=request.data['owner'])
        app.owner = new_owner
    # NOTE(review): the app is saved whether or not the owner changed;
    # confirm this placement against the original indentation.
    app.save()
    return Response(status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
    """
    Create a record on behalf of the user named in the request data.

    The app is looked up by the 'receive_repo' value and the acting user by
    the 'receive_user' value. `request.user` is replaced with that user
    before the authorization check. Raises `PermissionDenied` when
    `permissions.is_app_user` rejects the request for the app.
    """
    app = get_object_or_404(models.App, id=request.data['receive_repo'])
    request.user = get_object_or_404(User, username=request.data['receive_user'])

    # check the user is authorized for this app
    if not permissions.is_app_user(request, app):
        raise PermissionDenied()

    # Hand the resolved objects to the parent implementation.
    request.data['app'] = app
    request.data['owner'] = request.user
    return super(PushHookViewSet, self).create(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
    """
    Create a build on behalf of the named user and return the app databag.

    The app is looked up by the 'receive_repo' value and the acting user by
    the 'receive_user' value. That user is stored on both `self.user` and
    `request.user`. Raises `PermissionDenied` when
    `permissions.is_app_user` rejects the request for the app.

    The parent `create` result is discarded. The HTTP 200 response carries
    the latest release version and the app's domain instead.
    """
    app = get_object_or_404(models.App, id=request.data['receive_repo'])
    self.user = request.user = get_object_or_404(User, username=request.data['receive_user'])

    # check the user is authorized for this app
    if not permissions.is_app_user(request, app):
        raise PermissionDenied()

    request.data['app'] = app
    request.data['owner'] = self.user
    super(BuildHookViewSet, self).create(request, *args, **kwargs)

    # return the application databag
    response = {'release': {'version': app.release_set.latest().version},
                'domains': ['.'.join([app.id, settings.DEIS_DOMAIN])]}
    return Response(response, status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
    """
    Return the serialized config of the app's latest release.

    The app is looked up by the 'receive_repo' value and the acting user by
    the 'receive_user' value. `request.user` is replaced with that user
    before the authorization check. Raises `PermissionDenied` when
    `permissions.is_app_user` rejects the request for the app.
    Despite the name, nothing is created here; the response is HTTP 200.
    """
    app = get_object_or_404(models.App, id=request.data['receive_repo'])
    request.user = get_object_or_404(User, username=request.data['receive_user'])

    # check the user is authorized for this app
    if not permissions.is_app_user(request, app):
        raise PermissionDenied()

    config = app.release_set.latest().config
    serializer = self.get_serializer(config)
    return Response(serializer.data, status=status.HTTP_200_OK)
def destroy(self, request, **kwargs):
    """
    Revoke the `self.perm` permission on an app from a user.

    The app is looked up by `self.kwargs['id']` and the target user by
    `kwargs['username']`. Raises `PermissionDenied` when the target user
    does not hold the permission. It is also raised when the caller is
    revoking someone else's access without passing the `IsOwnerOrAdmin`
    object check. Responds with HTTP 204 on success.
    """
    app = get_object_or_404(models.App, id=self.kwargs['id'])
    user = get_object_or_404(User, username=kwargs['username'])

    perm_name = "api.{}".format(self.perm)
    if not user.has_perm(perm_name, app):
        raise PermissionDenied()

    # Users may drop their own access; removing anyone else's requires
    # passing the owner/admin check.
    if (user != request.user and
            not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(),
                                                                 request, self, app)):
        raise PermissionDenied()

    remove_perm(self.perm, user, app)
    models.log_event(app, "User {} was revoked access to {}".format(user, app))
    return Response(status=status.HTTP_204_NO_CONTENT)
def check_blacklist(repo):
    """Check a Docker repository name for collision with deis/* components.

    Raises `PermissionDenied` when `repo` contains "deis/<component>" for
    any blacklisted component. The check is a substring match, so names
    with a registry prefix (e.g. "localhost:5000/deis/controller") are
    rejected as well. Returns `None` when the name is allowed.
    """
    blacklisted = [  # NOTE: keep this list up to date!
        'builder', 'cache', 'controller', 'database', 'logger', 'logspout',
        'publisher', 'registry', 'router', 'store-admin', 'store-daemon',
        'store-gateway', 'store-metadata', 'store-monitor', 'swarm', 'mesos-master',
        'mesos-marathon', 'mesos-slave', 'zookeeper',
    ]
    if any("deis/{}".format(c) in repo for c in blacklisted):
        raise PermissionDenied("Repository name {} is not allowed".format(repo))
def test_tag(self, mock_client):
    """Tagging forwards to the docker client and rejects blacklisted names."""
    # NOTE(review): mock_client is presumably injected by a patch decorator
    # that is not visible here - confirm.
    self.client = DockerClient()
    self.client.tag('ozzy/embryo:git-f2a8020', 'ozzy/embryo', 'v4')
    docker_tag = self.client.client.tag
    docker_tag.assert_called_once_with(
        'ozzy/embryo:git-f2a8020', 'ozzy/embryo', tag='v4', force=True)

    # Test that blacklisted image names can't be tagged
    with self.assertRaises(PermissionDenied):
        self.client.tag('deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
    with self.assertRaises(PermissionDenied):
        self.client.tag('localhost:5000/deis/controller:v1.11.1', 'deis/controller', 'v1.11.1')
def destroy(self, request, **kwargs):
    """
    Delete a user account and respond with HTTP 204.

    By default the object returned by `self.get_object()` is deleted. When
    the request data carries a truthy 'username', that user is deleted
    instead. This is allowed only if the name is the caller's own or the
    caller is a superuser; otherwise `PermissionDenied` is raised.
    """
    calling_obj = self.get_object()
    target_obj = calling_obj

    if request.data.get('username'):
        # if you "accidentally" target yourself, that should be fine
        if calling_obj.username == request.data['username'] or calling_obj.is_superuser:
            target_obj = get_object_or_404(User, username=request.data['username'])
        else:
            raise PermissionDenied()

    target_obj.delete()
    return Response(status=status.HTTP_204_NO_CONTENT)
def update(self, request, **kwargs):
    """
    Update the app returned by `self.get_object()`, optionally changing owner.

    When the request data carries a truthy 'owner', only the current owner
    or a superuser may proceed; anyone else gets `PermissionDenied`. The
    new owner is looked up by username via `get_object_or_404`.
    Responds with HTTP 200 on success.
    """
    app = self.get_object()

    if request.data.get('owner'):
        if self.request.user != app.owner and not self.request.user.is_superuser:
            raise PermissionDenied()
        new_owner = get_object_or_404(User, username=request.data['owner'])
        app.owner = new_owner
    # NOTE(review): the app is saved whether or not the owner changed;
    # confirm this placement against the original indentation.
    app.save()
    return Response(status=status.HTTP_200_OK)
def create(self, request, *args, **kwargs):
    """
    Create a record on behalf of the user named in the request data.

    The app is looked up by the 'receive_repo' value and the acting user by
    the 'receive_user' value. `request.user` is replaced with that user
    before the authorization check. Raises `PermissionDenied` when
    `permissions.is_app_user` rejects the request for the app.
    """
    app = get_object_or_404(models.App, id=request.data['receive_repo'])
    request.user = get_object_or_404(User, username=request.data['receive_user'])

    # check the user is authorized for this app
    if not permissions.is_app_user(request, app):
        raise PermissionDenied()

    # Hand the resolved objects to the parent implementation.
    request.data['app'] = app
    request.data['owner'] = request.user
    return super(PushHookViewSet, self).create(request, *args, **kwargs)
def create(self, request, *args, **kwargs):
    """
    Return the serialized config of the app's latest release.

    The app is looked up by the 'receive_repo' value and the acting user by
    the 'receive_user' value. `request.user` is replaced with that user
    before the authorization check. Raises `PermissionDenied` when
    `permissions.is_app_user` rejects the request for the app.
    Despite the name, nothing is created here; the response is HTTP 200.
    """
    app = get_object_or_404(models.App, id=request.data['receive_repo'])
    request.user = get_object_or_404(User, username=request.data['receive_user'])

    # check the user is authorized for this app
    if not permissions.is_app_user(request, app):
        raise PermissionDenied()

    config = app.release_set.latest().config
    serializer = self.get_serializer(config)
    return Response(serializer.data, status=status.HTTP_200_OK)
def create(self, request, **kwargs):
    """
    Grant the `self.perm` permission on an app to a user.

    The app comes from `self.get_object()`. The caller must pass the
    `IsOwnerOrAdmin` object check or `PermissionDenied` is raised. The
    target user is looked up by the 'username' value in the request data.
    Responds with HTTP 201 on success.
    """
    app = self.get_object()
    if not permissions.IsOwnerOrAdmin.has_object_permission(permissions.IsOwnerOrAdmin(),
                                                            request, self, app):
        raise PermissionDenied()

    user = get_object_or_404(User, username=request.data['username'])
    assign_perm(self.perm, user, app)
    models.log_event(app, "User {} was granted access to {}".format(user, app))
    return Response(status=status.HTTP_201_CREATED)