def send(self, request):
    """Send the invitation mail unless one was sent within the last 12 hours.

    The mail body is rendered and the mail is sent with the invited user's
    language active; the previously active language is restored afterwards,
    also when rendering or sending raises. After a successful send
    ``last_email`` is set to the current time and the invitation is saved.
    A refused recipient and a too-early resend are both reported through the
    messages framework on ``request``.

    :param request: request the error messages are attached to
    """
    # TODO: make the resend interval (timedelta) configurable
    if (not self.last_email) or self.last_email + timedelta(hours=12) < now():
        try:
            # override() re-activates the previous language when the block is
            # left, including when an exception propagates out of it.
            with translation.override(self.user.language):
                # TODO: link directly to the poll here, or to the vote?
                link = reverse('poll_vote', args=(self.poll.url,))
                email_content = render_to_string('invitations/mail_invite.txt', {
                    'receiver': self.user.username,
                    'creator': self.creator.username,
                    'link': link
                })
                send_mail("Invitation to vote on {}".format(self.poll.title), email_content, None, [self.user.email])
                self.last_email = now()
                self.save()
        except SMTPRecipientsRefused:
            # Format *after* the gettext lookup: the catalogue is keyed by the
            # template containing the ``{}`` placeholder, not by the final text.
            messages.error(
                request, _("The mail server had an error sending the notification to {}").format(self.user.username)
            )
    else:
        messages.error(
            request, _("You have send an Email for {} in the last 12 Hours").format(self.user.username)
        )
python类error()的实例源码
def create(request):
    """Show the group creation page and create a group on POST.

    On a successful POST a success message is queued and the user is
    redirected to ``groups_show`` for the new group. If ``create_usergroup``
    raises ``GroupError`` the page is rendered again with the error text and
    the submitted group name.
    """
    error = None
    group_name = ''
    if request.method == 'POST':
        group_name = request.POST.get('group_name', '')
        try:
            create_usergroup(request.user, group_name)
        except GroupError as e:
            # Python 3 exceptions have no implicit ``message`` attribute; use
            # it when the exception class provides one, else the string form.
            error = getattr(e, 'message', str(e))
        else:
            msg = _('Group "{0}" was created.').format(group_name)
            messages.success(request, msg)
            return redirect('groups_show', group_name)
    return TemplateResponse(request, 'groups/create.html', {
        'error': error,
        'group_name': group_name,
    })
def render_to_response(self, context, **kwargs):
    """Render the template to HTML, convert it to PDF and return the PDF.

    ``context['pdf']`` is set to True before rendering. The generated PDF is
    merged with the object's science case file and experimental design file
    when those are set. If anything in the PDF generation raises, an error
    message is queued and the user is redirected to ``sciapplications:index``.
    """
    context['pdf'] = True
    pdf_response = HttpResponse(content_type='application/pdf')
    html_response = super().render_to_response(context, **kwargs)
    html_response.render()
    try:
        document = HTML(string=html_response.content)
        buffer = io.BytesIO()
        document.write_pdf(buffer)
        merger = PdfFileMerger()
        merger.append(buffer)
        # Attachments are appended in this fixed order, each only when set.
        for field_name in ('science_case_file', 'experimental_design_file'):
            attachment = getattr(self.object, field_name)
            if attachment:
                merger.append(attachment.file)
        merger.write(pdf_response)
    except Exception as exc:
        template = 'There was an error generating your pdf. {}'
        messages.error(self.request, template.format(str(exc)))
        return HttpResponseRedirect(reverse('sciapplications:index'))
    return pdf_response
def get_context_data(self, **kwargs):
    """Extend the parent context with an empty signup form and a blank message."""
    context = super(LoginSignupView, self).get_context_data(**kwargs)
    context['signup_form'] = RegistrationForm()
    context['message'] = ''
    return context
# def change_password(request):
# if request.method == 'POST':
# form = PasswordChangeForm(request.user, request.POST)
# if form.is_valid():
# user = form.save()
# update_session_auth_hash(request, user) # Important!
# messages.success(request, 'Your password was successfully updated!')
# return redirect('login:change_password')
# else:
# messages.error(request, 'Please correct the error below.')
# else:
# form = PasswordChangeForm(request.user)
# response = render(request, 'password_change.html', {
# 'form': form
# })
# response.set_cookie('password_changed', 'true')
# return response
def watch(request, poll_url):
    """Toggle whether the requesting user watches the poll with this URL.

    Responds with 404 when no poll has ``poll_url``. A user who may not watch
    the poll gets an error message. In every case the response is a redirect
    to the ``poll`` view.
    """
    poll = get_object_or_404(Poll, url=poll_url)
    if poll.can_watch(request.user, request):
        if poll.user_watches(request.user):
            # Already watching: remove the existing watch entry.
            PollWatch.objects.get(poll=poll, user=request.user).delete()
        else:
            PollWatch(poll=poll, user=request.user).save()
    else:
        messages.error(
            request, _("You are not allowed to watch this poll.")
        )
    return redirect('poll', poll_url)
def can_vote(self, user: BitpollUser, request: HttpRequest, is_edit: bool=False) -> bool:
    """
    Determine if the user is allowed to vote.

    Whenever voting is refused, an error message stating the reason is added
    to the request.

    :param user: the user who wants to vote
    :param request: request the error messages are attached to
    :param is_edit: if the vote is an edit
    :return: True if the user may vote, False otherwise
    """
    has_voted = self.has_voted(user)
    if self.one_vote_per_user and has_voted and not is_edit:
        messages.error(request, _("It is only one vote allowed. You have already voted."))
        return False
    if self.require_login and not user.is_authenticated:
        messages.error(request, _("Login required to vote."))
        return False
    if self.require_invitation:
        # ``values('user')`` yields dicts, so a membership test with a user
        # object never matches; ask the database for a matching invitation
        # instead. The query only runs for authenticated users.
        invited = user.is_authenticated and self.invitation_set.filter(user=user).exists()
        if not invited:
            messages.error(request, _("You are not allowed to vote in this poll. You have to be invited"))
            return False
    return True
def get(self, request, *args, **kwargs):
    """Check out a new Lendable for the requesting user on GET.

    Any exception while creating, checking out or saving the item is shown
    to the user as an error message, logged on the 'django' logger, and
    answered with a redirect to ``library:index``. Otherwise a success
    message is queued and the parent view's ``get`` produces the response.
    """
    log = logging.getLogger('django')
    try:
        self.item = Lendable(type=self.kwargs.get('item_subtype'),
                             user=self.request.user)
        self.item.checkout()
        self.item.save()
    except Exception as exc:
        messages.error(request, exc)
        log.exception('%s: %s' % (type(exc).__name__, exc))
        return redirect(reverse('library:index'))

    item_name = self.item.name
    due_date = formatting_filters.format_date(self.item.due_on)
    messages.success(
        request,
        "'%s' is checked out to you until %s." % (item_name, due_date)
    )
    return super(CheckoutView, self).get(request, *args, **kwargs)
def checkin(request, primary_key):
    """Check in the :model:`library.Lendable` with the given primary key.

    The item is looked up among the requesting user's lendables (404 if
    absent). An exception raised by the check-in is shown as an error
    message; otherwise a success message is queued.

    Redirect:
    :view:`library.index`
    """
    item = get_object_or_404(Lendable.all_types,
                             pk=primary_key,
                             user=request.user)
    try:
        item.checkin()
    except Exception as exc:
        messages.error(request, exc)
        return redirect(reverse('library:index'))
    messages.success(request, "'%s' returned." % (item.name))
    return redirect(reverse('library:index'))
def choose_name(request):
    """Create a user with the posted username and log the requester in as it.

    If the requester is already logged in, that session is logged out and
    the previous user object is deleted before the new user is logged in.
    A username that violates a database constraint (``IntegrityError``) is
    reported as already taken. The response redirects to the ``next`` query
    parameter, or to ``/`` when it is absent.
    """
    username = request.POST['username']
    try:
        user = User.objects.create(username=username)
        # ``is_authenticated`` is a method on old Django versions and a plain
        # attribute on newer ones (where calling it fails); accept both.
        is_authenticated = request.user.is_authenticated
        if callable(is_authenticated):
            is_authenticated = is_authenticated()
        if is_authenticated:
            old_user = request.user
            django_logout(request)
            old_user.delete()
        _login_user(request, user)
        messages.success(request, 'You have chosen "{}"!'.format(username))
    except IntegrityError:
        messages.error(request, 'Sorry, "{}" is already taken :('.format(username))
    # NOTE(review): ``next`` comes straight from the query string and is not
    # validated here, so this can redirect to an external site - confirm
    # whether that is acceptable or add a safe-URL check.
    return redirect(request.GET.get('next', '/'))
# Gets context of previous and current player actions for Hotseat Gameplay
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
    """Authenticate the posted credentials and log the user in.

    Unknown credentials and deactivated accounts produce an error message
    and a redirect back to ``common:login``. After a successful login the
    user is sent to the unquoted ``next`` query parameter when it passes
    ``is_safe_url`` for the request's host, otherwise to ``/``.
    """
    user = authenticate(
        username=request.POST.get('username'),
        password=request.POST.get('password'),
    )

    failure = None
    if user is None:
        failure = _('No user account matches the entered credentials.')
    elif not user.is_active:
        failure = _('User account is deactivated.')
    if failure is not None:
        messages.error(request, failure)
        return redirect('common:login')

    login(request, user)
    next_url = urllib.parse.unquote(request.GET.get('next', ''))
    if next_url and is_safe_url(next_url, request.get_host()):
        return redirect(next_url)
    return redirect('/')
def disconnect(request, backend, association_id=None):
    """Remove a social account association from the requesting user.

    The redirect target is the first non-empty value of: the request's
    redirect field, the backend's ``SOCIAL_AUTH_DISCONNECT_REDIRECT_URL``
    setting, ``DEFAULT_REDIRECT``. Removal is refused with an error message
    when the user has no usable password and at most one association.
    """
    associated = request.user.social_auth.count()

    url = request.REQUEST.get(REDIRECT_FIELD_NAME, '')
    if not url:
        url = backend_setting(backend, 'SOCIAL_AUTH_DISCONNECT_REDIRECT_URL')
    if not url:
        url = DEFAULT_REDIRECT

    if not request.user.has_usable_password() and associated <= 1:
        messages.error(request, _("Cannot remove the only Social Account without first setting a Password or adding another Social Account."))
        return HttpResponseRedirect(url)

    # Fetch the association first so its details are still available for the
    # confirmation message after it has been disconnected.
    usa = request.user.social_auth.get(pk=association_id)
    backend.disconnect(request.user, association_id)

    provider = usa.provider
    if usa.extra_data is not None:
        shown_uid = usa.extra_data.get("display", usa.uid)
    else:
        shown_uid = usa.uid
    messages.success(request, _("Removed the %(provider)s account '%(uid)s'.") % {
        "provider": provider,
        "uid": shown_uid,
    })
    return HttpResponseRedirect(url)
def viewAuthor(request, initials):
    """Render the page of the author with the given initials.

    Unknown initials yield an error message and a redirect to
    ``viewAuthors``. Otherwise the languages of the default language list
    are scanned: a language counts as authored / reviewed when the author's
    full name is one of the ' and '-separated names in its author / reviewer
    field.
    """
    try:
        author = Author.objects.get(initials=initials)
    except Author.DoesNotExist:
        messages.error(request, "Unknown Author initials: %s." % initials)
        return HttpResponseRedirect(reverse("viewAuthors"))

    languageList = LanguageList.objects.get(
        name=getDefaultLanguagelist(request))
    rows = languageList.languages.values_list(
        'ascii_name', 'utf8_name', 'author', 'reviewer')

    authored, reviewed = [], []
    for asciiName, utf8Name, authorNames, reviewerNames in rows:
        names = (asciiName, utf8Name)
        if author.fullName in authorNames.split(' and '):
            authored.append(names)
        if author.fullName in reviewerNames.split(' and '):
            reviewed.append(names)

    context = {
        'author': author,
        'authored': authored,
        'reviewed': reviewed,
        'wordlist': getDefaultWordlist(request),
        'content': fetchMarkdown(
            "Author-description:-%s.md" % author.initials)}
    return render_template(request, "author.html", context)
def handle(self, request):
    """Apply the submitted data of every entry in ``self.langlist``.

    Each entry is processed in its own transaction: the language is loaded
    by ``idField`` and, if the entry differs from it, updated and saved.
    A problem reported by ``setDelta`` is shown to the user as an error
    message. Any exception for an entry is logged and reported with a
    generic error message, and processing continues with the next entry.
    """
    for entry in self.langlist:
        try:
            with transaction.atomic():
                language = Language.objects.get(id=entry.data['idField'])
                if not language.isChanged(**entry.data):
                    continue
                conflict = language.setDelta(request, **entry.data)
                if conflict is not None:
                    messages.error(request,
                                   language.deltaReport(**conflict))
                else:
                    language.save()
        except Exception:
            logging.exception(
                'Exception in LanguageDistributionTableForm.handle().',
                extra=entry.data)
            messages.error(request, 'Sorry, the server had problems '
                           'saving at least one language entry.')
def handle(self, request):
    """Apply the submitted data of every entry in ``self.elements`` to its clade.

    All clades are loaded once and looked up by ``idField``. A changed clade
    is updated and saved inside a transaction; a problem reported by
    ``setDelta`` is shown as an error message. Exceptions are logged and
    reported per entry without aborting the loop.

    Returns True if at least one clade was saved, False otherwise.
    """
    cladesById = dict((clade.id, clade) for clade in Clade.objects.all())
    anySaved = False
    for entry in self.elements:
        data = entry.data
        try:
            clade = cladesById[data['idField']]
            if not clade.isChanged(**data):
                continue
            conflict = clade.setDelta(request, **data)
            if conflict is not None:
                messages.error(
                    request, clade.deltaReport(**conflict))
                continue
            with transaction.atomic():
                clade.save()
                anySaved = True
        except Exception:
            logging.exception('Problem saving clade '
                              'in view_clades.')
            messages.error(request,
                           'Problem saving clade data: %s' % data)
    return anySaved
def handle(self, request):
    """Apply the submitted data of every entry in ``self.cogclass``.

    The cognate class of each entry is loaded by ``idField`` (a missing one
    raises out of this method). Changed entries are updated and saved inside
    a transaction; a problem reported by ``setDelta`` is shown as an error
    message, and any exception during the update is logged and reported.
    """
    for entry in self.cogclass:
        data = entry.data
        cogclass = CognateClass.objects.get(
            id=int(data['idField']))
        # Nothing to do for entries that match the stored state.
        if not cogclass.isChanged(**data):
            continue
        try:
            with transaction.atomic():
                conflict = cogclass.setDelta(request, **data)
                if conflict is not None:
                    messages.error(
                        request, cogclass.deltaReport(**conflict))
                else:
                    cogclass.save()
        except Exception:
            logging.exception('Problem saving CognateClass '
                              'in view_cognateclasses.')
            messages.error(
                request,
                'Problem while saving entry: %s' % data)
def handle(self, request):
    """Apply ``self.data`` to the cognate class it identifies.

    The cognate class is loaded by ``self.data['id']``; if it does not exist
    the failure is logged and reported to the user. A changed cognate class
    is updated and saved inside a transaction. A problem reported by
    ``setDelta`` is shown as an error message; any exception during the
    update is logged and reported with a generic error message.
    """
    try:
        c = CognateClass.objects.get(id=self.data['id'])
    except CognateClass.DoesNotExist:
        logging.exception('Cognate class does not exist in database.')
        messages.error(
            request,
            "Sorry, cognate class %s does not exist on the server." %
            self.data['id'])
        return

    if not c.isChanged(**self.data):
        return

    # The try block wraps the atomic block (not the other way round) so that
    # a database error rolls the transaction back before it is handled.
    try:
        with transaction.atomic():
            # Pass the request, as the other setDelta() callers in this file
            # do for the same model.
            problem = c.setDelta(request, **self.data)
            if problem is None:
                c.save()
            else:
                messages.error(request, c.deltaReport(**problem))
    except Exception:
        logging.exception(
            'Exception handling CognateClassEditForm.')
        messages.error(
            request,
            'Sorry, the server had problems '
            'updating the cognate set.')
def handle(self, request):
    """Apply the submitted data of every entry in ``self.meanings``.

    Each meaning is loaded by ``meaningId`` and, when changed, updated and
    saved. A problem reported by ``setDelta`` is shown as an error message.
    Exceptions while saving are logged and reported with the meaning's
    gloss; exceptions while loading or comparing are logged and reported
    with a generic message. Processing continues with the next entry.
    """
    submitted = [entry.data for entry in self.meanings]
    for fields in submitted:
        try:
            meaning = Meaning.objects.get(id=fields['meaningId'])
            if not meaning.isChanged(**fields):
                continue
            try:
                conflict = meaning.setDelta(request, **fields)
                if conflict is not None:
                    messages.error(
                        request, meaning.deltaReport(**conflict))
                else:
                    meaning.save()
            except Exception:
                logging.exception('Exception while saving POST '
                                  'in view_wordlist.')
                messages.error(request, 'Sorry, the server had '
                               'problems saving changes for '
                               '"%s".' % meaning.gloss)
        except Exception:
            logging.exception('Problem accessing Meaning object '
                              'in view_wordlist.',
                              extra=fields)
            messages.error(request, 'The server had problems saving '
                           'at least one entry.')
def handle(self, request):
    """Apply ``self.data`` to the author it identifies, in one transaction.

    The author is updated when its fields or its ``user_id`` differ from the
    submitted data. The ``user_id`` itself is only reassigned when the
    requesting user is staff. A problem reported by ``setDelta`` is shown as
    an error message; any exception is logged and reported to the user.
    """
    data = self.data
    try:
        with transaction.atomic():
            author = Author.objects.get(
                id=data['idField'])
            userChanged = data['user_id'] != author.user_id
            if author.isChanged(**data) or userChanged:
                conflict = author.setDelta(request, **data)
                if conflict is not None:
                    messages.error(
                        request, author.deltaReport(**conflict))
                else:
                    if userChanged and request.user.is_staff:
                        author.user_id = data['user_id']
                    author.save()
    except Exception:
        logging.exception('Problem while updating author.')
        messages.error(
            request, 'Problem saving author data: %s' % data)
def post(self, request):
    """Create a NexusExport from the posted form and redirect to the export list.

    An invalid form yields an error message and the page is rendered again
    with the bound form. For a valid form the export is saved, then reloaded
    by id and renamed to ``Exp<zero-padded id>_<original name>``.
    """
    form = ChooseNexusOutputForm(request.POST)
    if not form.is_valid():
        messages.error(request,"Please provide a short description.")
        return self.render_to_response({"form": form})

    export = NexusExport(
        exportName=self.fileNameForForm(form),
        description=form.cleaned_data["description"])
    export.setSettings(form)
    export.bump(request)
    export.save()

    # The id only exists after the first save, so the final name is set in a
    # second step on a freshly loaded instance.
    exportId = export.id
    stored = NexusExport.objects.get(id=exportId)
    stored.exportName = "Exp%04d_%s" % (exportId, stored.exportName)
    stored.save()
    return HttpResponseRedirect('/nexus/export/')
def remove(request, device_id):
    """Remove one of the requesting user's active two-factor devices.

    Responds with 404 when the device is not among the user's active
    devices. A device that cannot be deleted yields an error message.
    Otherwise ``deleted_at`` is set to the current time and the device is
    saved; if no active non-backup device remains, two-factor authentication
    is switched off on the user. The response always redirects to
    ``twofa:list``.
    """
    candidates = twofa.models.Device.objects.active_for_user(request.user).select_subclasses()
    device = get_object_or_404(candidates, pk=device_id)

    if device.can_delete():
        device.deleted_at = timezone.now()
        device.save()
        messages.success(
            request, _('The "%(auth_name)s" authenticator has been removed from your account.') % {
                'auth_name': device.name()})

        remaining = twofa.models.Device.objects.active_for_user(request.user).exclude_backup()
        if not remaining.exists():
            request.user.twofa_enabled = False
            request.user.save()
            messages.info(
                request, _('Since you removed the last authenticator from your account, '
                           'two-factor authentication has now been disabled.'))
    else:
        messages.error(
            request, _('The "%(auth_name)s" authenticator cannot be removed.') % {'auth_name': device.name()})

    return redirect('twofa:list')
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
    """Authenticate the posted credentials for the backoffice and log in.

    Unknown credentials, deactivated accounts and users failing
    ``is_backoffice_user`` each get an error message and a redirect back to
    ``backoffice:login``. After login the user is sent to the ``next`` query
    parameter when it passes ``is_safe_url`` for the request's host,
    otherwise to ``backoffice:main``.
    """
    def deny(reason):
        # Queue the reason and send the user back to the login form.
        messages.error(request, reason)
        return redirect('backoffice:login')

    user = authenticate(
        username=request.POST.get('username'),
        password=request.POST.get('password'),
    )
    if user is None:
        return deny(_('No user account matches the entered credentials.'))
    if not user.is_active:
        return deny(_('User account is deactivated.'))
    if not is_backoffice_user(user):
        return deny(_('User does not have permission to access backoffice data.'))

    login(request, user)
    target = request.GET.get('next')
    if target and is_safe_url(target, request.get_host()):
        return redirect(target)
    return redirect('backoffice:main')
def reverse_session_view(request: HttpRequest, pk: int) -> Union[HttpRequest, HttpResponseRedirect]:
    """Confirm and perform the reversal of a cashdesk session.

    Responds with 404 when no session has this primary key. On POST the
    session is reversed; a ``FlowError`` is shown as an error message,
    success as a success message, and in both cases the user is redirected
    to the session detail page. Every other request method renders the
    confirmation page.

    NOTE(review): the return annotation names ``HttpRequest`` although a
    response object is returned; it is left unchanged here because the
    response class may not be imported in this module.
    """
    session = get_object_or_404(CashdeskSession, pk=pk)

    if request.method == 'POST':
        try:
            reverse_session(session)
        except FlowError as e:
            messages.error(request, str(e))
        else:
            messages.success(request, _('All transactions in the session have been cancelled.'))
        return redirect('backoffice:session-detail', pk=pk)

    # Render for every non-POST method (not only GET) so that e.g. HEAD
    # requests receive a response instead of the view returning None.
    return render(request, 'backoffice/reverse_session.html', {
        'session': session,
    })
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
    """Authenticate the posted credentials for the troubleshooter area.

    Unknown credentials, deactivated accounts and users failing
    ``troubleshooter_user`` each get an error message and a redirect back to
    ``troubleshooter:login``; everyone else is logged in and redirected to
    ``troubleshooter:main``.
    """
    user = authenticate(
        username=request.POST.get('username'),
        password=request.POST.get('password'),
    )

    if user is None:
        rejection = _('No user account matches the entered credentials.')
    elif not user.is_active:
        rejection = _('User account is deactivated.')
    elif not troubleshooter_user(user):
        rejection = _('User does not have permission to access troubleshooter data.')
    else:
        login(request, user)
        return redirect('troubleshooter:main')

    messages.error(request, rejection)
    return redirect('troubleshooter:login')
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
    """Authenticate the posted credentials and log the user in at this cashdesk.

    Login is refused, with an error message and a redirect to ``desk:login``,
    when the credentials match no user, the account is deactivated, the user
    has no current session, or the session belongs to a different cashdesk.
    After a successful login the session's cashdesk is signalled and the
    user is redirected to ``desk:main``.
    """
    user = authenticate(
        username=request.POST.get('username'),
        password=request.POST.get('password'),
    )
    if user is None:
        messages.error(request, _('No user account matches the entered credentials.'))
        return redirect('desk:login')

    if not user.is_active:
        messages.error(request, _('User account is deactivated.'))
        return redirect('desk:login')

    session = user.get_current_session()
    if session is None:
        messages.error(request, _('You do not have an active session.'))
        return redirect('desk:login')

    if session.cashdesk != self.cashdesk:
        wrong_desk = _('Your session is scheduled for a different cashdesk. Please go to '
                       '{desk}').format(desk=str(session.cashdesk))
        messages.error(request, wrong_desk)
        return redirect('desk:login')

    login(request, user)
    session.cashdesk.signal_next()
    return redirect('desk:main')
def post(self, request, **kwargs):
    """Invite co-investigators to a proposal by email address.

    The proposal is taken from the requesting user's membership with role
    ``Membership.PI`` for the proposal given by ``kwargs['pk']``; without
    such a membership the response is a 404. The posted ``email`` field is
    split on commas after removing spaces. Every invalid address is reported
    with an error message, and users are only added when all addresses are
    valid. The response redirects to the proposal detail page.
    """
    try:
        proposal = request.user.membership_set.get(proposal=kwargs.get('pk'), role=Membership.PI).proposal
    except Membership.DoesNotExist:
        raise Http404
    # A missing field is treated like an empty one, which then fails
    # validation below instead of raising a key error.
    emails = request.POST.get('email', '').replace(' ', '').strip(',').split(',')
    valid = True
    for email in emails:
        try:
            validate_email(email)
        except ValidationError:
            valid = False
            # Format after the gettext lookup so the catalogue entry with the
            # ``{}`` placeholder can be found.
            messages.error(request, _('Please enter a valid email address: {}').format(email))
    if valid:
        proposal.add_users(emails, Membership.CI)
        messages.success(request, _('Co Investigator(s) invited'))
    return HttpResponseRedirect(reverse('proposals:detail', kwargs={'pk': proposal.id}))
def clear_queue(request, queue_index):
    """Show a confirmation page for emptying a queue and empty it on POST.

    On POST the queue is emptied and the user is redirected to ``rq_jobs``
    for this queue. A ``ResponseError`` mentioning ``EVALSHA`` is turned
    into an error message; any other ``ResponseError`` is re-raised.
    Other request methods render the confirmation page.
    """
    queue_index = int(queue_index)
    queue = get_queue_by_index(queue_index)
    if request.method == 'POST':
        try:
            queue.empty()
            messages.info(request, _('You have successfully cleared the queue %s') % queue.name)
        except ResponseError as e:
            # Exceptions have no ``message`` attribute on Python 3; the
            # string form is available on every Python version.
            if 'EVALSHA' not in str(e):
                raise
            messages.error(request,
                           _('This action is not supported on Redis versions < 2.6.0, '
                             'please use the bulk delete command instead'))
        return redirect('rq_jobs', queue_index)
    context_data = admin.site.each_context(request)
    context_data.update({
        'title': _("Clear Queue"),
        'queue_index': queue_index,
        'queue': queue,
    })
    return render(request, 'django_rq/clear_queue.html', context_data)
def add_view(self, request, form_url='', extra_context=None):
    """Show the form for adding a page version for a draft page.

    The draft is taken from the ``draft`` query parameter (404 when no such
    page exists) and the language from ``language``. A user without change
    permission on the page gets an error message and is sent back to the
    referring page, or gets a 404 when no referrer was sent. If an active,
    non-dirty version already exists for the language, an info message is
    queued and the frame is closed.
    """
    language = request.GET.get('language')
    draft_pk = request.GET.get('draft')
    page = get_object_or_404(Page, pk=draft_pk)
    # A page version may only be created by a user who may edit the page.
    # NOTE(review): the user comes from get_current_user() rather than from
    # ``request`` - presumably the same user; confirm.
    user = get_current_user()
    if not user_can_change_page(user, page):
        messages.error(request, _('Missing permission to edit this page which is necessary in order to create a '
                                  'page version.'))
        prev = request.META.get('HTTP_REFERER')
        if prev:
            return redirect(prev)
        raise Http404
    # exists() answers "is there at least one?" without counting all rows.
    if page.page_versions.filter(active=True, dirty=False, language=language).exists():
        messages.info(request, _('This page is already revised.'))
        return self.render_close_frame()
    return super(PageVersionAdmin, self).add_view(request, form_url=form_url, extra_context=extra_context)
def add_subpart(request, part_id):
    """Add a subpart to the part with the given id and return to its BOM tab.

    An unknown part yields an error message and a redirect to ``error``.
    On POST with a valid form a ``Subpart`` linking the part to the chosen
    subpart and count is created. An invalid form and non-POST requests
    change nothing. The response then redirects to ``part-info`` with the
    ``#bom`` fragment.
    """
    organization = request.user.bom_profile().organization

    try:
        part = Part.objects.get(id=part_id)
    except ObjectDoesNotExist:
        messages.error(request, "No part found with given part_id.")
        return HttpResponseRedirect(reverse('error'))

    if request.method == 'POST':
        form = AddSubpartForm(request.POST, organization=organization, part_id=part_id)
        if form.is_valid():
            cleaned = form.cleaned_data
            Subpart.objects.create(
                assembly_part=part,
                assembly_subpart=cleaned['assembly_subpart'],
                count=cleaned['count']
            )

    bom_url = reverse('part-info', kwargs={'part_id': part_id}) + '#bom'
    return HttpResponseRedirect(bom_url)
def upload_file_to_part(request, part_id):
    """Attach an uploaded file to the part with the given id.

    On POST with a valid form the file is stored as a ``PartFile`` and the
    user is redirected to ``part-info`` with the ``#specs`` fragment. An
    unknown part, a non-POST request or an invalid form each yield an error
    message and a redirect to ``error``.
    """
    try:
        part = Part.objects.get(id=part_id)
    except ObjectDoesNotExist:
        messages.error(request, "No part found with given part_id.")
        return HttpResponseRedirect(reverse('error'))

    uploaded = False
    if request.method == 'POST':
        form = FileForm(request.POST, request.FILES)
        if form.is_valid():
            PartFile(file=request.FILES['file'], part=part).save()
            uploaded = True

    if uploaded:
        specs_url = reverse('part-info', kwargs={'part_id': part_id}) + '#specs'
        return HttpResponseRedirect(specs_url)

    messages.error(request, "Error uploading file.")
    return HttpResponseRedirect(reverse('error'))
def post(self, request, *args, **kwargs):
    """Invite the selected applications and send their invitation mails.

    Every application whose id is in the posted ``selected`` list is
    invited; applications whose invitation raises ``ValidationError`` are
    counted as failures. The mails of the successful invitations are sent
    as one batch. The number of invited applications is reported as a
    success message and the number of failures as an error message. The
    response redirects to ``invite_list``.
    """
    ids = request.POST.getlist('selected')
    apps = models.Application.objects.filter(pk__in=ids)
    mails = []
    errors = 0
    for app in apps:
        try:
            app.invite(request.user)
            m = emails.create_invite_email(app, request)
            mails.append(m)
        except ValidationError:
            errors += 1
    if mails:
        send_batch_emails(mails)
        messages.success(request, "%s applications invited" % len(mails))
    # Report failures even when some invitations succeeded; when nothing was
    # invited at all the error message is shown as before.
    if errors or not mails:
        messages.error(request, "%s applications not invited" % errors)
    return HttpResponseRedirect(reverse('invite_list'))