def messages(request, room_id):
if request.method == 'POST':
try:
room = ChatRoom.objects.get(eid=room_id)
except ChatRoom.DoesNotExist:
try:
room = ChatRoom(eid=room_id)
room.save()
except IntegrityError:
# someone else made the room. no problem
room = ChatRoom.objects.get(eid=room_id)
mfrom = request.POST['from']
text = request.POST['text']
with transaction.atomic():
msg = ChatMessage(room=room, user=mfrom, text=text)
msg.save()
send_event('room-%s' % room_id, 'message', msg.to_data())
body = json.dumps(msg.to_data(), cls=DjangoJSONEncoder)
return HttpResponse(body, content_type='application/json')
else:
return HttpResponseNotAllowed(['POST'])
python类HttpResponseNotAllowed()的实例源码
def InvoiceEmailTemplate(request, spk):
if request.method != "GET":
return HttpResponseNotAllowed(["GET"])
res = {}
res["status"] = "unknown"
try:
sponsoring = Sponsoring.objects.select_related("invoice").get(pk=spk)
except Sponsoring.DoesNotExist:
res["status"] = "error"
res["errmsg"] = "Cannot find sponsoring for given id"
return respond_json(res)
templateName = "invoice/mail/invoiceMailDE.html" if sponsoring.contact.contactPersonLanguage.startswith("de") else "invoice/mail/invoiceMailEN.html"
ctx_dict = { "sponsoring" : sponsoring, "user" : request.user }
message = render_to_string(templateName, ctx_dict)
res["status"] = "success"
res["success"] = True
res["company"] = sponsoring.contact.companyName
res["message"] = message
return respond_json(res)
def _generateMountURLs(self, path, mapping, app=None):
p = path
@csrf_exempt # dispatcher (view) needs to be csrf exempted
def dispatcher(req, *args, **kwargs):
service = mapping.get(req.method, None) or mapping.get('*', None)
if service:
return service['src'](req, *args, **kwargs)
else:
return HttpResponseNotAllowed(mapping.keys())
# relative path
if not p.startswith('/'):
if app:
p = '/'.join([app, p]) # add app name prefix in addition to 'path'
# absolute path
else:
p = p[1:] # remove leading '/'
# support reverse() in template for <a href=...> and <form action=...>
reversable = mapping.get('*', None) or mapping.get('GET', None) or mapping.get('POST', None)
return url(r'^{}$'.format(p), dispatcher, name='.'.join([reversable['src'].__module__, reversable['name']]) if reversable else None)
def require_http_methods(request_method_list):
"""
Decorator to make a view only accept particular request methods. Usage::
@require_http_methods(["GET", "POST"])
def my_view(request):
# I can assume now that only GET or POST requests make it this far
# ...
Note that request methods should be in uppercase.
"""
def decorator(func):
@wraps(func, assigned=available_attrs(func))
def inner(request, *args, **kwargs):
if request.method not in request_method_list:
logger.warning(
'Method Not Allowed (%s): %s', request.method, request.path,
extra={'status_code': 405, 'request': request}
)
return HttpResponseNotAllowed(request_method_list)
return func(request, *args, **kwargs)
return inner
return decorator
def glossurl(request, glossurl):
if request.method == 'POST':
glossurl = get_object_or_404(GlossURL, id=glossurl)
if 'view_dataset' not in get_perms(request.user, glossurl.gloss.dataset):
# If user has no permissions to dataset, raise PermissionDenied to show 403 template.
msg = _("You do not have permissions to add tags to glosses of this lexicon.")
messages.error(request, msg)
raise PermissionDenied(msg)
glossurl_id = glossurl.id
try:
glossurl.delete()
except PermissionDenied:
return HttpResponseForbidden('Permission Denied: Unable to delete GlossURL(id): ' + str(glossurl.id),
content_type='text/plain')
return HttpResponse('Deleted GlossURL(id): ' + str(glossurl_id), content_type='text/plain')
else:
return HttpResponseNotAllowed(permitted_methods=['POST'])
def read(request, resource_type, id, via_oauth=False, *args, **kwargs):
"""
Read from Remote FHIR Server
# Example client use in curl:
# curl -X GET http://127.0.0.1:8000/fhir/Practitioner/1234
"""
if request.method != 'GET':
return HttpResponseNotAllowed(['GET'])
interaction_type = 'read'
read_fhir = generic_read(request,
interaction_type,
resource_type,
id,
via_oauth,
*args,
**kwargs)
return read_fhir
def entrypoint(cls, request, *args, **kwargs):
# select proper action by http method
for action_class in cls.iter_actions():
if action_class.method == request.method:
break
else:
raise HttpResponseNotAllowed()
param_values = {}
for param_name, param_class in action_class.get_params().iteritems():
param_values[param_name] = param_class.construct(cls, request, args, kwargs)
try:
action = action_class(param_values)
return action.run()
except NodeParamError:
return HttpRequest(status=500) # TODO: report, logging etc
def entrypoint(cls, request, *args, **kwargs):
# select proper action by http method
for action_class in cls.iter_actions():
if action_class.method == request.method:
break
else:
raise HttpResponseNotAllowed()
param_values = {}
for param_name, param_class in action_class.get_params().iteritems():
param_values[param_name] = param_class.construct(cls, request, args, kwargs)
try:
action = action_class(param_values)
return action.run()
except NodeParamError:
return HttpRequest(status=500) # TODO: report, logging etc
def entrypoint(cls, request, *args, **kwargs):
# select proper action by http method
for action_class in cls.iter_actions():
if action_class.method == request.method:
break
else:
raise HttpResponseNotAllowed()
param_values = {}
for param_name, param_class in action_class.get_params().iteritems():
param_values[param_name] = param_class.construct(cls, request, args, kwargs)
try:
action = action_class(param_values)
return action.run()
except NodeParamError:
return HttpRequest(status=500) # TODO: report, logging etc
def require_http_methods(request_method_list):
"""
Decorator to make a view only accept particular request methods. Usage::
@require_http_methods(["GET", "POST"])
def my_view(request):
# I can assume now that only GET or POST requests make it this far
# ...
Note that request methods should be in uppercase.
"""
def decorator(func):
@wraps(func, assigned=available_attrs(func))
def inner(request, *args, **kwargs):
if request.method not in request_method_list:
logger.warning('Method Not Allowed (%s): %s', request.method, request.path,
extra={
'status_code': 405,
'request': request
}
)
return HttpResponseNotAllowed(request_method_list)
return func(request, *args, **kwargs)
return inner
return decorator
def require_http_methods(request_method_list):
"""
Decorator to make a view only accept particular request methods. Usage::
@require_http_methods(["GET", "POST"])
def my_view(request):
# I can assume now that only GET or POST requests make it this far
# ...
Note that request methods should be in uppercase.
"""
def decorator(func):
@wraps(func, assigned=available_attrs(func))
def inner(request, *args, **kwargs):
if request.method not in request_method_list:
logger.warning('Method Not Allowed (%s): %s', request.method, request.path,
extra={
'status_code': 405,
'request': request
}
)
return HttpResponseNotAllowed(request_method_list)
return func(request, *args, **kwargs)
return inner
return decorator
def CommodityEdit(request, pk):
# ????
commodity = get_object_or_404(Commodity, pk=pk)
if commodity.user != request.user:
return HttpResponseNotAllowed(['GET', 'POST'])
if(request.method == 'POST'):
if request.POST.get('commodityToggle'):
commodity.available = not commodity.available
commodity.save()
return HttpResponseRedirect(request.POST.get('next'))
form = CommodityForm(request.POST, request.FILES, instance=commodity)
if form.is_valid():
form.save()
return HttpResponseRedirect(reverse('market:commodity_view', kwargs={'pk': pk}))
else:
form = CommodityForm(instance = commodity)
return render(request, 'market/commodity_add_or_edit.html', {
'form': form,
'action': '????',
});
def http_method_not_allowed(self, request, *args, **kwargs):
logger.warning('Method Not Allowed (%s): %s', request.method, request.path,
extra={
'status_code': 405,
'request': request
}
)
return http.HttpResponseNotAllowed(self._allowed_methods())
def require_http_methods(request_method_list):
"""
Decorator to make a view only accept particular request methods. Usage::
@require_http_methods(["GET", "POST"])
def my_view(request):
# I can assume now that only GET or POST requests make it this far
# ...
Note that request methods should be in uppercase.
"""
def decorator(func):
@wraps(func, assigned=available_attrs(func))
def inner(request, *args, **kwargs):
if request.method not in request_method_list:
logger.warning('Method Not Allowed (%s): %s', request.method, request.path,
extra={
'status_code': 405,
'request': request
}
)
return HttpResponseNotAllowed(request_method_list)
return func(request, *args, **kwargs)
return inner
return decorator
def result_notification_prepare(request, section_slug, status):
if request.method != "POST":
return HttpResponseNotAllowed(["POST"])
if not request.user.has_perm("reviews.can_manage_%s" % section_slug):
return access_not_permitted(request)
proposal_pks = []
try:
for pk in request.POST.getlist("_selected_action"):
proposal_pks.append(int(pk))
except ValueError:
return HttpResponseBadRequest()
proposals = ProposalBase.objects.filter(
kind__section__slug=section_slug,
result__status=status,
)
proposals = proposals.filter(pk__in=proposal_pks)
proposals = proposals.select_related("speaker__user", "result")
proposals = proposals.select_subclasses()
notification_template_pk = request.POST.get("notification_template", "")
if notification_template_pk:
notification_template = NotificationTemplate.objects.get(pk=notification_template_pk)
else:
notification_template = None
ctx = {
"section_slug": section_slug,
"status": status,
"notification_template": notification_template,
"proposals": proposals,
"proposal_pks": ",".join([str(pk) for pk in proposal_pks]),
}
return render(request, "reviews/result_notification_prepare.html", ctx)
def team_promote(request, pk):
if request.method != "POST":
return HttpResponseNotAllowed(["POST"])
membership = get_object_or_404(Membership, pk=pk)
state = membership.team.get_state_for_user(request.user)
if request.user.is_staff or state == "manager":
if membership.state == "member":
membership.state = "manager"
membership.save()
messages.success(request, "Promoted to manager.")
return redirect("team_detail", slug=membership.team.slug)
def team_demote(request, pk):
if request.method != "POST":
return HttpResponseNotAllowed(["POST"])
membership = get_object_or_404(Membership, pk=pk)
state = membership.team.get_state_for_user(request.user)
if request.user.is_staff or state == "manager":
if membership.state == "manager":
membership.state = "member"
membership.save()
messages.success(request, "Demoted from manager.")
return redirect("team_detail", slug=membership.team.slug)
def team_accept(request, pk):
if request.method != "POST":
return HttpResponseNotAllowed(["POST"])
membership = get_object_or_404(Membership, pk=pk)
state = membership.team.get_state_for_user(request.user)
if request.user.is_staff or state == "manager":
if membership.state == "applied":
membership.state = "member"
membership.save()
messages.success(request, "Accepted application.")
return redirect("team_detail", slug=membership.team.slug)
def http_method_not_allowed(self, request, *args, **kwargs):
if settings.DEBUG and self._allowed_methods():
raise Exception("Only" + str(self._allowed_methods()))
return http.HttpResponseNotAllowed(self._allowed_methods())
def post(self, request, *args, **kwargs):
if request.POST.get('thumber_token', None) is not None:
pk = request.POST.get('id', None)
if pk is None or pk == '':
# No PK, this means we need to create a new Feedback object
http_referer = self.request.META.get('HTTP_REFERER')
sessionid = self.request.COOKIES[settings.SESSION_COOKIE_NAME]
user_feedback = ThumberForm(data=request.POST).save(commit=False)
user_feedback.url = http_referer
user_feedback.view_name = self._get_view_from_url(http_referer)
user_feedback.session = sessionid
user_feedback.view_args = (request.resolver_match.args, request.resolver_match.kwargs)
else:
# PK given, so this Feedback already exists and just needs the comment adding
user_feedback = Feedback.objects.get(pk=pk)
user_feedback.comment = request.POST['comment']
user_feedback.save()
if request.POST.get('thumber_token', None) == 'sync':
# Non-AJAX post, we've now done the processing, so return super's GET response
return self.get(request)
else:
# AJAX submission, inform frontend the frontend the POST was successful, and give the id back so it
# can be updated in a separate request
return JsonResponse({"success": True, "id": user_feedback.id})
else:
try:
return super().post(request, *args, **kwargs)
except AttributeError:
methods = [m.upper() for m in self.http_method_names if hasattr(self, m) and m.upper() != 'POST']
return HttpResponseNotAllowed(methods)
def comment(request):
"""
??
:param request:
:return:
"""
if request.method == 'POST':
nick_name = request.POST.get('nick_name')
email = request.POST.get('email')
home_url = request.POST.get('home_url')
content = request.POST.get('content')
verify_code = request.POST.get('verify_code')
post_id = request.POST.get('post_id')
comment_ip = str(request.META.get('REMOTE_ADDR'))
comment_source = request.META.get('HTTP_USER_AGENT')
if comment_source:
# ??????????????
comment_source = comment_source[0: comment_source.index(')') + 1]
correct_code = request.session.get('CheckCode', None)
if correct_code and verify_code.upper() == correct_code.upper():
comment_db = Comment()
comment_db.nick_name = nick_name
comment_db.email = email
comment_db.home_url = home_url
comment_db.content = content
comment_db.post_id = post_id
comment_db.comment_ip = comment_ip
comment_db.comment_source = comment_source
comment_db.save()
result = {'result': '???????????'}
return JsonResponse(result)
else:
result = {'result': '???????????!'}
return JsonResponse(result)
else:
return HttpResponseNotAllowed('POST')
def like_presentation(request, pk):
if request.method == 'PUT':
presentation = Presentation.objects.get(pk=pk)
is_like = presentation.like_toggle(request.user)
return HttpResponse(status=200, content=is_like)
return HttpResponseNotAllowed(["PUT"])
def reregister(request):
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
fqdn = MessageView.valid_fqdn(request)
if not fqdn:
return HttpForbidden()
host_attributes = json.loads(request.body)
ValidatedClientView.valid_certs[request.META['HTTP_X_SSL_CLIENT_SERIAL']] = host_attributes['fqdn']
ManagedHost.objects.filter(fqdn=fqdn).update(fqdn=host_attributes['fqdn'], address=host_attributes['address'])
return HttpResponse()
def process_response(self, request, response):
if isinstance(response, HttpResponseNotAllowed):
context = RequestContext(request)
response.content = loader.render_to_string(
"405.html", context_instance=context)
return response
def unsubscribe(request, identifier):
is_test_mail = identifier.startswith('preview')
if is_test_mail:
try:
mail = PreviewMail.objects.get(identifier=identifier)
except PreviewMail.DoesNotExist:
return HttpResponseNotFound()
else:
try:
mail = get_mail_by_identifier(identifier)
except ObjectDoesNotExist:
return HttpResponseNotFound()
context = {'mail': mail, 'is_test_mail': is_test_mail}
if identifier.startswith('c-') and mail.message.external_optout:
if request.method == 'GET':
return render(
request, 'optouts/unsubscribe_external.html', context)
else:
return HttpResponseNotAllowed(['GET'])
else:
if request.method == 'POST':
optout_kwargs = {
'category': mail.get_category(), 'author': mail.get_author()}
if not is_test_mail:
OptOut.objects.create_or_update(
identifier=mail.identifier,
address=mail.recipient,
origin=OptOut.BY_WEB,
**optout_kwargs)
return HttpResponseRedirect(
reverse('unsubscribed', kwargs={'identifier': identifier}))
else:
return render(request, 'optouts/unsubscribe.html', context)
def tracking_open(request, identifier):
if request.method == 'GET':
mail = get_mail_by_identifier(identifier, must_raise=False)
if mail:
create_track_record.apply_async(kwargs={
'identifier': identifier,
'kind': 'read', 'properties': {'source': 'pixel'}})
return HttpResponse(TRACKER_PIXEL, content_type='image/gif')
return HttpResponseNotAllowed(permitted_methods=['GET'])
def tracking_redirect(request, identifier, link_identifier):
if request.method == 'GET':
return do_track_and_redirect(identifier, link_identifier)
return HttpResponseNotAllowed(permitted_methods=['GET'])
def web_tracking_redirect(request, web_key, link_identifier):
if request.method == 'GET':
identifier = WebKey(web_key).get_identifier()
return do_track_and_redirect(identifier, link_identifier)
return HttpResponseNotAllowed(permitted_methods=['GET'])
def create_transcode(request, video_id):
if request.method != 'POST':
return HttpResponseNotAllowed(['POST'])
video = get_object_or_404(Video, id=video_id)
transcode_form = VideoTranscodeAdminForm(data=request.POST, video=video)
if transcode_form.is_valid():
transcode_form.save()
return redirect('wagtailvideos:edit', video_id)
def http_method_not_allowed(self, request, *args, **kwargs):
logger.warning(
'Method Not Allowed (%s): %s', request.method, request.path,
extra={'status_code': 405, 'request': request}
)
return http.HttpResponseNotAllowed(self._allowed_methods())