def authjwt_method(token):
""" an authentication method using rest_framework_jwt
"""
import jwt
from rest_framework_jwt.authentication import (jwt_decode_handler,
jwt_get_username_from_payload)
try:
payload = jwt_decode_handler(token)
except (jwt.ExpiredSignature, jwt.DecodeError, jwt.InvalidTokenError):
return None
User = get_user_model()
username = jwt_get_username_from_payload(payload)
if not username: # pragma: no cover
return None
try:
user = User.objects.get_by_natural_key(username)
except User.DoesNotExist: # pragma: no cover
return None
return user
python类get_user_model()的实例源码
def clean(self):
username = self.cleaned_data.get('username')
password = self.cleaned_data.get('password')
message = ERROR_MESSAGE
if username and password:
self.user_cache = authenticate(
username=username, password=password)
if self.user_cache is None:
if u'@' in username:
User = get_user_model()
# Mistakenly entered e-mail address instead of username? Look it up.
try:
user = User.objects.get(email=username)
except (User.DoesNotExist, User.MultipleObjectsReturned):
# Nothing to do here, moving along.
pass
else:
if user.check_password(password):
message = _("Your e-mail address is not your username."
" Try '%s' instead.") % user.username
raise forms.ValidationError(message)
elif not self.user_cache.is_active or not self.user_cache.is_staff:
raise forms.ValidationError(message)
return self.cleaned_data
def create_user(self, form, commit=True, model=None, **kwargs):
User = model
if User is None:
User = get_user_model()
user = User(**kwargs)
username = form.cleaned_data.get("username")
if username is None:
username = self.generate_username(form)
user.username = username
user.email = form.cleaned_data["email"].strip()
password = form.cleaned_data.get("password")
if password:
user.set_password(password)
else:
user.set_unusable_password()
if commit:
user.save()
return user
def send_email(self, email):
User = get_user_model()
protocol = getattr(settings, "DEFAULT_HTTP_PROTOCOL", "http")
current_site = get_current_site(self.request)
email_qs = EmailAddress.objects.filter(email__iexact=email)
for user in User.objects.filter(pk__in=email_qs.values("user")):
uid = int_to_base36(user.id)
token = self.make_token(user)
password_reset_url = "{0}://{1}{2}".format(
protocol,
current_site.domain,
reverse("account_password_reset_token", kwargs=dict(uidb36=uid, token=token))
)
ctx = {
"user": user,
"current_site": current_site,
"password_reset_url": password_reset_url,
}
hookset.send_password_reset_email([user.email], ctx)
def test_user_can_login(self):
user = get_user_model().objects.create_user('test', email='test', password='test')
client = APIClient()
response = client.post(reverse('auth-login'), {'username': 'test', 'password': 'test'})
self.assertEqual(response.status_code, status.HTTP_200_OK)
auth_token = AuthToken.objects.get(user=user)
data = json.loads(response.content.decode('utf-8'))
token = data['token']
self.assertEqual(token, auth_token.key)