def clean_invitees(self):
    """Validate the comma-separated ``invitees`` field.

    Every entry is stripped of surrounding whitespace and looked up by
    ``username``. The resolved user objects are collected, in input order,
    on ``self.invitee_users``.

    Returns:
        The ``invitees`` value as read from ``cleaned_data``.

    Raises:
        forms.ValidationError: if an entry matches no user, if that user
            already has a ``GroupInvitation`` for ``self.group``, or if the
            user already belongs to a group named like ``self.group``.
    """
    self.invitee_users = []
    data = self.cleaned_data['invitees']
    # The user model cannot change between iterations; resolve it once.
    User = get_user_model()
    for invitee in (name.strip() for name in data.split(',')):
        try:
            invitee_user = User.objects.get(username=invitee)
        except User.DoesNotExist:
            raise forms.ValidationError(
                _('There is no user "%s."') % invitee)
        self.invitee_users.append(invitee_user)

        # exists() asks the database for a yes/no answer instead of loading
        # the matching invitation rows just to test truthiness.
        has_invitation = GroupInvitation.objects.filter(
            group=self.group, invitee=invitee_user).exists()
        if has_invitation:
            raise forms.ValidationError(
                _('"%s" already has an invitation.') % invitee)

        already_member = invitee_user.groups.filter(
            name=self.group.name).exists()
        if already_member:
            raise forms.ValidationError(
                _('"%s" is already a member of this group.') % invitee)
    # Django writes the return value of clean_<field>() back into
    # cleaned_data, so the field value has to be returned explicitly.
    return data
# Example source code for Python's get_user_model()
def resolve_user_or_group(self, old_id):
    """Resolve a user or group by its id from old dudel.

    ``old_id`` is looked up in the legacy ``"user"`` table first; only when
    no row is found there is the legacy ``"group"`` table queried. The name
    that was found is then used to fetch the matching local object.

    Args:
        old_id: id of the user or group in the old database.

    Returns:
        The local user (matched by ``username``) or ``Group`` (matched by
        ``name``), or ``None`` when the id is unknown in both legacy tables
        or no local object carries the legacy name.
    """
    username = None
    groupname = None
    conn = psycopg2.connect(self.conn_string)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                'SELECT username FROM "user" WHERE id=%s', (old_id,))
            username = cursor.fetchone()
            if not username:
                cursor.execute(
                    'SELECT name FROM "group" WHERE id=%s', (old_id,))
                groupname = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        # Always release the legacy connection; previously it was left open
        # on every call, including when a query raised.
        conn.close()

    try:
        if username:
            return get_user_model().objects.get(username=username[0])
        if groupname:
            return Group.objects.get(name=groupname[0])
    except ObjectDoesNotExist:
        return None
    return None
def test_switch_window(self):
    """Fill two browser windows alternately and save both forms.

    After clicking ``#add_id_groups`` on the user change page the test
    switches between the two windows returned by ``switch_window()``
    (presumably the change page and the "add group" popup -- confirm
    against the helper), then checks that the user ends up with the new
    group and the new first name.
    """
    self.do_login(username="admin", password="password")
    self.get_url("admin:auth_user_change", self.user.id)
    self.click('#add_id_groups')

    first_window, second_window = self.switch_window()
    self.fill({'#id_name': 'My new group'})

    self.switch_window(first_window)
    self.fill({'#id_first_name': 'My first name'})

    # This submit is expected to close the window it runs in.
    self.switch_window(second_window)
    self.submit('input[name=_save]', window_closes=True)

    self.switch_window(first_window)
    self.submit('input[name=_save]')

    reloaded = get_user_model().objects.get(id=self.user.id)
    group_names = [group.name for group in reloaded.groups.all()]
    self.assertEqual(group_names, ["My new group"])
    self.assertEqual(reloaded.first_name, "My first name")
def owner_search_fields(self):
    """
    Return the names of all CharFields on the User model except
    ``password``. For the built-in User model, that means username,
    first_name, last_name, and email.
    """
    try:
        from django.contrib.auth import get_user_model
    except ImportError:  # Django < 1.5
        from django.contrib.auth.models import User as user_model
    else:
        user_model = get_user_model()

    names = []
    for field in user_model._meta.fields:
        if field.name == 'password':
            continue
        if isinstance(field, models.CharField):
            names.append(field.name)
    return names
def check_password(environ, username, password):
    """
    Authenticate against Django's auth database.

    mod_wsgi docs specify None, True, False as return value depending
    on whether the user exists and authenticates: ``None`` is returned for
    an unknown or inactive user, otherwise the result of the user's
    ``check_password(password)``.
    """
    user_model = auth.get_user_model()
    # db connection state is managed similarly to the wsgi handler
    # as mod_wsgi may call these functions outside of a request/response cycle
    db.reset_queries()
    try:
        try:
            account = user_model._default_manager.get_by_natural_key(username)
        except user_model.DoesNotExist:
            return None
        return account.check_password(password) if account.is_active else None
    finally:
        db.close_old_connections()
def groups_for_user(environ, username):
    """
    Authorize a user based on groups.

    Returns the user's group names passed through ``force_bytes``, or an
    empty list when the user is unknown or inactive.
    """
    user_model = auth.get_user_model()
    db.reset_queries()
    try:
        try:
            account = user_model._default_manager.get_by_natural_key(username)
        except user_model.DoesNotExist:
            return []
        if not account.is_active:
            return []
        group_names = []
        for group in account.groups.all():
            group_names.append(force_bytes(group.name))
        return group_names
    finally:
        db.close_old_connections()
def user_is_registered(self, user):
    """Check whether a `User` is registered in the database.

    Parameters
    ----------
    user
        Can be a Discord `User` object or `Member` object, or a user ID.

    Returns
    -------
    bool
        ``True`` if a row of the user model has this id, ``False`` otherwise.
    """
    # Discord objects carry the id as an attribute; anything else is taken
    # to be the id itself.
    if isinstance(user, (discord.User, discord.Member)):
        user_id = user.id
    else:
        user_id = user
    # exists() answers the question without fetching the row, and replaces
    # the two duplicated get()/DoesNotExist branches.
    return get_user_model().objects.filter(id=user_id).exists()
def check_password(environ, username, password):
    """
    Authenticate against Django's auth database.

    mod_wsgi docs specify None, True, False as return value depending
    on whether the user exists and authenticates.
    """
    UserModel = auth.get_user_model()
    # db connection state is managed similarly to the wsgi handler
    # as mod_wsgi may call these functions outside of a request/response cycle
    db.reset_queries()
    try:
        try:
            found = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            found = None
        # Unknown and inactive users are both reported as None.
        if found is None or not found.is_active:
            return None
        return found.check_password(password)
    finally:
        db.close_old_connections()
def groups_for_user(environ, username):
    """
    Authorize a user based on groups.

    An unknown or inactive user yields an empty list; otherwise each group
    name of the user is returned after ``force_bytes``.
    """
    UserModel = auth.get_user_model()
    db.reset_queries()
    try:
        try:
            found = UserModel._default_manager.get_by_natural_key(username)
        except UserModel.DoesNotExist:
            found = None
        if found is None or not found.is_active:
            return []
        return [force_bytes(grp.name) for grp in found.groups.all()]
    finally:
        db.close_old_connections()
def disable_admin_login():
    '''
    Disable admin login, but allow editing.

    Creates or updates the user with ``id=1`` as an active staff superuser
    and returns a permission callback that attaches that user to every
    request and always grants access.

    amended from: https://stackoverflow.com/a/40008282/517560
    '''
    admin_defaults = {
        'first_name': 'Default Admin',
        'last_name': 'User',
        'is_superuser': True,
        'is_active': True,
        'is_staff': True,
    }
    default_admin, _created = get_user_model().objects.update_or_create(
        id=1, defaults=admin_defaults)

    def no_login_has_permission(request):
        request.user = default_admin
        return True

    return no_login_has_permission
def blogger_required(view_func):
    """
    Only let bloggers access the supplied view and redirect readers.

    The wrapper reloads ``request.user`` with ``profile`` and ``blog``
    selected, then:

    * redirects non-bloggers to ``blog:user_blog`` for their username,
    * redirects bloggers without a blog to ``blog:blog_create``,
    * calls ``view_func`` for bloggers that have a blog.
    """
    @wraps(view_func)
    def _check_blogger_or_reader(request, *args, **kwargs):
        UserModel = get_user_model()
        user = UserModel.objects.select_related(
            'profile', 'blog').get(pk=request.user.id)
        # update user instance on request with related objects to save queries.
        request.user = user

        if not user.profile.is_blogger():
            return redirect('blog:user_blog', username=user.username)

        # Only the attribute access is guarded: a Blog.DoesNotExist raised
        # inside the view itself must not be mistaken for "no blog yet".
        try:
            blog = user.blog
        except Blog.DoesNotExist:
            blog = None
        if not blog:
            # Also covers a falsy blog value, which previously fell through
            # and made the wrapper return None.
            return redirect('blog:blog_create')
        return view_func(request, *args, **kwargs)

    return _check_blogger_or_reader
def setUp(self):
    """Create a user who has ``MyOtherNotification`` disabled.

    The resulting user, re-read from the database, is stored on
    ``self.user``.
    """
    user_model = get_user_model()

    # create a user who does not want to get MyOtherNotification
    account = user_model(username='test', password='Safepass1.')
    account.save()
    UserNotification(user=account).save()

    # refresh the user
    self.user = user_model.objects.get(id=account.id)

    # add a notification
    other_notification = Notification(
        notification_class=MyOtherNotification.get_class_path())
    other_notification.save()

    # disable the notification, then re-read the user once more
    self.user.usernotification.disabled_notifications.add(other_notification)
    self.user = user_model.objects.get(id=account.id)
def authenticate_payload(payload):
    """Return the active user described by a token ``payload``.

    With ``api_settings.VERIFY_SESSION_TOKEN`` enabled the user is taken
    from the active ``SessionToken`` matching the payload's session id and
    user id; otherwise the user is loaded directly by the payload's user id.

    Raises:
        exceptions.AuthenticationFailed: if no matching session token / user
            exists, or if the user is not active.
    """
    from rest_framework_sso.models import SessionToken

    user_model = get_user_model()
    user_id = claims.USER_ID

    if not api_settings.VERIFY_SESSION_TOKEN:
        try:
            user = user_model.objects.get(pk=payload.get(user_id))
        except user_model.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))
    else:
        try:
            active_tokens = SessionToken.objects.active().select_related('user')
            user = active_tokens.get(
                pk=payload.get(claims.SESSION_ID),
                user_id=payload.get(user_id),
            ).user
        except SessionToken.DoesNotExist:
            raise exceptions.AuthenticationFailed(_('Invalid token.'))

    if user.is_active:
        return user
    raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))
def reports(request):
    """Render the admin reports page.

    The template context carries the selectable users (``id`` is the
    username, ``text`` the formatted name), empty ``UserRatesForm`` and
    ``PaidTaskForm`` instances, the current time formatted as
    ``%Y-%m-%d %H:%M:%S`` and the ``PaidTaskTypes`` constants.
    """
    User = get_user_model()
    now = make_aware(datetime.now())
    # Built as a real list: on Python 3 ``map`` returns a one-shot iterator,
    # which is empty on a second pass and cannot be serialized.
    users = [
        {'id': user.username, 'text': user.formatted_name}
        for user in User.objects.hide_meta()
    ]
    ctx = {
        'page': 'admin-reports',
        'users': users,
        'user_rates_form': UserRatesForm(),
        'paid_task_form': PaidTaskForm(),
        'now': now.strftime('%Y-%m-%d %H:%M:%S'),
        'admin_report': True,
        'paid_task_types': PaidTaskTypes,
    }
    return render(request, 'admin/reports.html', ctx)
def get_context_data(self, **kwargs_):
    """Return top-scorer data for the language/project of ``pootle_path``.

    One scorer more than ``TOP_CONTRIBUTORS_CHUNK_SIZE`` is requested,
    starting at the ``offset`` view kwarg (default ``0``); the result is
    handed to ``get_top_scorers_data`` together with the chunk size.
    """
    user_model = get_user_model()
    path_parts = split_pootle_path(self.pootle_path)
    language_code, project_code = path_parts[:2]
    chunk_size = TOP_CONTRIBUTORS_CHUNK_SIZE
    scorers = user_model.top_scorers(
        project=project_code,
        language=language_code,
        limit=chunk_size + 1,
        offset=self.kwargs.get("offset", 0),
    )
    return get_top_scorers_data(scorers, chunk_size)
def handle(self, **options):
    """Verify either the given users or all users.

    Exactly one of ``options['user']`` (a list of users to verify) and
    ``options['all']`` must be set. Each successful verification is
    reported on stdout; a ``ValueError`` or ``ValidationError`` raised for
    one user is reported on stderr and processing continues.

    Raises:
        CommandError: if both or neither of ``user`` and ``all`` are given.
    """
    if bool(options['user']) == options['all']:
        raise CommandError("Either provide a 'user' to verify or "
                           "use '--all' to verify all users")

    def error_text(error):
        # ``error.message`` is not available on Python 3 ValueError, nor on
        # a ValidationError built from a list or dict. Prefer the
        # ``messages`` list when the exception has one, else fall back to
        # the plain string form.
        messages = getattr(error, 'messages', None)
        if messages:
            return '; '.join(str(message) for message in messages)
        return str(error)

    if options['all']:
        for user in get_user_model().objects.hide_meta():
            try:
                utils.verify_user(user)
                self.stdout.write("Verified user '%s'" % user.username)
            except (ValueError, ValidationError) as e:
                self.stderr.write(error_text(e))

    if options['user']:
        for user in options['user']:
            try:
                utils.verify_user(self.get_user(user))
                self.stdout.write("User '%s' has been verified" % user)
            except (ValueError, ValidationError) as e:
                self.stderr.write(error_text(e))
def get_user_by_email(email):
    """Retrieve a user by its email address.

    The `EmailAddress` entries are searched first (case-insensitively); as
    a safety measure the lookup falls back to the `User` entries (these
    addresses are sync'ed in theory).

    :param email: address of the user to look up.
    :return: `User` instance belonging to `email`, `None` otherwise.
    """
    try:
        address = EmailAddress.objects.get(email__iexact=email)
        return address.user
    except EmailAddress.DoesNotExist:
        try:
            user_model = get_user_model()
            match = user_model.objects.get(email__iexact=email)
        except user_model.DoesNotExist:
            match = None
        return match
def check_users(app_configs=None, **kwargs):
    """Warn when the ``admin`` user still has the password ``admin``.

    Returns a list with a single ``pootle.W016`` warning in that case and
    an empty list otherwise, including when the user does not exist or the
    lookup fails with ``OperationalError`` / ``ProgrammingError``.
    """
    from django.contrib.auth import get_user_model

    errors = []
    user_model = get_user_model()
    try:
        admin_user = user_model.objects.get(username='admin')
    except (user_model.DoesNotExist, OperationalError, ProgrammingError):
        return errors

    if not admin_user.check_password('admin'):
        return errors

    warning = checks.Warning(
        _("The default 'admin' user still has a password set to "
          "'admin'."),
        hint=_("Remove the 'admin' user or change its password."),
        id="pootle.W016",
    )
    errors.append(warning)
    return errors
def get(self, request):
    """Render the grade table: one row per user, one column per task.

    Each entry of ``grades`` is a dict with the key ``'username'`` plus one
    ``task_id -> best grade`` item for every task the user has submitted.
    Users without submissions still get a row.
    """
    tasks = Task.objects.all().values("slug", "id")

    # One grouped query for all users instead of one aggregate query per
    # user. order_by() clears any default ordering, which would otherwise
    # be added to the GROUP BY and split the (user, task) groups.
    best_by_user = {}
    best_rows = TaskSubmission.objects \
        .values('user__username', 'task_id') \
        .annotate(grade=Max('grade')) \
        .order_by()
    for row in best_rows:
        user_best = best_by_user.setdefault(row['user__username'], {})
        user_best[row['task_id']] = row['grade']

    grades = []
    usernames = get_user_model().objects.all().values_list(
        'username', flat=True)
    for username in usernames:
        user_grades = {'username': username}
        user_grades.update(best_by_user.get(username, {}))
        grades.append(user_grades)

    context = {
        'grades': grades,
        'tasks': tasks
    }
    return render(request, self.template_name, context, status=200)
def clean(self):
    """Authenticate the submitted credentials for a staff login.

    The authenticated user (or ``None``) is stored on ``self.user_cache``.
    Validation fails when authentication fails or when the user is not both
    active and staff. If the "username" contains ``@`` and belongs, as an
    e-mail address, to exactly one user whose password matches, the error
    message points out that user's real username.

    Returns ``self.cleaned_data``; nothing is checked when username or
    password is missing.
    """
    username = self.cleaned_data.get('username')
    password = self.cleaned_data.get('password')
    message = ERROR_MESSAGE

    if not (username and password):
        return self.cleaned_data

    self.user_cache = authenticate(username=username, password=password)
    if self.user_cache is not None:
        if not (self.user_cache.is_active and self.user_cache.is_staff):
            raise forms.ValidationError(message)
        return self.cleaned_data

    if u'@' in username:
        user_model = get_user_model()
        # Mistakenly entered e-mail address instead of username? Look it up.
        try:
            by_email = user_model.objects.get(email=username)
        except (user_model.DoesNotExist, user_model.MultipleObjectsReturned):
            # Nothing to do here, moving along.
            pass
        else:
            if by_email.check_password(password):
                message = _("Your e-mail address is not your username."
                            " Try '%s' instead.") % by_email.username
    raise forms.ValidationError(message)
def get(self, request, *args, **kwargs):
    """
    Log the user in from the keycloak username header, if present.

    When ``settings.KEYCLOAK_USERNAME_HEADER`` is set in ``request.META``
    the matching user is fetched (or created as an active user when it
    does not exist yet), logged in with the ``ModelBackend`` and redirected
    to the success URL.

    Without the header this is the same as
    django.views.generic.edit.ProcessFormView.get(), but adds test cookie
    stuff.
    """
    # Get the username from a keycloak set header
    userid = request.META.get(settings.KEYCLOAK_USERNAME_HEADER)
    if not userid:
        self.set_test_cookie()
        return super(LoginView, self).get(request, *args, **kwargs)

    user_model = get_user_model()
    try:
        user = user_model.objects.get(userid=userid)
    except user_model.DoesNotExist:
        # Only a missing user triggers creation; database errors and
        # duplicate matches propagate instead of being masked by a bare
        # ``except``.
        user = user_model.objects.create_user(
            userid=userid, is_active=True)
    # login() needs to know which backend authenticated the user.
    user.backend = 'django.contrib.auth.backends.ModelBackend'
    login(self.request, user)
    self.user = user
    return HttpResponseRedirect(self.get_success_url())
def test_user_can_have_multiple_teams_which_have_multiple_users(self):
    """A user may join several teams and a team may hold several users."""
    organisation = Organisation(name='New Org')
    organisation.save()

    awesome = Team(name='Team Awesome', organisation=organisation)
    awesome.save()
    great = Team(name='Team Great', organisation=organisation)
    great.save()

    user_model = get_user_model()

    # First user joins both teams.
    in_both = user_model.objects.create_user(userid='teamplayer')
    for team in (awesome, great):
        in_both.teams.add(team)
    in_both.save()

    # Second user joins only "Team Great".
    great_only = get_user_model().objects.create_user(userid='teamplayer2')
    great_only.teams.add(great)
    great_only.save()

    self.assertIn(in_both, awesome.user_set.all())
    self.assertIn(in_both, great.user_set.all())
    self.assertNotIn(great_only, awesome.user_set.all())
    self.assertIn(great_only, great.user_set.all())
    self.assertEqual(len(awesome.user_set.all()), 1)
    self.assertEqual(len(great.user_set.all()), 2)
def create_organisation(name, num_teams=0, num_members=0, usernames=None):
    """Create an organisation with teams and team members.

    Args:
        name: name of the new ``Organisation``.
        num_teams: number of teams to create, named ``New Team 1`` ...
        num_members: number of users created for each team.
        usernames: optional mapping from the zero-based, organisation-wide
            member index to the ``name`` given to that user. Members
            without an entry are named ``Team Member <index + 1>``.

    Returns:
        The saved ``Organisation``.
    """
    # None instead of a shared mutable ``{}`` default.
    if usernames is None:
        usernames = {}

    o = Organisation(name=name)
    o.save()
    user_global_id = 0
    for x in range(num_teams):
        t = Team(name='New Team %d' % (x + 1), organisation=o)
        t.save()
        for y in range(user_global_id, num_members + user_global_id):
            username = usernames.get(y, 'Team Member %d' % (y + 1))
            u = get_user_model().objects.create_user(
                userid='teammember%d' % (y + 1),
                name=username,
            )
            u.teams.add(t)
            u.save()
        t.save()
        # Before we go to the next team, increment start ID for member name
        user_global_id += num_members
    return o
def clean(self):
    """Validate a staff login and cache the authenticated user.

    ``self.user_cache`` receives the result of ``authenticate()``. A
    ``forms.ValidationError`` is raised when authentication fails or the
    user is not both active and staff. When the failed "username" contains
    ``@`` and is the e-mail address of exactly one user whose password
    matches, the message names that user's username.
    """
    username = self.cleaned_data.get('username')
    password = self.cleaned_data.get('password')
    message = ERROR_MESSAGE
    if username and password:
        authenticated = authenticate(username=username, password=password)
        self.user_cache = authenticated
        if authenticated is None:
            email_owner = None
            if u'@' in username:
                user_model = get_user_model()
                # Mistakenly entered e-mail address instead of username?
                # Look it up.
                try:
                    email_owner = user_model.objects.get(email=username)
                except (user_model.DoesNotExist,
                        user_model.MultipleObjectsReturned):
                    # Nothing to do here, moving along.
                    email_owner = None
            if email_owner is not None and email_owner.check_password(password):
                message = _("Your e-mail address is not your username."
                            " Try '%s' instead.") % email_owner.username
            raise forms.ValidationError(message)
        if not (authenticated.is_active and authenticated.is_staff):
            raise forms.ValidationError(message)
    return self.cleaned_data
def clean(self):
    """Authenticate the submitted credentials for a staff login.

    Stores the authenticated user (or ``None``) on ``self.user_cache`` and
    raises ``forms.ValidationError`` when authentication fails or the user
    is inactive or not staff. A failed login whose "username" is the e-mail
    address of exactly one user with a matching password gets a message
    that names the real username.

    Returns ``self.cleaned_data``.
    """
    username = self.cleaned_data.get('username')
    password = self.cleaned_data.get('password')
    message = ERROR_MESSAGE

    if not (username and password):
        return self.cleaned_data

    self.user_cache = authenticate(username=username, password=password)
    if self.user_cache is not None:
        if not (self.user_cache.is_active and self.user_cache.is_staff):
            raise forms.ValidationError(message)
        return self.cleaned_data

    if u'@' in username:
        user_model = get_user_model()
        # Mistakenly entered e-mail address instead of username? Look it up.
        try:
            by_email = user_model.objects.get(email=username)
        except (user_model.DoesNotExist, user_model.MultipleObjectsReturned):
            # Nothing to do here, moving along.
            pass
        else:
            if by_email.check_password(password):
                message = _("Your e-mail address is not your username."
                            " Try '%s' instead.") % by_email.username
    raise forms.ValidationError(message)
def setUp(self):
    """Create the test user and log an ``APIClient`` in as that user."""
    # Disable Signals
    post_save.disconnect(post_save_diary, sender=Diary)

    # Run celery task synchronous
    app.conf.update(CELERY_ALWAYS_EAGER=True)

    self.test_username = TEST_USERNAME
    self.test_password = TEST_PASSWORD
    self.test_email = TEST_EMAIL
    credentials = {
        'username': self.test_username,
        'password': self.test_password,
    }

    # Create a user
    self.user = get_user_model().objects.create_user(
        email=self.test_email, **credentials)

    # Login
    self.client = APIClient()
    self.client.login(**credentials)
def run(self, user_id):
    """Send the templated e-mail to the user with primary key ``user_id``.

    Nothing is sent when the user has no e-mail address or the address
    equals ``settings.TEST_EMAIL``.
    """
    user = get_user_model().objects.get(pk=user_id)

    # Send email to only user who has email except for TEST_EMAIL
    if not user.email or user.email == settings.TEST_EMAIL:
        return

    sender = self.email_sender
    receiver = user.email
    subject = self.email_subject.format(username=user.username)
    template_context = {
        "username": user.username,
        "verification_key": user.profile.verification_key,
    }
    html = render_to_string(self.email_template, context=template_context)
    send_email(sender=sender, receiver=receiver, subject=subject, html=html)
def clean(self):
    """Validate a staff login and cache the authenticated user.

    The result of ``authenticate()`` is kept on ``self.user_cache``.
    Failed authentication, or a user that is inactive or not staff, raises
    ``forms.ValidationError``. If the failed "username" contains ``@`` and
    is the e-mail address of exactly one user whose password matches, the
    message tells the visitor which username to use.
    """
    username = self.cleaned_data.get('username')
    password = self.cleaned_data.get('password')
    message = ERROR_MESSAGE
    if username and password:
        authenticated = authenticate(username=username, password=password)
        self.user_cache = authenticated
        if authenticated is None:
            email_owner = None
            if u'@' in username:
                user_model = get_user_model()
                # Mistakenly entered e-mail address instead of username?
                # Look it up.
                try:
                    email_owner = user_model.objects.get(email=username)
                except (user_model.DoesNotExist,
                        user_model.MultipleObjectsReturned):
                    # Nothing to do here, moving along.
                    email_owner = None
            if email_owner is not None and email_owner.check_password(password):
                message = _("Your e-mail address is not your username."
                            " Try '%s' instead.") % email_owner.username
            raise forms.ValidationError(message)
        if not (authenticated.is_active and authenticated.is_staff):
            raise forms.ValidationError(message)
    return self.cleaned_data
def test_can_run_queryset_locally(self):
    """A ``locally()`` queryset can be counted, filtered and ordered
    without issuing database queries."""
    from django.contrib.auth import get_user_model

    user_model = get_user_model()
    for number in range(10):
        user_model.objects.create(username=str(number))

    remote_qs = user_model.objects.all()
    self.assertEqual(qs_count := remote_qs.count(), 10) if False else \
        self.assertEqual(remote_qs.count(), 10)

    local_qs = remote_qs.locally().all()
    self.assertIsInstance(local_qs, query.QuerySet)
    self.assertEqual(local_qs.count(), 10)

    # NOTE(review): the source lost its indentation; everything below is
    # assumed to belong inside the zero-query block -- confirm.
    with self.assertNumQueries(0):
        self.assertEqual(local_qs.count(), 10)
        self.assertEqual(local_qs.filter(username='1').count(), 1)
        for _user in local_qs.order_by('-username'):
            pass
# File source: notify_reviewers_unverified.py
# Project: timed-backend
# Author: adfinis-sygroup
# Project source
# File source
# Reads: 34
# Favorites: 0
# Likes: 0
# Comments: 0
def _notify_reviewers(self, start, end, reports):
    """Notify reviewers on their unverified reports.

    One mail is prepared for every reviewer with an e-mail address who
    reviews a project that has at least one of ``reports``; all mails are
    sent together, and nothing is sent when there are none.
    """
    reviewers = get_user_model().objects.all_reviewers().filter(
        email__isnull=False)
    subject = '[Timed] Verification of reports'
    from_email = settings.DEFAULT_FROM_EMAIL

    mails = []
    for reviewer in reviewers:
        if not reports.filter(task__project__reviewers=reviewer).exists():
            continue
        template_context = {
            # we need start and end date in system format
            'start': str(start),
            'end': str(end),
            'reviewer': reviewer,
            'protocol': settings.HOST_PROTOCOL,
            'domain': settings.HOST_DOMAIN,
        }
        body = render_to_string(
            'mail/notify_reviewers_unverified.txt',
            template_context,
            using='text',
        )
        mails.append((subject, body, from_email, [reviewer.email]))

    if mails:
        send_mass_mail(mails)