def dispatch(self, request, *args, **kwargs):
    '''
    Can only add notes to users in own gym

    Responds with 403 Forbidden when the requester is not authenticated,
    when no user with the primary key ``user_pk`` (taken from the URL
    kwargs) exists, or when that user's profile has a different ``gym_id``
    than the requester's profile. On success the target user is stored on
    ``self.member`` and the request is handed to the parent ``dispatch``.
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    try:
        user = User.objects.get(pk=self.kwargs['user_pk'])
    except User.DoesNotExist:
        # An unknown primary key used to escape as an unhandled exception
        # (HTTP 500); refuse the request instead.
        return HttpResponseForbidden()
    self.member = user

    if user.userprofile.gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()
    return super(AddView, self).dispatch(request, *args, **kwargs)
# Example source code for the Python class HttpResponseForbidden()
def dispatch(self, request, *args, **kwargs):
    '''
    Only trainers for this gym can edit user notes

    Anonymous requests, and requests where the note's member has a
    different ``gym_id`` than the requesting user, receive a 403 response.
    Everything else is passed on to the parent ``dispatch``.
    '''
    requester = request.user
    if requester.is_authenticated():
        member_profile = self.get_object().member.userprofile
        if member_profile.gym_id == requester.userprofile.gym_id:
            return super(UpdateView, self).dispatch(request, *args, **kwargs)
    return HttpResponseForbidden()
def dispatch(self, request, *args, **kwargs):
    '''
    Only trainers for this gym can delete user notes

    A 403 response is returned unless the requester is authenticated and
    shares the ``gym_id`` of the member the note is attached to.
    '''
    requester = request.user
    if requester.is_authenticated():
        member_profile = self.get_object().member.userprofile
        if member_profile.gym_id == requester.userprofile.gym_id:
            return super(DeleteView, self).dispatch(request, *args, **kwargs)
    return HttpResponseForbidden()
def dispatch(self, request, *args, **kwargs):
    '''
    Can only add documents to users in own gym

    Responds with 403 Forbidden when the requester is not authenticated,
    when no user with the primary key ``user_pk`` (taken from the URL
    kwargs) exists, or when that user's profile has a different ``gym_id``
    than the requester's profile. On success the target user is stored on
    ``self.member`` and the request is handed to the parent ``dispatch``.
    '''
    if not request.user.is_authenticated():
        return HttpResponseForbidden()

    try:
        user = User.objects.get(pk=self.kwargs['user_pk'])
    except User.DoesNotExist:
        # An unknown primary key used to escape as an unhandled exception
        # (HTTP 500); refuse the request instead.
        return HttpResponseForbidden()
    self.member = user

    if user.userprofile.gym_id != request.user.userprofile.gym_id:
        return HttpResponseForbidden()
    return super(AddView, self).dispatch(request, *args, **kwargs)
def dispatch(self, request, *args, **kwargs):
    '''
    Restrict editing to users of the same gym as the object's member

    The object is loaded with ``get_object()``; unless the requester is
    authenticated and has the same ``gym_id`` as the object's member, a
    403 response is returned.
    '''
    requester = request.user
    if requester.is_authenticated():
        member_profile = self.get_object().member.userprofile
        if member_profile.gym_id == requester.userprofile.gym_id:
            return super(UpdateView, self).dispatch(request, *args, **kwargs)
    return HttpResponseForbidden()
def dispatch(self, request, *args, **kwargs):
    '''
    Only trainers for this gym can delete documents

    A 403 response is returned unless the requester is authenticated and
    shares the ``gym_id`` of the member the object belongs to.
    '''
    requester = request.user
    if requester.is_authenticated():
        member_profile = self.get_object().member.userprofile
        if member_profile.gym_id == requester.userprofile.gym_id:
            return super(DeleteView, self).dispatch(request, *args, **kwargs)
    return HttpResponseForbidden()
def _process_individual_form(self, form_name, form_classes):
"""
Perform the is_valid() check for a single form.
"""
forms = self.get_forms(form_classes, (form_name,))
form = forms.get(form_name)
if not form:
# there is no form with this name registered in the form_classes dictionary
return HttpResponseForbidden()
elif form.is_valid():
# the form is valid -> call the from valid method
return self.forms_valid(forms, form_name)
else:
# call the form invalid method
return self.forms_invalid(forms, form_name)
def tutorial_email(request, pk, pks):
    """Email a set of users about a presentation.

    ``pk`` identifies the presentation (404 if it does not exist) and
    ``pks`` is a comma-separated string of user primary keys. Non-staff
    users must pass ``is_attendee_or_speaker`` for the presentation or
    they get a 403 response. A valid POST sends the message and redirects
    to the presentation detail page; otherwise the email form is rendered.
    """
    presentation = get_object_or_404(Presentation, pk=pk)

    sender = request.user
    if not (sender.is_staff or is_attendee_or_speaker(sender, presentation)):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    recipients = get_user_model().objects.filter(pk__in=pks.split(","))
    emails = recipients.values_list('email', flat=True)

    if hasattr(sender, 'speaker_profile'):
        from_speaker = sender.speaker_profile in presentation.speakers()
    else:
        from_speaker = False

    is_post = request.method == 'POST'
    form = BulkEmailForm(request.POST) if is_post else BulkEmailForm()

    if is_post and form.is_valid():
        cleaned = form.cleaned_data
        subject = cleaned['subject']
        body = cleaned['body']
        context = email_context(
            request,
            presentation.proposal,
            body,
            subject=subject)
        try:
            # All recipients are passed as bcc; replies go to the sender.
            send_email_message("direct_email",
                               from_=settings.DEFAULT_FROM_EMAIL,
                               to=[],
                               bcc=emails,
                               context=context,
                               headers={'Reply-To': sender.email})
        except SMTPException:
            log.exception("ERROR sending Tutorial emails")
            messages.add_message(request, messages.ERROR,
                                 _(u"There was some error sending emails, "
                                   u"not all of them might have made it"))
        else:
            messages.add_message(request, messages.INFO,
                                 _(u"Email(s) sent"))
        # Success or failure, go back to the presentation page.
        return redirect(reverse(
            'schedule_presentation_detail',
            args=[presentation.pk]
        ))

    return render(request, "tutorials/email.html", {
        'presentation': presentation,
        'form': form,
        'users': recipients,
        'from_speaker': from_speaker
    })
def finaid_review(request, pks=None):
    """Starting view for reviewers - list the applications.

    On a POST the selected application pks come from checkbox fields named
    ``finaid_application_<pk>``; on a GET they may be given in the URL as a
    comma-separated string ``pks``.

    Non-reviewers get a 403 response. A POST without any selection, or a
    status change with a missing or non-integer ``status`` value, adds an
    error message and redirects back to the same path. The email, message
    and status actions each end in a redirect; anything else renders the
    application list.
    """
    if not is_reviewer(request.user):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    if request.method == 'POST':
        # They want to do something to bulk applicants
        # Find the checkboxes they checked
        regex = re.compile(r'^finaid_application_(.*)$')
        pk_list = []
        for field_name in request.POST:
            m = regex.match(field_name)
            if m:
                pk_list.append(m.group(1))
        if not pk_list:
            messages.add_message(
                request, messages.ERROR,
                _(u"Please select at least one application"))
            return redirect(request.path)

        if 'email_action' in request.POST:
            # They want to email applicants
            pks = ",".join(pk_list)
            return redirect('finaid_email', pks=pks)
        elif 'message_action' in request.POST:
            # They want to attach a message to applications
            pks = ",".join(pk_list)
            return redirect('finaid_message', pks=pks)
        elif 'status_action' in request.POST:
            # They want to change applications' statuses
            try:
                status = int(request.POST['status'])
            except (KeyError, ValueError):
                # Missing or malformed status used to surface as a 500.
                messages.add_message(
                    request, messages.ERROR,
                    _(u"Please select a valid status"))
                return redirect(request.path)

            applications = FinancialAidApplication.objects.filter(pk__in=pk_list)\
                .select_related('review')
            count = 0
            for application in applications:
                try:
                    review = application.review
                except FinancialAidReviewData.DoesNotExist:
                    review = FinancialAidReviewData(application=application)
                if review.status != status:
                    review.status = status
                    review.save()
                    count += 1
            messages.info(request,
                          "Updated %d application status%s" % (count, "" if count == 1 else "es"))
            pks = ",".join(pk_list)
            return redirect(reverse('finaid_review', kwargs=dict(pks=pks)))
        else:
            messages.error(request, "WHAT?")
    else:
        # GET - pks are in the URL. maybe.
        pk_list = pks.split(",") if pks else []

    # Only integer pks are handed to the template; fragments that are not
    # integers (e.g. an empty piece from "1,,2") are skipped instead of
    # raising ValueError.
    selected_pks = []
    for pk in pk_list:
        try:
            selected_pks.append(int(pk))
        except ValueError:
            continue

    return render(request, "finaid/application_list.html", {
        "applications": FinancialAidApplication.objects.all().select_related('review'),
        "status_options": STATUS_CHOICES,
        "pks": selected_pks,
    })
def finaid_message(request, pks):
    """Add a message to some applications.

    ``pks`` is a comma-separated string of application primary keys.
    Non-reviewers get a 403 response; if no application matches, an error
    message is added and the user is redirected to ``finaid_review``.

    On POST one message is saved per application for which the submitted
    form validates, and notification emails are sent. "Messages sent" is
    reported (followed by a redirect) only when at least one message was
    actually saved; otherwise the bound form is rendered again so its
    validation errors can be shown.
    """
    if not is_reviewer(request.user):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    applications = FinancialAidApplication.objects.filter(pk__in=pks.split(","))\
        .select_related('user')
    if not applications.exists():
        messages.add_message(request, messages.ERROR, _(u"No applications selected"))
        return redirect('finaid_review')

    if request.method == 'POST':
        sent_count = 0
        # Fallback so a form is always available for rendering below.
        message_form = ReviewerMessageForm(request.POST)
        for application in applications:
            message = FinancialAidMessage(user=request.user,
                                          application=application)
            message_form = ReviewerMessageForm(request.POST, instance=message)
            if not message_form.is_valid():
                continue
            message = message_form.save()
            # Send notice to reviewers/pycon-aid alias, and the applicant if visible
            context = email_context(request, application, message)
            send_email_message("reviewer/message",
                               # From whoever is logged in clicking the buttons
                               from_=request.user.email,
                               to=[email_address()],
                               context=context,
                               headers={'Reply-To': email_address()}
                               )
            # If visible to applicant, notify them as well
            if message.visible:
                send_email_message("applicant/message",
                                   from_=request.user.email,
                                   to=[application.user.email],
                                   context=context,
                                   headers={'Reply-To': email_address()}
                                   )
            sent_count += 1

        if sent_count:
            messages.add_message(request, messages.INFO, _(u"Messages sent"))
            return redirect(reverse('finaid_review', kwargs=dict(pks=pks)))
        # Nothing was saved (the form did not validate): fall through and
        # redisplay the bound form instead of claiming success.
    else:
        message_form = ReviewerMessageForm()

    return render(request, "finaid/reviewer_message.html", {
        'applications': applications,
        'form': message_form,
    })
def finaid_email(request, pks):
    """Send a templated bulk email to the users of the given applications.

    ``pks`` is a comma-separated string of application primary keys.
    Non-reviewers get a 403 response. On a valid POST the chosen template
    is rendered once per application (with ``application`` and ``review``
    in its context), everything is sent through ``send_mass_mail``, and
    the user is redirected to ``finaid_review``. Otherwise the email form
    is rendered.
    """
    if not is_reviewer(request.user):
        return HttpResponseForbidden(_(u"Not authorized for this page"))

    applications = FinancialAidApplication.objects.filter(pk__in=pks.split(","))\
        .select_related('user')

    form = None
    if request.method == 'POST':
        form = BulkEmailForm(request.POST)
        if form.is_valid():
            subject = form.cleaned_data['subject']
            from_email = email_address()
            template_text = form.cleaned_data['template'].template
            template = Template(template_text)

            # emails will be the datatuple arg to send_mass_mail
            emails = []
            for application in applications:
                try:
                    review = application.review
                except FinancialAidReviewData.DoesNotExist:
                    # No review yet; the template sees review=None.
                    review = None

                ctx = {
                    'application': application,
                    'review': review,
                }
                text = template.render(Context(ctx))
                emails.append((subject, text, from_email,
                               [application.user.email]))
            try:
                send_mass_mail(emails)
            except SMTPException:
                log.exception("ERROR sending financial aid emails")
                messages.add_message(request, messages.ERROR,
                                     _(u"There was some error sending emails, "
                                       u"not all of them might have made it"))
            else:
                messages.add_message(request, messages.INFO, _(u"Emails sent"))
            return redirect(reverse('finaid_review', kwargs=dict(pks=pks)))

    ctx = {
        'form': form or BulkEmailForm(),
        'users': [app.user for app in applications]
    }
    return render(request, "finaid/email.html", ctx)
def get_blocks(request):
    """Render the requested blocks and return them as JSON, keyed by id.

    Only AJAX requests are served; others get a 403 response. The GET
    parameter ``block_ids`` is a comma-separated list of block ids.
    ``cid`` and ``oid`` optionally name a content type and object id; when
    both parse as integers the object is loaded and passed to each block
    view as ``instance``, otherwise ``instance`` is ``None``. An empty
    JSON object is returned when ``block_ids`` is missing or the content
    type / object cannot be found. Ids that are not integers, unknown
    blocks, invisible blocks and blocks without a view are skipped.
    """
    if not request.is_ajax():
        return HttpResponseForbidden()

    block_ids = request.GET.get('block_ids')
    if not block_ids:
        return JsonResponse({})

    try:
        cid = int(request.GET.get('cid'))
        oid = int(request.GET.get('oid'))
    except (TypeError, ValueError):
        # Missing or malformed ids: render without an instance.
        instance = None
    else:
        try:
            ct = ContentType.objects.get(pk=cid)
        except ContentType.DoesNotExist:
            return JsonResponse({})

        ct_model = ct.model_class()
        if ct_model is None:
            # The content type row no longer maps to a model class.
            return JsonResponse({})
        try:
            instance = ct_model.objects.get(pk=oid)
        except ct_model.DoesNotExist:
            # Was misspelled "DoesNotExists", which raised AttributeError
            # instead of handling the missing object.
            return JsonResponse({})

    result = {}
    for block_id in block_ids.split(','):
        try:
            block_id = int(block_id)
        except (TypeError, ValueError):
            continue

        block_ct_id = AttachableBlock.objects.filter(pk=block_id).values_list('content_type', flat=True).first()
        if block_ct_id is None:
            # No block with this id; nothing to render.
            continue
        block_model = get_model_by_ct(block_ct_id)
        block = block_model.objects.get(pk=block_id)
        if not block.visible:
            continue

        block_view = get_block_view(block)
        if not block_view:
            continue

        result[block_id] = block_view(RequestContext(request, {
            'request': request,
        }), block, instance=instance)
    return JsonResponse(result)
def post(self, request, *args, **kwargs):
    """
    Dispatch an incoming RPC request to the first handler able to process it.

    Each class from ``get_handler_classes()`` is instantiated in turn; the
    first whose ``can_handle()`` is true processes the request and its
    success or error result is returned. If none can handle it, a plain
    text response is returned.

    :param request: Incoming request
    :param args: Additional arguments
    :param kwargs: Additional named arguments
    :return: The response built by the selected handler, or a plain
             HttpResponse when no handler accepted the request
    """
    logger.debug('RPC request received...')

    for candidate_cls in self.get_handler_classes():
        handler = candidate_cls(request, self.entry_point)

        try:
            # can_handle() stays inside the try so its failures are reported
            # through the same error paths as processing failures.
            if handler.can_handle():
                logger.debug('Request will be handled by {}'.format(candidate_cls.__name__))
                return handler.result_success(handler.process_request())

        except AuthenticationFailed as exc:
            # Authentication failures are answered with HttpResponseForbidden
            logger.warning(exc)
            return handler.result_error(exc, HttpResponseForbidden)

        except RPCException as exc:
            logger.warning('RPC exception: {}'.format(exc), exc_info=settings.MODERNRPC_LOG_EXCEPTIONS)
            return handler.result_error(exc)

        except Exception as exc:
            logger.error('Exception raised from a RPC method: "{}"'.format(exc),
                         exc_info=settings.MODERNRPC_LOG_EXCEPTIONS)
            return handler.result_error(RPCInternalError(str(exc)))

    logger.error('Unable to handle incoming request.')
    return HttpResponse('Unable to handle your request. Please ensure you called the right entry point. If not, '
                        'this could be a server error.')
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
    """
    Exchange a refresh token for a new access token / refresh token pair.

    Example of the POST data received:
        <QueryDict: {
            'client_id': ['docker'],
            'refresh_token': ['boink'],
            'service': ['registry.maurusnet.test'],
            'scope': ['repository:dev/destalinator:push,pull'],
            'grant_type': ['refresh_token']}>

    Responses:
        400 - no ``refresh_token``, ``grant_type`` missing or not
              ``refresh_token``, or no scope in the request
        404 - unknown registry, or unknown repo while
              ``DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS`` is off
        401 - no user could be derived from the refresh token
        403 - the user has no access to the requested repo
        200 - JSON body with ``access_token``, ``scope``, ``expires_in``
              and ``refresh_token``
    """
    # .get() so a request lacking grant_type yields the 400 below rather
    # than an unhandled key error.
    if "refresh_token" not in request.POST or request.POST.get("grant_type") != "refresh_token":
        return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")

    tr = _tkr_parse(request.POST)
    if not tr.scope:
        # The scope used to be passed as a second positional argument
        # instead of being formatted into the message.
        return HttpResponseBadRequest(
            "Can't issue access token without valid scope (scope=%s)" % tr.scope)
    tp = TokenPermissions.parse_scope(tr.scope)

    try:
        client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
    except DockerRegistry.DoesNotExist:
        return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

    user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(),
                                         expected_issuer=request.get_host(),
                                         expected_audience=tr.service)
    if not user:
        return HttpResponse("Unauthorized", status=401)

    try:
        drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
    except DockerRepo.DoesNotExist:
        if not settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
            return HttpResponseNotFound("No such repo '%s'" % tp.path)
        # Unconfigured repos are allowed: use an unsaved, fully open repo
        # object for the access check below.
        drepo = DockerRepo()
        drepo.name = tp.path
        drepo.registry = client
        drepo.unauthenticated_read = True
        drepo.unauthenticated_write = True

    if not (drepo.registry.has_access(user, tp) or drepo.has_access(user, tp)):
        return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))

    rightnow = datetime.datetime.now(tz=pytz.UTC)
    return HttpResponse(content=json.dumps({
        "access_token": self._create_jwt(
            self._make_access_token(request, tr, rightnow, tp, user),
            client.private_key_pem(),
        ),
        "scope": tr.scope,
        "expires_in": 119,
        "refresh_token": self._create_jwt(
            self._make_refresh_token(request, tr, rightnow, user),
            client.private_key_pem(),
        )
    }), status=200, content_type="application/json")