def index(self, request, *args, **kwargs):
    """Render the admin index page, flashing one warning per failed check.

    Each validator is called in a fixed order and the values they return are
    accumulated with ``+=``; every collected entry is queued through
    ``messages.warning`` before the call is delegated to the ``index`` view
    of the admin site's class.
    """
    checks = (
        validate_server_db1,
        validate_server_db2,
        validate_server_db3,
        validate_project_db1,
        validate_project_db2,
        validate_uwsgi,
        validate_server_ip,
        validate_project_dirs,
    )
    # Seed with the first validator's result and extend it in place.
    problems = checks[0]()
    for check in checks[1:]:
        problems += check()

    for problem in problems:
        messages.warning(request, problem)

    extra_context = {}
    admin_index = admin.site.__class__.index
    return admin_index(
        self, request, *args, extra_context=extra_context, **kwargs)
# Example source code using Python's warning() (collected snippets)
def validate_voucher(view):
    """Wrap a checkout view so a stale discount voucher is dropped first.

    When the checkout carries a voucher code that is not found among the
    active vouchers, the code is removed, the discount is recalculated, a
    warning is flashed and the user is redirected to the checkout summary.
    Otherwise the wrapped view is called unchanged.
    """
    @wraps(view)
    def func(request, checkout, cart):
        # Nothing to validate when no voucher code is set.
        if not checkout.voucher_code:
            return view(request, checkout, cart)

        try:
            Voucher.objects.active().get(code=checkout.voucher_code)
        except Voucher.DoesNotExist:
            del checkout.voucher_code
            checkout.recalculate_discount()
            notice = pgettext(
                'Checkout warning',
                'This voucher has expired. Please review your checkout.')
            messages.warning(request, notice)
            return redirect('checkout:summary')

        return view(request, checkout, cart)

    return func
def scan_scan_list(request: HttpRequest, scan_list_id: int) -> HttpResponse:
    """Schedule the scan of a scan list.

    Looks up the scan list (404 if it does not exist) with its sites
    prefetched, asks it to scan, and flashes either a success message with
    the number of unfinished scans or a warning when ``scan()`` reported
    that nothing was scannable. Always redirects back to the list's page.
    """
    sites = (
        Site.objects.select_related('last_scan')
        .annotate_most_recent_scan_start()
        .annotate_most_recent_scan_end_or_null()
    )
    scan_list = get_object_or_404(
        ScanList.objects.prefetch_related(Prefetch('sites', queryset=sites)),
        pk=scan_list_id)

    was_any_site_scannable = scan_list.scan()
    if was_any_site_scannable:
        num_scanning_sites = Scan.objects.filter(end__isnull=True).count()
        # Translate the template first and interpolate afterwards. Applying
        # ``%`` inside ``_()`` would hand gettext an already formatted
        # string that can never match a catalogue entry.
        messages.success(
            request,
            _("Scans for this list have been scheduled. "
              "The total number of sites in the scanning queue "
              "is %i (including yours).") % num_scanning_sites)
    else:
        messages.warning(
            request,
            _('All sites have been scanned recently. Please wait 30 minutes and try again.'))

    return redirect(reverse('frontend:view_scan_list', args=(scan_list_id,)))
def save_related(self, request, form, *args, **kwargs):
    """Save related objects, then warn about photos that will never show.

    After delegating to the parent implementation, the gallery instance is
    asked for its orphaned photos. If there are any, a pluralised warning
    listing their titles is flashed to the user.
    """
    super(GalleryAdmin, self).save_related(request, form, *args, **kwargs)

    orphans = form.instance.orphaned_photos()
    if not orphans:
        return

    template = ungettext(
        'The following photo does not belong to the same site(s)'
        ' as the gallery, so will never be displayed: %(photo_list)s.',
        'The following photos do not belong to the same site(s)'
        ' as the gallery, so will never be displayed: %(photo_list)s.',
        len(orphans)
    )
    titles = ", ".join(photo.title for photo in orphans)
    messages.warning(request, template % {'photo_list': titles})
def add_slack_btn(request):
    """Finish the Slack OAuth flow and store the result as a channel.

    Rejects requests whose ``code`` query parameter is shorter than eight
    characters, exchanges the code at Slack's ``oauth.access`` endpoint and,
    when the reply is flagged ``ok``, saves the raw response text as a new
    ``slack`` channel and assigns all checks to it. Otherwise the error from
    the reply is flashed. Redirects to the channels page in both cases.
    """
    code = request.GET.get("code", "")
    if len(code) < 8:
        return HttpResponseBadRequest()

    payload = {
        "client_id": settings.SLACK_CLIENT_ID,
        "client_secret": settings.SLACK_CLIENT_SECRET,
        "code": code
    }
    result = requests.post("https://slack.com/api/oauth.access", payload)
    doc = result.json()

    if not doc.get("ok"):
        reason = doc.get("error")
        messages.warning(request, "Error message from slack: %s" % reason)
        return redirect("hc-channels")

    channel = Channel()
    channel.user = request.team.user
    channel.kind = "slack"
    # The whole response body is stored as the channel value.
    channel.value = result.text
    channel.save()
    channel.assign_all_checks()
    messages.success(request, "The Slack integration has been added!")
    return redirect("hc-channels")
def project_details(request, project_id):
    """Show a project's details to a user who is assigned to it.

    The project is looked up by primary key restricted to projects the
    requesting user is assigned to. When no such project exists a warning is
    flashed and the user is sent back to the dashboard; otherwise the details
    template is rendered together with the user's edit permission.
    """
    # Reuse the row found by the authorisation filter instead of fetching
    # the project a second time by primary key alone.
    proj = Project.objects.filter(
        users_assigned=request.user.id,
        pk=project_id).first()

    if proj is None:
        messages.warning(
            request,
            'You are not authorized to view this project')
        return redirect('/taskManager/dashboard')

    user_can_edit = request.user.has_perm('project_edit')
    return render(request, 'taskManager/project_details.html',
                  {'proj': proj, 'user_can_edit': user_can_edit})
# A4: Insecure Direct Object Reference (IDOR)
def change_password(request):
    """Change the logged-in user's password from a POSTed form.

    On POST the old password is checked via ``authenticate`` and the new
    password must equal its confirmation; the outcome is reported through
    the messages framework. The change-password template is rendered for
    every request method.
    """
    if request.method == 'POST':
        user = request.user
        posted = request.POST
        old_password = posted.get('old_password')
        new_password = posted.get('new_password')
        confirm_password = posted.get('confirm_password')

        if not authenticate(username=user.username, password=old_password):
            messages.warning(request, 'Invalid Password')
        elif new_password != confirm_password:
            messages.warning(request, 'Passwords do not match')
        else:
            user.set_password(new_password)
            user.save()
            messages.success(request, 'Password Updated')

    return render(request,
                  'taskManager/change_password.html',
                  {'user': request.user})
def activate(request, uid, token):
    """Verify a user's email address from an activation link.

    ``uid`` is the URL-safe base64 encoded primary key of the user and
    ``token`` the activation token. The link is rejected with a warning when
    the uid cannot be decoded, no such user exists, or the user differs from
    the one making the request. With a valid token the email is marked
    verified and the user is logged in; otherwise an error is flashed.
    Always redirects to ``root``.
    """
    try:
        uid = force_text(urlsafe_base64_decode(uid))
        user = User.objects.get(pk=uid)
    except (TypeError, ValueError, OverflowError, User.DoesNotExist):
        user = None

    if user is None or request.user != user:
        # Message fixed: it used to read "can be verified" on failure.
        messages.warning(request, "User email cannot be verified")
        return redirect('root')

    if account_activation_token.check_token(user, token):
        messages.success(request, "Email verified!")
        user.email_verified = True
        user.save()
        auth.login(request, user)
    else:
        messages.error(request, "This email verification url has expired")
    return redirect('root')
def index(request):
    """Render the landing page with empty sign-up and sign-in forms.

    A debug-level message is queued as a side effect of building the
    context.
    """
    context = {
        'signupForm': SignupForm(),
        'signinForm': SigninForm(),
        # NOTE(review): this stores the return value of messages.debug()
        # under 'message'; confirm the template really expects that.
        'message': messages.debug(request, 'Bleh!'),
    }
    # Other available levels: messages.info, messages.success,
    # messages.warning and messages.error.
    return render(request, 'user/index.html', context)
def form_valid(self, form):
    """Save the mail settings and optionally test the SMTP connection.

    When the POST field ``test`` equals ``'1'`` the event's custom mail
    backend is tested against the configured sender address. A failing test
    flashes a warning and redirects back to the mail settings page; a passing
    test flashes a success message that depends on whether the custom SMTP
    server is enabled. Without a test a plain success message is flashed.
    """
    form.save()
    request = self.request

    if request.POST.get('test', '0').strip() != '1':
        messages.success(request, _('Yay! We saved your changes.'))
        return super().form_valid(form)

    backend = request.event.get_mail_backend(force_custom=True)
    try:
        backend.test(request.event.settings.mail_from)
    except Exception as e:
        messages.warning(request, _('An error occured while contacting the SMTP server: %s') % str(e))
        return redirect(reverse('orga:settings.mail.edit', kwargs={'event': request.event.slug}))

    if form.cleaned_data.get('smtp_use_custom'):
        notice = _('Yay, your changes have been saved and the connection attempt to '
                   'your SMTP server was successful.')
    else:
        notice = _('We\'ve been able to contact the SMTP server you configured. '
                   'Remember to check the "use custom SMTP server" checkbox, '
                   'otherwise your SMTP server will not be used.')
    messages.success(request, notice)
    return super().form_valid(form)
def dispatch(self, request, *args, **kwargs):
    """Add a speaker, identified by nick or email, to the submission.

    The POST field ``nick`` is matched case-insensitively against the user's
    email when it contains ``@`` and against the nick otherwise. Unknown
    users are created through ``create_user_as_orga``. A usable speaker is
    attached to the submission (or a warning is shown if already attached)
    and gets a speaker profile for the event when none exists. Always
    redirects to the submission's speaker page.
    """
    super().dispatch(request, *args, **kwargs)
    submission = self.get_object()
    nick = request.POST.get('nick')

    speaker = None
    # A request without a ``nick`` field used to crash on ``'@' in None``;
    # it now falls through to the regular "invalid nick" error below.
    if nick is not None:
        try:
            if '@' in nick:
                speaker = User.objects.get(email__iexact=nick)
            else:
                speaker = User.objects.get(nick__iexact=nick)
        except User.DoesNotExist:
            speaker = create_user_as_orga(nick, submission=submission)

    if not speaker:
        messages.error(request, _('Please provide a valid nick or email address!'))
    else:
        if submission not in speaker.submissions.all():
            speaker.submissions.add(submission)
            speaker.save(update_fields=['submissions'])
            submission.log_action('pretalx.submission.speakers.add', person=request.user, orga=True)
            messages.success(request, _('The speaker has been added to the submission.'))
        else:
            messages.warning(request, _('The speaker was already part of the submission.'))
        if not speaker.profiles.filter(event=request.event).exists():
            SpeakerProfile.objects.create(user=speaker, event=request.event)
    return redirect(submission.orga_urls.speakers)
# NOTE(review): the default ``pk=id`` binds the builtin ``id`` function; it
# is kept only so the signature stays unchanged -- confirm callers always
# pass ``pk``.
def post(self, request, pk=id):
    """Queue a bulk site upload for a project from an uploaded file.

    With a valid form the uploaded file is handed to the ``bulkuploadsites``
    task, a ``CeleryTaskProgress`` row is created for it and the user is
    redirected to the project's site list. Any exception while doing so is
    attached to the form as a non-field error and the upload page is
    rendered again, as it is for an invalid form.
    """
    form = UploadFileForm(request.POST, request.FILES)
    if form.is_valid():
        try:
            sitefile = request.FILES['file']
            user = request.user
            # Call form of print: valid on Python 2 and Python 3 alike.
            print(sitefile)
            task = bulkuploadsites.delay(user, sitefile, pk)
            if CeleryTaskProgress.objects.create(task_id=task.id, user=user, task_type=0):
                messages.success(request, 'Sites are being uploaded. You will be notified in notifications list as well.')
            else:
                messages.success(request, 'Sites cannot be updated a the moment.')
            return HttpResponseRedirect(reverse('fieldsight:proj-site-list', kwargs={'pk': pk}))
        except Exception as e:
            form.full_clean()
            form._errors[NON_FIELD_ERRORS] = form.error_class(['Sites Upload Failed, UnSupported Data', e])
            messages.warning(request, 'Site Upload Failed, UnSupported Data ')
    return render(request, 'fieldsight/upload_sites.html', {'form': form, 'project': pk})
def form_valid(self, form):
    """Import rules from an uploaded file or from pasted text.

    The uploaded ``file_field`` (decoded as UTF-8) takes precedence over the
    ``rules`` text field; when neither is present the form is treated as
    invalid. A successful import flashes the counters returned by
    ``prometheus.import_rules`` and redirects to ``rule-import``; a failing
    import flashes an error and re-renders the form.
    """
    data = form.clean()
    if data.get('file_field'):
        rules = data['file_field'].read().decode('utf8')
    elif data.get('rules'):
        rules = data.get('rules')
    else:
        messages.warning(self.request, 'Missing rules')
        return self.form_invalid(form)

    try:
        counters = prometheus.import_rules(rules)
    except Exception:
        # ``Exception`` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        messages.error(self.request, 'Error importing rules')
        return self.form_invalid(form)
    else:
        messages.info(self.request, 'Imported %s' % counters)
        return redirect('rule-import')
def __call__(self, request):
    """Attach the current site and user, then fire the write triggers.

    Works like ``django.contrib.sites.middleware.CurrentSiteMiddleware`` but
    goes through the project's own ``Site`` proxy so that test cases find
    the rule_set object. After the response is produced, each write signal
    is sent with ``force=True`` and a warning is flashed for every receiver
    that returned ``False``.
    """
    request.site = models.Site.objects.get_current()

    # Remember the logged-in user for the audit logging plugin.
    if request.user.is_authenticated():
        _user.value = request.user

    response = self.get_response(request)

    triggers = {
        'Config': trigger_write_config.send,
        'Rules': trigger_write_rules.send,
        'URLs': trigger_write_urls.send,
    }
    for label, send in triggers.items():
        outcomes = send(self, request=request, force=True)
        failures = [status for _receiver, status in outcomes if status is False]
        for _failure in failures:
            messages.warning(request, 'Error queueing %s ' % label)
    return response
def remove_member(request, team_id, user_id):
    """Remove a user from a team on behalf of staff or the team captain.

    Responds with 404 when the team or user does not exist, or when the
    requester is neither staff nor the captain. The captain cannot be
    removed, and removing a non-member only produces a warning. Every other
    path ends with a redirect to the team.
    """
    team = get_object_or_404(models.Team, pk=team_id)

    # Only staff and the team's captain may edit the team.
    may_edit = request.user.is_staff or team.captain == request.user
    if not may_edit:
        return HttpResponseNotFound()

    user = get_object_or_404(users_models.User, pk=user_id)

    with transaction.atomic():
        problem = None
        if user == team.captain:
            problem = 'Captain can\'t leave the team'
        elif user not in team.members.all():
            problem = 'User %s not in the team' % (user.username,)

        if problem is not None:
            messages.warning(request, problem)
            return redirect(team)

        team.members.remove(user)
        team.save()

    messages.success(request, 'User %s has left the team %s' % (user.username, team.name))
    return redirect(team)
def restart(request, app, model, object_id, instance):
    """Restart a workflow by deleting its instance, if the user started it.

    :param request: current HTTP request
    :param app: app label used to build the admin redirect URL
    :param model: model name used to build the admin redirect URL
    :param object_id: object id used to build the admin redirect URL
    :param instance: id of the workflow ``Instance`` to delete
    :return: redirect to ``/admin/<app>/<model>/<object_id>``

    Only the instance's starter may restart it. Any exception raised on the
    way is flashed as an error message instead of propagating.
    """
    try:
        inst = Instance.objects.get(id=int(instance))
        if request.user == inst.starter:
            inst.delete()
            messages.success(request, _("workflow restarted success"))
        else:
            messages.warning(request, _("you do not have the permission to restart,only the starter can restart"))
    except Exception as e:
        # ``as`` form is valid on Python 2.6+ and Python 3; the comma form
        # is a syntax error on Python 3.
        messages.error(request, e)
    return HttpResponseRedirect("/admin/%s/%s/%s" % (app, model, object_id))
def get_user_name(backend, details, response, is_new=False, *args, **kwargs):
    """Pipeline step that lets a new user choose a username.

    Returns an empty dict for users that are not new. For new users a GET
    renders the username selection page. A POST either cancels (warning and
    redirect to ``home``) or validates the submitted username: it must be
    non-empty and not already taken. On success ``{'username': ...}`` is
    returned; on failure the page is rendered again with the errors. A POST
    carrying neither button returns ``None``.
    """
    if not is_new:
        return {}

    request = backend.strategy.request
    if request.method == 'GET':
        return render(request, 'stud_auth/select_user_name.html', {})

    if request.POST.get('cancel_button'):
        messages.warning(request, u"Entrance is canceled")
        return HttpResponseRedirect(reverse('home'))

    if request.POST.get('success_button'):
        errors = {}
        final_username = request.POST.get('username')
        # A missing field (None) is treated like an empty one, so it is no
        # longer accepted as a username.
        if not final_username:
            errors['username'] = _(u"Please, enter your username")
        elif User.objects.filter(username=final_username).exists():
            # exists() asks the database for a yes/no answer and does not
            # load every matching row just to count them.
            errors['username'] = _(u"Sorry, but the same username already exists")

        if errors:
            return render(request, 'stud_auth/select_user_name.html',
                          {'errors': errors, 'user_name': final_username})
        return {'username': final_username}
def post(self, request, *args, **kwargs):
    """Delete a group unless it still has students or exam results.

    A cancel request only flashes a warning. Deletion is refused with an
    error message when any student belongs to the group or any result
    exists for one of the group's exams. Refusals and cancellation redirect
    to ``groups``; otherwise the parent ``post`` performs the deletion.
    """
    if request.POST.get('cancel_button'):
        messages.warning(request, _(u"Deleting group calceled"))
        return HttpResponseRedirect(reverse('groups'))

    group = Group.objects.get(pk=kwargs['pk'])

    # exists() avoids loading whole querysets just to test for emptiness.
    if Student.objects.filter(student_group=group).exists():
        messages.error(request, _(u"Deletion is impossible. There are students in the group"))
        return HttpResponseRedirect(reverse('groups'))

    # One query over all of the group's exams, not one query per exam.
    exams = Exam.objects.filter(exam_group=group)
    if Result.objects.filter(result_exam__in=exams).exists():
        messages.error(request, _(u"Deletion is impossible. There are available results for this group"))
        return HttpResponseRedirect(reverse('groups'))

    return super(GroupDeleteView, self).post(request, *args, **kwargs)
def results_delete(request, rid):
    """Confirm and perform deletion of all results of an exam.

    GET renders the confirmation page for the exam with primary key
    ``rid``; if the exam cannot be loaded an error is flashed and the user
    is redirected to ``results``. POST either cancels (warning only) or
    deletes the exam's results and marks the exam as not completed, then
    redirects to ``results``.
    """
    if request.method == "POST":
        if request.POST.get('cancel_button'):
            messages.warning(request, _(u"Deleting results of exam canceled"))
        else:
            exam = Exam.objects.get(pk=int(rid))
            exam.is_completed = False
            results = Result.objects.filter(result_exam=exam)
            results.delete()
            exam.save()
            messages.success(request, _(u"Information about results of exam %s deleted successfully") % exam.name)
        return HttpResponseRedirect(reverse('results'))

    try:
        exam = Exam.objects.get(pk=int(rid))
    except Exception:
        # ``request`` was missing from this call, so the error path itself
        # raised a TypeError.
        messages.error(request, _(u'There was an error on the server. Please try again later'))
        return HttpResponseRedirect(reverse('results'))
    return render(request, 'students/results_confirm_delete.html', {'exam': exam})
def get_pending_enrollment_message(cls, pending_users, enrolled_in):
    """
    Build the message about learners whose enrollment is still pending.

    Args:
        pending_users: An iterable of objects exposing ``user_email``
        enrolled_in (str): A string identifier for the course or program the pending users were linked to

    Returns:
        tuple: A 2-tuple containing a message type and message text
    """
    emails = [pending_user.user_email for pending_user in pending_users]
    template = _(
        "The following learners do not have an account on "
        "{platform_name}. They have not been enrolled in "
        "{enrolled_in}. When these learners create an account, they will "
        "be enrolled automatically: {pending_email_list}"
    )
    text = template.format(
        platform_name=settings.PLATFORM_NAME,
        enrolled_in=enrolled_in,
        pending_email_list=', '.join(emails),
    )
    return 'warning', text
def save_account_group(request, account, group_form):
    """Add the group chosen in ``group_form`` to ``account``.

    The default account may not join an admin or protected group, and a
    group that is already present is not added twice. The outcome is flashed
    as a message and the user is redirected to the account detail page.
    """
    group = group_form.cleaned_data['group']

    restricted_group = group.is_admin_group() or group.is_protected_group()
    if restricted_group and account.is_default_account():
        messages.error(
            request, 'Default user may not be added to "%s" group.' % group)
    else:
        try:
            account.accountgroup_set.get(id=group.id)
        except AccountGroup.DoesNotExist:
            account.accountgroup_set.add(group)
            messages.success(
                request, 'Added "%s" to group "%s"' % (account, group))
        else:
            messages.warning(request,
                             'Group was not added as it has already been added.')

    target = reverse('useradmin-account_detail', args=[account.id])
    return HttpResponseRedirect(target)
def custom_delete_selected(self, request, queryset):
    """Admin action: delete the selected styles one by one.

    Until the confirmation form is posted with ``post=yes`` the call is
    handed to the default delete action (which shows the confirmation page).
    After confirmation every object is deleted individually; failures are
    collected and reported in a single warning, otherwise a success message
    is flashed.
    """
    from django.utils.html import escape

    if request.POST.get('post') != 'yes':
        # The confirmation page, or the user has not confirmed yet.
        return self.default_delete_action[0](self, request, queryset)

    # The user confirmed: run the custom delete logic.
    failed_objects = []
    for style in queryset:
        try:
            style.delete()
        except Exception as ex:
            # Record the failure and carry on with the next object.
            failed_objects.append(
                (style.identifier, traceback.format_exception_only(type(ex), ex)))

    if failed_objects:
        # Identifiers and exception text are escaped because the combined
        # string is marked safe and rendered as HTML.
        items = "".join(
            "<li>{0} : {1}</li>".format(escape(identifier), escape(reason))
            for identifier, reason in failed_objects)
        messages.warning(request, mark_safe("Some selected styles are deleted failed:<ul>{0}</ul>".format(items)))
    else:
        messages.success(request, "All selected styles are deleted successfully")
def publish(self, request, queryset):
    """Admin action: submit the selected records as a publish job.

    The identifiers of the selected records are posted as JSON to the
    ``/api/metajobs/`` endpoint under ``settings.BORG_URL``. A truthy overall
    ``status`` in the reply means success; otherwise every per-layer entry
    with a falsy ``status`` is reported in one warning. Any exception is
    printed with its traceback and flashed as a warning.
    """
    from django.utils.html import escape

    failed_objects = []
    data = {"layers": [record.identifier for record in queryset]}
    try:
        # NOTE(review): with the stock ``requests`` library the first
        # positional argument is the URL, so passing ``request`` here would
        # fail. Left unchanged in case ``requests`` is a project wrapper
        # that takes the Django request first -- please confirm.
        res = requests.post(request, "{}/api/metajobs/".format(settings.BORG_URL), json=data)
        res.raise_for_status()
        result = res.json()
        if result["status"]:
            messages.success(request, "All selected records are published successfully")
        else:
            for layer, status in result.items():
                if layer == "status":
                    continue
                if status["status"]:
                    continue
                failed_objects.append((layer, status["message"]))
            if failed_objects:
                # Layer names and remote messages are escaped because the
                # combined string is marked safe and rendered as HTML.
                items = "".join(
                    "<li>{0} : {1}</li>".format(escape(layer), escape(reason))
                    for layer, reason in failed_objects)
                messages.warning(request, mark_safe("Some selected records are published failed:<ul>{0}</ul>".format(items)))
            else:
                messages.success(request, "All selected records are published successfully")
    except Exception as e:
        traceback.print_exc()
        messages.warning(request, str(e))
def custom_delete_selected(self, request, queryset):
    """Admin action: delete the selected records one by one.

    Until the confirmation form is posted with ``post=yes`` the call is
    handed to the default delete action (which shows the confirmation page).
    After confirmation every record is deleted individually; failures are
    collected and reported in a single warning, otherwise a success message
    is flashed.
    """
    from django.utils.html import escape

    if request.POST.get('post') != 'yes':
        # The confirmation page, or the user has not confirmed yet.
        return self.default_delete_action[0](self, request, queryset)

    # The user confirmed: run the custom delete logic.
    failed_objects = []
    for record in queryset:
        try:
            record.delete()
        except Exception as ex:
            # Record the failure and carry on with the next record.
            failed_objects.append(
                (record.identifier, traceback.format_exception_only(type(ex), ex)))

    if failed_objects:
        # Identifiers and exception text are escaped because the combined
        # string is marked safe and rendered as HTML.
        items = "".join(
            "<li>{0} : {1}</li>".format(escape(identifier), escape(reason))
            for identifier, reason in failed_objects)
        messages.warning(request, mark_safe("Some selected records are deleted failed:<ul>{0}</ul>".format(items)))
    else:
        messages.success(request, "All selected records are deleted successfully")
def handle_line_bulk(self, request, ids):
    """Apply the posted line form to every line named in ``ids``.

    Each id is resolved through ``self._get_line``; ids that resolve to
    nothing are skipped. Valid forms are saved and counted, and the errors
    of invalid ones are flashed as warnings. A summary message is flashed
    at the end and ``True`` is returned.
    """
    study = self.get_object()
    total = len(ids)
    saved = 0

    for value in ids:
        line = self._get_line(value)
        if not line:
            continue

        form = LineForm(request.POST, instance=line, prefix='line', study=study)
        # Removes fields having the bulk edit checkbox disabled.
        form.check_bulk_edit()

        if not form.is_valid():
            for error in form.errors.values():
                messages.warning(request, error)
            continue

        form.save()
        saved += 1

    summary = _('Saved %(saved)s of %(total)s Lines') % {
        'saved': saved,
        'total': total,
    }
    messages.success(request, summary)
    return True
def _handle_permission_update(self, permission_def):
    """Create, update or delete one study permission from a definition dict.

    ``permission_def`` carries the permission ``type`` plus one of the keys
    ``group``, ``user`` (each a dict with an ``id``) or ``public``, which
    selects the permission set to operate on. A type equal to
    ``StudyPermission.NONE`` deletes the matching permission; any other
    type is written with ``update_or_create``. A definition without a
    target key or without a type is only logged.
    """
    ptype = permission_def.get('type', None)
    kwargs = dict(study=self.object)
    defaults = dict(permission_type=ptype)

    # Initialised up front: previously a definition with no group/user/public
    # key raised UnboundLocalError before the ``manager is None`` check.
    manager = None
    if 'group' in permission_def:
        kwargs.update(group_id=permission_def['group'].get('id', 0))
        manager = self.object.grouppermission_set
    elif 'user' in permission_def:
        kwargs.update(user_id=permission_def['user'].get('id', 0))
        manager = self.object.userpermission_set
    elif 'public' in permission_def:
        manager = self.object.everyonepermission_set

    if manager is None or ptype is None:
        logger.warning('Invalid permission type for add')
    elif ptype == StudyPermission.NONE:
        manager.filter(**kwargs).delete()
    else:
        kwargs.update(defaults=defaults)
        manager.update_or_create(**kwargs)
# /study/<study_id>/import/
def password_change(request):
    """
    Render Pontifex password change scheme

    GET shows an empty ``PasswordChangeForm``. A valid POST saves the new
    password and redirects to the profile page; an invalid POST flashes every
    form error as a warning and renders the form again.
    """
    td = {
        "user": request.user,
    }
    if request.method == "POST":
        form = PasswordChangeForm(user=request.user, data=request.POST)
        td["form"] = form
        if form.is_valid():
            form.save()
            messages.success(request, "Password successfully changed.")
            return redirect("/users/profile/")
        # values() exists on Python 2 and 3; iteritems() is Python 2 only
        # and the field name was never used.
        for error_list in form.errors.values():
            for msg in error_list:
                messages.warning(request, msg)
        return render(request, "users/password_change.html", td)

    td["form"] = PasswordChangeForm(request.user)
    return render(request, "users/password_change.html", td)
def password_reset(request):
    """
    Render Pontifex password reset scheme

    GET shows an empty ``PasswordResetForm``. A POST with an invalid form or
    an address rejected by ``is_email`` re-renders the page with the problem
    flashed; otherwise the form is saved, a success message is flashed and
    the user is redirected to ``/``.
    """
    td = {}
    template = "users/password_reset.html"

    if request.method != "POST":
        td["form"] = PasswordResetForm()
        return render(request, template, td)

    form = PasswordResetForm(request.POST)
    if not form.is_valid():
        show_form_field_errors(request, form)
        return render(request, template, td)

    if not is_email(form.cleaned_data.get("email")):
        messages.warning(request, "Please provide a valid email address.")
        return render(request, template, td)

    form.save()
    recipient = form.cleaned_data.get("email")
    messages.success(request, "Success! We sent an email with a new password to the Pontifex user at %s." %
                     recipient)
    return redirect("/")
def userkey_required():
    """
    Decorator for views which require that the user has an active UserKey (typically for encryption/decryption of
    Secrets).

    Users without a key, or with a key that is not active, get a warning and
    are redirected to ``user:userkey``; everyone else reaches the view.
    """
    from functools import wraps

    def _decorator(view):
        # wraps() keeps the view's __name__, __doc__ and other metadata on
        # the wrapper.
        @wraps(view)
        def wrapped_view(request, *args, **kwargs):
            try:
                uk = UserKey.objects.get(user=request.user)
            except UserKey.DoesNotExist:
                messages.warning(request, "This operation requires an active user key, but you don't have one.")
                return redirect('user:userkey')
            if not uk.is_active():
                messages.warning(request, "This operation is not available. Your user key has not been activated.")
                return redirect('user:userkey')
            return view(request, *args, **kwargs)
        return wrapped_view
    return _decorator
def search_result(request):
    """Dispatch a search to the right result page by query parameter.

    ``keyword_list`` searches local books by title, while ``keyword_wish``
    and ``keyword_register`` look books up through ``get_book_info`` and
    render the wish and register pages respectively. Without any keyword a
    warning is flashed and the user is redirected to the book list.
    """
    params = request.GET

    # .get() returns None for a missing key; it does not raise KeyError.
    if params.get('keyword_list'):
        matches = Book.objects.filter(title__icontains=params['keyword_list'])
        return render(request, 'books/book_list.html', {'books': matches})

    lookup_pages = (
        ('keyword_wish', 'books/wish_book.html'),
        ('keyword_register', 'books/register.html'),
    )
    for param, template in lookup_pages:
        if params.get(param):
            found = get_book_info(params[param], display='5')
            return render(request, template, {'books': found})

    messages.warning(request, '???? ??????.')
    return redirect('books:list')