def _akismet_check(self, comment, content_object, request):
    """
    Submit the comment to Akismet and return the result of ``Akismet.check()``.

    The API key comes from ``appsettings.AKISMET_API_KEY``. The blog URL is
    ``appsettings.AKISMET_BLOG_URL`` when that is set; otherwise it is built
    from the request scheme and the domain of the current site.

    Raises ``ImproperlyConfigured`` when no API key is configured.
    """
    api_key = appsettings.AKISMET_API_KEY
    if not api_key:
        raise ImproperlyConfigured('You must set AKISMET_API_KEY to use comment moderation with Akismet.')

    # The fallback URL is computed unconditionally, even if an explicit
    # blog URL is configured.
    domain = get_current_site(request).domain
    scheme = 'https' if request.is_secure() else 'http'
    fallback_url = '{0}://{1}/'.format(scheme, domain)
    blog_url = appsettings.AKISMET_BLOG_URL or fallback_url

    user_agent = 'django-fluent-comments/{0}'.format(fluent_comments.__version__)
    client = Akismet(
        api_key,
        blog=blog_url,
        is_test=int(bool(appsettings.AKISMET_IS_TEST)),
        application_user_agent=user_agent,
    )

    check_kwargs = self._get_akismet_data(blog_url, comment, content_object, request)
    # raises AkismetServerError when key is invalid
    return client.check(**check_kwargs)
# Source code examples that use get_current_site() in Python
def send_email(self, email):
    """
    Send a password-reset message to every user whose address matches ``email``.

    The match is case-insensitive (``email__iexact``). Each message carries a
    reset URL built from the ``DEFAULT_HTTP_PROTOCOL`` setting (default
    ``"http"``), the current site's domain and the reversed
    ``account_password_reset_token`` route.
    """
    scheme = getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http")
    site = get_current_site(self.request)

    for account in User.objects.filter(email__iexact=email):
        route_kwargs = {
            "uidb36": int_to_base36(account.id),
            "token": self.make_token(account),
        }
        host = unicode(site.domain)
        reset_path = reverse("account_password_reset_token", kwargs=route_kwargs)
        reset_url = u"%s://%s%s" % (scheme, host, reset_path)

        ctx = {
            "user": account,
            "current_site": site,
            "password_reset_url": reset_url,
        }

        # Join the rendered subject's lines so it contains no line breaks.
        rendered_subject = render_to_string("account/email/password_reset_subject.txt", ctx)
        subject = "".join(rendered_subject.splitlines())
        body = render_to_string("account/email/password_reset.txt", ctx)
        send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [account.email])
def test_shortcut_view(self):
    """
    The shortcut view (behind the admin "view on site" link) must produce
    a complete URL both with and without the sites framework installed.
    """
    request = HttpRequest()
    request.META = dict(SERVER_NAME="Example.com", SERVER_PORT="80")

    user_ct = ContentType.objects.get_for_model(FooWithUrl)
    obj = FooWithUrl.objects.create(name="john")

    # With the sites framework, the domain comes from the current site.
    if Site._meta.installed:
        response = shortcut(request, user_ct.id, obj.id)
        expected = "http://%s/users/john/" % get_current_site(request).domain
        location = response._headers.get("location")[1]
        self.assertEqual(expected, location)

    # Without it, the expected host is the request's SERVER_NAME.
    Site._meta.installed = False
    response = shortcut(request, user_ct.id, obj.id)
    location = response._headers.get("location")[1]
    self.assertEqual("http://Example.com/users/john/", location)
def test_get_current_site(self):
    """
    Exercise get_current_site() with the sites framework installed (with
    and without a matching Site row) and with it marked as not installed.
    """
    request = HttpRequest()
    request.META = dict(SERVER_NAME="example.com", SERVER_PORT="80")

    # The Site matching settings.SITE_ID is returned.
    current = get_current_site(request)
    self.assertTrue(isinstance(current, Site))
    self.assertEqual(current.id, settings.SITE_ID)

    # Sites framework installed but no matching Site: an exception is raised.
    current.delete()
    with self.assertRaises(ObjectDoesNotExist):
        get_current_site(request)

    # Sites framework not installed: a RequestSite is returned instead.
    Site._meta.installed = False
    current = get_current_site(request)
    self.assertTrue(isinstance(current, RequestSite))
    self.assertEqual(current.name, "example.com")
def index(request, sitemaps):
    """
    Render a sitemap index whose entries point at the geographic sitemap
    view (``django.contrib.gis.sitemaps.views.sitemap``).

    ``sitemaps`` maps section names to sitemap objects or to callables that
    produce them. Each section contributes one URL, plus one ``?p=N`` URL
    for every page after the first.
    """
    current_site = get_current_site(request)
    protocol = 'https' if request.is_secure() else 'http'

    sites = []
    for section, site in sitemaps.items():
        sitemap_obj = site() if callable(site) else site
        pages = sitemap_obj.paginator.num_pages

        sitemap_url = urlresolvers.reverse(
            'django.contrib.gis.sitemaps.views.sitemap',
            kwargs={'section': section},
        )
        sites.append('%s://%s%s' % (protocol, current_site.domain, sitemap_url))
        # range(2, pages + 1) is empty for a single page, so no guard is needed.
        sites.extend(
            '%s://%s%s?p=%s' % (protocol, current_site.domain, sitemap_url, page)
            for page in range(2, pages + 1)
        )

    xml = loader.render_to_string('sitemap_index.xml', {'sitemaps': sites})
    return HttpResponse(xml, content_type='application/xml')
def email(self, comment, content_object, request):
    """
    Notify the addresses in ``settings.MANAGERS`` about a new comment.

    Does nothing unless ``self.email_notification`` is truthy. Sending
    failures are suppressed (``fail_silently=True``).
    """
    if not self.email_notification:
        return

    # Each MANAGERS entry is indexed at position 1 for the address.
    recipients = [entry[1] for entry in settings.MANAGERS]

    template = loader.get_template('comments/comment_notification_email.txt')
    template_context = Context({
        'comment': comment,
        'content_object': content_object,
    })

    site_name = get_current_site(request).name
    subject = '[%s] New comment posted on "%s"' % (site_name, content_object)
    body = template.render(template_context)

    send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, recipients, fail_silently=True)
def send_email(self, email):
    """
    Send a password-reset message to every user whose address matches ``email``.

    The match is case-insensitive (``email__iexact``). The reset URL combines
    the ``DEFAULT_HTTP_PROTOCOL`` setting (default ``"http"``), the current
    site's domain and the reversed ``account_password_reset_token`` route.
    """
    scheme = getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http")
    site = get_current_site(self.request)

    for account in User.objects.filter(email__iexact=email):
        route_kwargs = {
            "uidb36": int_to_base36(account.id),
            "token": self.make_token(account),
        }
        host = site.domain
        reset_path = reverse("account_password_reset_token", kwargs=route_kwargs)
        reset_url = "{0}://{1}{2}".format(scheme, host, reset_path)

        ctx = {
            "user": account,
            "current_site": site,
            "password_reset_url": reset_url,
        }

        # Join the rendered subject's lines so it contains no line breaks.
        rendered_subject = render_to_string("account/email/password_reset_subject.txt", ctx)
        subject = "".join(rendered_subject.splitlines())
        body = render_to_string("account/email/password_reset.txt", ctx)
        send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [account.email])
def on_comment_posted(sender, comment, request, **kwargs):
    """
    Signal handler: email the addresses in ``settings.MANAGERS`` about a
    posted comment, when ``FLUENT_COMMENTS_USE_EMAIL_NOTIFICATION`` is on.

    The subject line depends on the comment state: removed ("Spam"),
    non-public ("Moderated") or otherwise "New comment posted".
    """
    # This code is copied from django_comments.moderation.
    # That code doesn't offer a RequestContext, which makes it really
    # hard to generate proper URLs with FQDN in the email.
    #
    # A signal is used instead of the moderator class so the notification
    # works regardless of a manual moderator.register() call in the project.
    if not appsettings.FLUENT_COMMENTS_USE_EMAIL_NOTIFICATION:
        return

    recipients = [entry[1] for entry in settings.MANAGERS]
    site = get_current_site(request)
    content_object = comment.content_object

    # Pick the subject template first, then format it once.
    if comment.is_removed:
        subject_format = u'[{0}] Spam comment on "{1}"'
    elif not comment.is_public:
        subject_format = u'[{0}] Moderated comment on "{1}"'
    else:
        subject_format = u'[{0}] New comment posted on "{1}"'
    subject = subject_format.format(site.name, content_object)

    context = {
        'site': site,
        'comment': comment,
        'content_object': content_object
    }

    template_name = "comments/comment_notification_email.txt"
    if django.VERSION >= (1, 8):
        message = render_to_string(template_name, context, request=request)
    else:
        message = render_to_string(template_name, context, context_instance=RequestContext(request))

    send_mail(subject, message, settings.DEFAULT_FROM_EMAIL, recipients, fail_silently=True)
def send_email(self, user):
    """
    Email ``user`` a message rendered from the password-change templates.

    The template context holds the user, the ``DEFAULT_HTTP_PROTOCOL``
    setting (default ``"http"``) and the current site.
    """
    ctx = {
        "user": user,
        "protocol": getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http"),
        "current_site": get_current_site(self.request),
    }

    # Join the rendered subject's lines so it contains no line breaks.
    rendered_subject = render_to_string("account/email/password_change_subject.txt", ctx)
    subject = "".join(rendered_subject.splitlines())

    body = render_to_string("account/email/password_change.txt", ctx)
    send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [user.email])
def send_email(self, user):
    """
    Email ``user`` a message rendered from the password-change templates.

    The template context holds the user, the ``DEFAULT_HTTP_PROTOCOL``
    setting (default ``"http"``) and the current site.
    """
    ctx = {
        "user": user,
        "protocol": getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http"),
        "current_site": get_current_site(self.request),
    }

    # Join the rendered subject's lines so it contains no line breaks.
    rendered_subject = render_to_string("account/email/password_change_subject.txt", ctx)
    subject = "".join(rendered_subject.splitlines())

    body = render_to_string("account/email/password_change.txt", ctx)
    send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [user.email])
def site(request):
    """
    Return a context dict whose ``site`` entry resolves the current site
    lazily, i.e. ``get_current_site()`` is not called here.
    """
    def _resolve_site():
        return get_current_site(request)

    return {'site': SimpleLazyObject(_resolve_site)}
def site(request):
    """
    Return a context dict whose ``site`` entry resolves the current site
    lazily, i.e. ``get_current_site()`` is not called here.
    """
    def _resolve_site():
        return get_current_site(request)

    return {'site': SimpleLazyObject(_resolve_site)}
def login(request, template_name='registration/login.html',
          redirect_field_name=REDIRECT_FIELD_NAME,
          authentication_form=AuthenticationForm,
          current_app=None, extra_context=None):
    """
    Show the login form and process a submitted login.

    A valid POST logs the user in and redirects to the requested URL, or to
    ``settings.LOGIN_REDIRECT_URL`` when the requested URL is not safe for
    this host. Any other request renders ``template_name`` with the form.
    """
    redirect_to = request.REQUEST.get(redirect_field_name, '')

    if request.method != "POST":
        form = authentication_form(request)
    else:
        form = authentication_form(request, data=request.POST)
        if form.is_valid():
            # Never trust the user-supplied redirect target blindly.
            if not is_safe_url(url=redirect_to, host=request.get_host()):
                redirect_to = resolve_url(settings.LOGIN_REDIRECT_URL)

            # Redirect target vetted; log the user in.
            auth_login(request, form.get_user())
            return HttpResponseRedirect(redirect_to)

    site = get_current_site(request)
    # Key order is kept as-is: later keys win if redirect_field_name collides.
    context = {
        'form': form,
        redirect_field_name: redirect_to,
        'site': site,
        'site_name': site.name,
    }
    if extra_context is not None:
        context.update(extra_context)

    return TemplateResponse(request, template_name, context,
                            current_app=current_app)
def logout(request, next_page=None,
           template_name='registration/logged_out.html',
           redirect_field_name=REDIRECT_FIELD_NAME,
           current_app=None, extra_context=None):
    """
    Log the user out, then either redirect or render the logged-out page.

    A redirect target given in the request (under ``redirect_field_name``)
    takes precedence over ``next_page``; if that target is not safe for this
    host, the current request path is used instead.
    """
    auth_logout(request)

    if next_page is not None:
        next_page = resolve_url(next_page)

    if redirect_field_name in request.REQUEST:
        requested = request.REQUEST[redirect_field_name]
        # Security check -- don't allow redirection to a different host.
        if is_safe_url(url=requested, host=request.get_host()):
            next_page = requested
        else:
            next_page = request.path

    if next_page:
        # Redirect to this page until the session has been cleared.
        return HttpResponseRedirect(next_page)

    site = get_current_site(request)
    context = {
        'site': site,
        'site_name': site.name,
        'title': _('Logged out')
    }
    if extra_context is not None:
        context.update(extra_context)

    return TemplateResponse(request, template_name, context,
                            current_app=current_app)
def save(self, domain_override=None,
         subject_template_name='registration/password_reset_subject.txt',
         email_template_name='registration/password_reset_email.html',
         use_https=False, token_generator=default_token_generator,
         from_email=None, request=None):
    """
    Email a password-reset link to each active user matching the cleaned
    ``email`` field (case-insensitive).

    Users without a usable password are skipped. ``domain_override``, when
    given, is used as both the domain and the site name; otherwise both come
    from the current site.
    """
    from django.core.mail import send_mail

    UserModel = get_user_model()
    address = self.cleaned_data["email"]
    candidates = UserModel._default_manager.filter(
        email__iexact=address, is_active=True)
    protocol = 'https' if use_https else 'http'

    for user in candidates:
        # Do not mail users whose password is marked as unusable.
        if not user.has_usable_password():
            continue

        if domain_override:
            site_name = domain = domain_override
        else:
            current_site = get_current_site(request)
            site_name = current_site.name
            domain = current_site.domain

        c = {
            'email': user.email,
            'domain': domain,
            'site_name': site_name,
            'uid': urlsafe_base64_encode(force_bytes(user.pk)),
            'user': user,
            'token': token_generator.make_token(user),
            'protocol': protocol,
        }

        rendered_subject = loader.render_to_string(subject_template_name, c)
        # Email subject *must not* contain newlines
        subject = ''.join(rendered_subject.splitlines())
        body = loader.render_to_string(email_template_name, c)
        send_mail(subject, body, from_email, [user.email])
def flatpage(request, url):
    """
    Public entry point for serving a flat page.

    Models: `flatpages.flatpages`
    Templates: Uses the template named by the ``template_name`` field,
        or :template:`flatpages/default.html` when that field is empty.
    Context:
        flatpage
            `flatpages.flatpages` object
    """
    if not url.startswith('/'):
        url = '/' + url
    site_id = get_current_site(request).id

    try:
        page = get_object_or_404(FlatPage,
                                 url__exact=url, sites__id__exact=site_id)
    except Http404:
        # Only retry with a trailing slash when APPEND_SLASH is on and the
        # URL does not already end in one; otherwise re-raise the 404.
        if url.endswith('/') or not settings.APPEND_SLASH:
            raise
        url += '/'
        # Raises Http404 again if the slashed URL has no page either.
        get_object_or_404(FlatPage,
                          url__exact=url, sites__id__exact=site_id)
        return HttpResponsePermanentRedirect('%s/' % request.path)

    return render_flatpage(request, page)
def sitemap(request, sitemaps, section=None):
    """
    Render the ``gis/sitemaps/geo_sitemap.xml`` template with the URLs of
    one section, or of every section when ``section`` is None.

    Raises ``Http404`` for an unknown section, an empty page, or a
    non-integer page number.
    """
    if section is None:
        maps = list(six.itervalues(sitemaps))
    elif section in sitemaps:
        maps = [sitemaps[section]]
    else:
        raise Http404(_("No sitemap available for section: %r") % section)

    page = request.GET.get("p", 1)
    current_site = get_current_site(request)

    urls = []
    for site in maps:
        try:
            # Entries may be sitemap instances or callables producing one.
            sitemap_obj = site() if callable(site) else site
            urls.extend(sitemap_obj.get_urls(page=page, site=current_site))
        except EmptyPage:
            raise Http404(_("Page %s empty") % page)
        except PageNotAnInteger:
            raise Http404(_("No page '%s'") % page)

    xml = loader.render_to_string('gis/sitemaps/geo_sitemap.xml', {'urlset': urls})
    return HttpResponse(xml, content_type='application/xml')
def index(request, sitemaps,
          template_name='sitemap_index.xml', content_type='application/xml',
          sitemap_url_name='django.contrib.sitemaps.views.sitemap',
          mimetype=None):
    """
    Render a sitemap index listing one absolute URL per section, plus one
    ``?p=N`` URL for every page after the first.

    Each sitemap's own ``protocol`` is used when it is not None; otherwise
    the request's scheme is used. ``mimetype`` is a deprecated alias for
    ``content_type`` and overrides it when given.
    """
    if mimetype:
        warnings.warn(
            "The mimetype keyword argument is deprecated, use content_type instead",
            DeprecationWarning, stacklevel=2)
        content_type = mimetype

    req_protocol = 'https' if request.is_secure() else 'http'
    req_site = get_current_site(request)

    sites = []
    for section, site in sitemaps.items():
        # Entries may be sitemap instances or callables producing one.
        sitemap_obj = site() if callable(site) else site
        if sitemap_obj.protocol is None:
            protocol = req_protocol
        else:
            protocol = sitemap_obj.protocol

        sitemap_url = urlresolvers.reverse(
            sitemap_url_name, kwargs={'section': section})
        absolute_url = '%s://%s%s' % (protocol, req_site.domain, sitemap_url)
        sites.append(absolute_url)
        sites.extend(
            '%s?p=%s' % (absolute_url, page)
            for page in range(2, sitemap_obj.paginator.num_pages + 1)
        )

    return TemplateResponse(request, template_name, {'sitemaps': sites},
                            content_type=content_type)
def sitemap(request, sitemaps, section=None,
            template_name='sitemap.xml', content_type='application/xml',
            mimetype=None):
    """
    Render the URL set of one section, or of every section when ``section``
    is None.

    ``mimetype`` is a deprecated alias for ``content_type`` and overrides it
    when given. Raises ``Http404`` for an unknown section, an empty page, or
    a non-integer page number.
    """
    if mimetype:
        warnings.warn(
            "The mimetype keyword argument is deprecated, use content_type instead",
            DeprecationWarning, stacklevel=2)
        content_type = mimetype

    req_protocol = 'https' if request.is_secure() else 'http'
    req_site = get_current_site(request)

    if section is None:
        maps = list(six.itervalues(sitemaps))
    elif section in sitemaps:
        maps = [sitemaps[section]]
    else:
        raise Http404("No sitemap available for section: %r" % section)

    page = request.GET.get("p", 1)

    urls = []
    for site in maps:
        try:
            # Entries may be sitemap instances or callables producing one.
            sitemap_obj = site() if callable(site) else site
            urls.extend(sitemap_obj.get_urls(page=page, site=req_site,
                                             protocol=req_protocol))
        except EmptyPage:
            raise Http404("Page %s empty" % page)
        except PageNotAnInteger:
            raise Http404("No page '%s'" % page)

    return TemplateResponse(request, template_name, {'urlset': urls},
                            content_type=content_type)
def process_response(self, request, response):
    """
    Replace a 404 response with a stored redirect when one matches.

    Non-404 responses are returned untouched. For a 404, a ``Redirect`` for
    the current site is looked up by the request's full path. Only if that
    exact lookup finds nothing, and ``settings.APPEND_SLASH`` is on and the
    path has no trailing slash, is the lookup retried with a slash inserted
    after the path (before any query string).

    Returns ``HttpResponseGone`` when the matching redirect has an empty
    ``new_path``, a permanent redirect to ``new_path`` otherwise, or the
    original response when no redirect matches.
    """
    if response.status_code != 404:
        # No need to check for a redirect for non-404 responses.
        return response

    full_path = request.get_full_path()
    current_site = get_current_site(request)

    r = None
    try:
        r = Redirect.objects.get(site=current_site, old_path=full_path)
    except Redirect.DoesNotExist:
        pass

    # The ``r is None`` guard keeps an exact match from being overridden by
    # a slash-appended entry, and avoids a pointless second query.
    if r is None and settings.APPEND_SLASH and not request.path.endswith('/'):
        # Try appending a trailing slash to the path portion only.
        path_len = len(request.path)
        slashed_path = full_path[:path_len] + '/' + full_path[path_len:]
        try:
            r = Redirect.objects.get(site=current_site, old_path=slashed_path)
        except Redirect.DoesNotExist:
            pass

    if r is None:
        # No redirect was found. Return the response.
        return response
    if r.new_path == '':
        return http.HttpResponseGone()
    return http.HttpResponsePermanentRedirect(r.new_path)
def get_context_data(self, **kwargs):
    """
    Extend the parent context with the current site (``site``) and the
    ``BRAND_VIEW`` value (``brand_view``).
    """
    context = super(VoiceMixin, self).get_context_data(**kwargs)
    extra = {
        'site': get_current_site(self.request),
        'brand_view': BRAND_VIEW
    }
    context.update(extra)
    return context
def send_email(self, user):
    """
    Email ``user`` a message rendered from the password-change templates.

    The template context holds the user, the ``DEFAULT_HTTP_PROTOCOL``
    setting (default ``"http"``) and the current site.
    """
    ctx = {
        "user": user,
        "protocol": getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http"),
        "current_site": get_current_site(self.request),
    }

    # Join the rendered subject's lines so it contains no line breaks.
    rendered_subject = render_to_string("account/email/password_change_subject.txt", ctx)
    subject = "".join(rendered_subject.splitlines())

    body = render_to_string("account/email/password_change.txt", ctx)
    send_mail(subject, body, settings.DEFAULT_FROM_EMAIL, [user.email])