def get_object(self):
""" return the object based on pk or slug """
queryset = self.filter_queryset(self.get_queryset())
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
lookup_value = self.kwargs[lookup_url_kwarg]
filter_kwargs = {}
if re.match(self.lookup_pk_regexp, lookup_value):
filter_kwargs[self.lookup_field] = lookup_value
else:
filter_kwargs[self.lookup_slug_field] = lookup_value
# May raise a permission denied
obj = get_object_or_404(queryset, **filter_kwargs)
self.check_object_permissions(self.request, obj)
return obj
python类get_object_or_404()的实例源码
def has_permission(self, request, view):
if request.user.is_anonymous():
raise Http404
user = get_object_or_404(
User,
social_auth__uid=view.kwargs['username'],
social_auth__provider=EdxOrgOAuth2.name
)
# if the user is looking for their own profile, they're good
if request.user == user:
return True
# if the user is looking for someone enrolled in a program they
# are staff on, they're good
if request.user.role_set.filter(
role__in=(Staff.ROLE_ID, Instructor.ROLE_ID),
program__programenrollment__user=user,
).exists():
return True
else:
raise Http404
def put(self, request, *args, **kwargs):
data = request.data
user = request.user
try:
uuid = data['uuid']
except KeyError:
raise Http404
instance = get_object_or_404(self.model, user=user, uuid=uuid)
serializer = self.update_serializer_class(instance, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
else:
return Response('Invalid Data Submitted {}'.format(data), status=400)
return Response(serializer.data)
def _get_analytics_dataframe(cls, user, supplement_uuid):
supplement = get_object_or_404(Supplement, uuid=supplement_uuid, user=user)
supplement_series = cls._get_daily_supplement_events_series_last_year(user, supplement)
sleep_series = cls._get_sleep_series_last_year(user)
productivity_series = cls._get_productivity_series_last_year(user)
# if either sleep or productivity are empty, create an empty series that is timezone
# aware (hence, matching the supplement index)
if sleep_series.empty:
sleep_series = pd.Series(index=supplement_series.index)
if productivity_series.empty:
productivity_series = pd.Series(index=supplement_series.index)
dataframe_details = {
'supplement': supplement_series,
'sleep': sleep_series,
'productivity': productivity_series
}
dataframe = pd.DataFrame(dataframe_details)
return dataframe
def get_object(self):
"""Lookup user profile by pk or username"""
lookup = self.kwargs.get(self.lookup_field, None)
if lookup is None:
raise ParseError(
'Expected URL keyword argument `%s`.' % self.lookup_field
)
queryset = self.filter_queryset(self.get_queryset())
try:
pk = int(lookup)
except (TypeError, ValueError):
filter_kwargs = {'username': lookup}
else:
filter_kwargs = {'pk': pk}
# Return a 404 if the user does not exist
user = get_object_or_404(User, **filter_kwargs)
# Since the user does exist, create a matching profile if necessary
obj, created = queryset.get_or_create(user=user)
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
def get_object(self):
"""Lookup a username by pk else use lookup_field"""
queryset = self.filter_queryset(self.get_queryset())
lookup = self.kwargs.get(self.lookup_field)
filter_kwargs = {self.lookup_field: lookup}
try:
pk = int(lookup)
except ValueError:
pass
else:
filter_kwargs = {'pk': pk}
obj = get_object_or_404(queryset, **filter_kwargs)
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
def get_object(self):
""" That override is to get 403 instead of 404 on single object ops
Mostly a copy-paste from GenericAPIView
"""
queryset = self.filter_queryset(self.model.objects.all())
# Perform the lookup filtering.
lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
assert lookup_url_kwarg in self.kwargs, (
'Expected view %s to be called with a URL keyword argument '
'named "%s". Fix your URL conf, or set the `.lookup_field` '
'attribute on the view correctly.' %
(self.__class__.__name__, lookup_url_kwarg))
filter_kwargs = {self.lookup_field: self.kwargs[lookup_url_kwarg]}
obj = get_object_or_404(queryset, **filter_kwargs)
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
def approve(self, request, uuid=None):
# We can't use `self.get_object()` because:
# 1) It filters only for active repos and this one is potentially
# not active yet
# 2) We want to always return a response even if the current user
# does not have access to this particular repo
repo = get_object_or_404(models.Repository, uuid=uuid)
# See if we're going to approve this repo
if ((repo.status == models.Repository.Status.PendingInviterApproval) and
(repo.inviter_login == request.user.username)):
repo.approve_by_inviter(request.user)
# Select which serializer class to use based on the current user's access
serializer_class = RepositoryApproveSerializer
if request.user in repo.admins.all():
serializer_class = self.get_serializer_class()
serializer = serializer_class(repo, context=self.get_serializer_context())
return Response(serializer.data)
################################################################################
# Views
################################################################################
def get(self, request, client_user_string, functionality_group_id):
"""
Is the specified user allowed to use the functionality?
Returns `{ "enabled": true }` if the specified ClientUser is allowed to use the
functionality group, ``{ "enabled": false }`` if the user is disallowed, or a 404 error if
functionality group does not exist.
Include a string that uniquely identifies the user as the client user string.
See also the `which` endpoint to see if a user has a specific functionality group within a
functionality
"""
functionality_group = get_object_or_404(Functionality, id=functionality_group_id)
client_user = ClientUser.user_from_object(client_user_string)
return Response({'enabled': can_i_use(client_user, functionality_group)})
def get(self, request, client_user_string, functionality_group_id):
"""
Which Flavor of the given Functionality is enabled for the user, if any?
Returns `{ "which": "<app.group.functionality>" }` that corresponds to the ClientUser's
enabled functionality, or `{ "which": none }` if the user does not have any Flavor in
the given FuncationlityGroup.
If the Functionality does not exist, this endpoint returns a 404 error.
"""
functionality_group = get_object_or_404(Functionality, id=functionality_group_id)
client_user = ClientUser.user_from_object(client_user_string)
availability = which(client_user, functionality_group)
return Response({
'functionality': availability.flavor.__str__() if availability else None,
})
def get_object(self):
"""
Returns the page with id == the one passed in for a particular revision.
The revision can be selected by using the query param `revision-id` which defaults to the latest one.
"""
obj = super().get_object()
revision_id = self.request.query_params.get('revision-id')
if revision_id:
revision = get_object_or_404(obj.revisions, id=revision_id)
else:
revision = obj.revisions.order_by('-created_at').first()
# in case of no revisions, return the object (edge case)
if not revision:
return obj
base = revision.as_page_object()
return base.specific
def get_object(self, queryset=None):
"""Lookup a username by pk else use lookup_field"""
if queryset is None:
queryset = self.filter_queryset(self.get_queryset())
lookup = self.kwargs.get(self.lookup_field)
filter_kwargs = {self.lookup_field: lookup}
try:
pk = int(lookup)
except ValueError:
pass
else:
filter_kwargs = {'pk': pk}
obj = get_object_or_404(queryset, **filter_kwargs)
# May raise a permission denied
self.check_object_permissions(self.request, obj)
return obj
def task_channel(self, request, task_id=None):
"""
Gets or creates task channel
---
response_serializer: ChannelSerializer
"""
task = get_object_or_404(Task.objects.all(), pk=task_id)
channel = None
if task:
channel = get_or_create_task_channel(request.user, task)
if not channel:
return Response(
{'status': "Couldn't create task channel"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
response_serializer = ChannelSerializer(channel, context={'request': request})
return Response(response_serializer.data)
def update_read(self, request, pk=None):
"""
Updates user's read_at for channel
---
request_serializer: LastReadActivitySerializer
response_serializer: ChannelSerializer
"""
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
last_read = serializer.validated_data['last_read']
channel = get_object_or_404(self.get_queryset(), pk=pk)
if channel.has_object_read_permission(request):
if request.user.is_authenticated():
ChannelUser.objects.update_or_create(
user=request.user, channel=channel, defaults={'last_read': last_read}
)
else:
channel.last_read = last_read
channel.save()
response_serializer = ChannelSerializer(channel, context={'request': request})
return Response(response_serializer.data)
return Response(
{'status': 'Unauthorized', 'message': 'No access to this channel'},
status=status.HTTP_401_UNAUTHORIZED
)
def update_read(self, request, pk=None):
"""
Set message as last_read in it's channel
---
response_serializer: ChannelSerializer
"""
message = get_object_or_404(self.get_queryset(), pk=pk)
if message.has_object_read_permission(request):
ChannelUser.objects.update_or_create(
user=request.user, channel=message.channel, defaults={'last_read': message.id}
)
response_serializer = ChannelSerializer(message.channel)
return Response(response_serializer.data)
return Response(
{'status': 'Unauthorized', 'message': 'No access to this message'},
status=status.HTTP_401_UNAUTHORIZED
)
def update_read(self, request, pk=None):
"""
Updates user's read_at for channel
---
request_serializer: LastReadActivitySerializer
response_serializer: TaskSerializer
"""
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
last_read = serializer.validated_data['last_read']
task = get_object_or_404(self.get_queryset(), pk=pk)
if task.has_object_read_permission(request):
ActivityReadLog.objects.update_or_create(
user=request.user,
content_type=ContentType.objects.get_for_model(task), object_id=task.id,
defaults={'last_read': last_read}
)
response_serializer = TaskSerializer(task, context={'request': request})
return Response(response_serializer.data)
return Response(
{'status': 'Unauthorized', 'message': 'No access to this task'},
status=status.HTTP_401_UNAUTHORIZED
)
def claim(self, request, pk=None):
"""
Claim a project
---
response_serializer: TaskSerializer
"""
task = get_object_or_404(self.get_queryset(), pk=pk)
if task.has_object_read_permission(request):
task.pm = request.user
task.save()
response_serializer = TaskSerializer(task, context={'request': request})
return Response(response_serializer.data)
return Response(
{'status': 'Unauthorized', 'message': 'No access to this task'},
status=status.HTTP_401_UNAUTHORIZED
)
def return_project(self, request, pk=None):
"""
Return a project
---
request_serializer: None
response_serializer: TaskSerializer
"""
task = get_object_or_404(self.get_queryset(), pk=pk)
if task.has_object_read_permission(request):
task.pm = None
task.save()
response_serializer = TaskSerializer(task, context={'request': request})
return Response(response_serializer.data)
return Response(
{'status': 'Unauthorized', 'message': 'No access to this task'},
status=status.HTTP_401_UNAUTHORIZED
)
def activity(self, request, pk=None):
"""
Task Activity Endpoint
---
response_serializer: SimpleActivitySerializer
omit_parameters:
- query
"""
task = get_object_or_404(self.get_queryset(), pk=pk)
self.check_object_permissions(request, task)
queryset = ActionFilter(request.GET, self.filter_queryset(task.activity_stream.all().order_by('-id')))
page = self.paginate_queryset(queryset.qs)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
def dispatch(self, request, *args, **kwargs):
self.profile = get_object_or_404(Profile, id=kwargs.get("id"))
return super().dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
self.profile = get_object_or_404(Profile, id=kwargs.get("id"))
return super().dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
self.tag = get_object_or_404(Tag, name=kwargs.get("name"))
return super().dispatch(request, *args, **kwargs)
def post(self, request, post_pk):
post_instance = get_object_or_404(Post, pk=post_pk)
post_like, post_like_created = post_instance.postlike_set.get_or_create(
user=request.user
)
if not post_like_created:
post_like.delete()
return Response({'created': post_like_created})
def me(self, request):
"""
Retrieves information of the user.
:return: Response
"""
user = get_object_or_404(self.queryset, pk=request.user.pk)
serializer = self.get_serializer(user)
return Response(serializer.data)
def destroy(self, request, pk=None):
zone = get_object_or_404(models.Zone.objects, pk=pk)
zone.soft_delete()
return Response(status=status.HTTP_204_NO_CONTENT)
def zone(self):
zone_id = self.kwargs.get('zone_id')
if zone_id is not None:
queryset = self.get_queryset()
return get_object_or_404(queryset, id=zone_id)
def list(self, request, zone_id):
zone = get_object_or_404(models.Zone, id=zone_id)
zone_data = ZoneDetailSerializer(zone, context={'request': request}).data
return Response(zone_data['records'])
def get_object(self):
zone_id = self.kwargs.get('zone_id')
if zone_id is not None:
return get_object_or_404(models.Zone, id=zone_id)
def get(self, request, username, *args, **kargs): # pylint: disable=unused-argument
"""
Returns information needed to display the user
dashboard for all the programs the user is enrolled in.
"""
user = get_object_or_404(
User,
social_auth__uid=username,
social_auth__provider=EdxOrgOAuth2.name
)
# get the credentials for the current user for edX
edx_client = None
if user == request.user:
user_social = get_social_auth(request.user)
try:
utils.refresh_user_token(user_social)
except utils.InvalidCredentialStored as exc:
return Response(
status=exc.http_status_code,
data={'error': str(exc)}
)
except: # pylint: disable=bare-except
log.exception('Impossible to refresh user credentials in dashboard view')
# create an instance of the client to query edX
edx_client = EdxApi(user_social.extra_data, settings.EDXORG_BASE_URL)
try:
program_dashboard = get_user_program_info(user, edx_client)
except utils.InvalidCredentialStored as exc:
log.exception('Access token for user %s is fresh but invalid; forcing login.', user.username)
return Response(
status=exc.http_status_code,
data={'error': str(exc)}
)
return Response(
status=status.HTTP_200_OK,
data=program_dashboard
)
def has_permission(self, request, view):
"""
Implementation of the permission class.
"""
profile = get_object_or_404(Profile, user__social_auth__uid=view.kwargs['user'])
if request.user == profile.user:
return True
# If viewer is instructor or staff in the program, skip this check
if not request.user.is_anonymous() and request.user.role_set.filter(
role__in=(Staff.ROLE_ID, Instructor.ROLE_ID),
program__programenrollment__user__profile=profile,
).exists():
return True
# private profiles
if profile.account_privacy == Profile.PRIVATE:
raise Http404
elif profile.account_privacy == Profile.PUBLIC_TO_MM:
# anonymous user accessing profiles.
if request.user.is_anonymous():
raise Http404
# requesting user must have enrollment in one of program where profile user is enroll.
program_ids = ProgramEnrollment.objects.filter(user=profile.user).values_list('program__id', flat=True)
if not ProgramEnrollment.objects.filter(user=request.user, program__id__in=program_ids).exists():
raise Http404
elif profile.account_privacy not in [Profile.PRIVATE, Profile.PUBLIC_TO_MM, Profile.PUBLIC]:
raise Http404
return True