def user_detail(request, id,format=None):
"""
Retrieve, update or delete a server assets instance.
"""
try:
snippet = User.objects.get(id=id)
except User.DoesNotExist:
return Response(status=status.HTTP_404_NOT_FOUND)
if request.method == 'GET':
serializer = UserSerializer(snippet)
return Response(serializer.data)
elif request.method == 'PUT':
serializer = UserSerializer(snippet, data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
elif request.method == 'DELETE':
if not request.user.has_perm('OpsManage.delete_user'):
return Response(status=status.HTTP_403_FORBIDDEN)
snippet.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
python类DoesNotExist()的实例源码
def authentication_hook(self, request, user_id=None, username=None, email=None, extra_params=None):
extra = extra_params if extra_params else {}
# automatically generate password from user_id
password = self._generate_password(user_id, settings.PASSWORD_GENERATOR_NONCE)
# username and email might be empty, depending on how edX LTI module is configured:
# there are individual settings for that + if it's embedded into an iframe it never sends
# email and username in any case
# so, since we want to track user for both iframe and non-iframe LTI blocks, username is completely ignored
uname = self._compress_user_name(user_id)
email = email if email else user_id+'@localhost'
try:
User.objects.get(username=uname)
except User.DoesNotExist:
try:
User.objects.create_user(username=uname, email=email, password=password)
except IntegrityError as e:
# A result of race condition of multiple simultaneous LTI requests - should be safe to ignore,
# as password and uname are stable (i.e. not change for the same user)
logger.info("IntegrityError creating user - assuming result of race condition: %s", e.message)
authenticated = authenticate(username=uname, password=password)
login(request, authenticated)
def profile_view(request, user_id):
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
return redirect("/taskManager/dashboard")
if request.user.groups.filter(name='admin_g').exists():
role = "Admin"
elif request.user.groups.filter(name='project_managers').exists():
role = "Project Manager"
else:
role = "Team Member"
sorted_projects = Project.objects.filter(
users_assigned=request.user.id).order_by('title')
return render(request, 'taskManager/profile_view.html',
{'user': user, 'role': role, 'project_list': sorted_projects})
def logout(request):
""" Logout a user
"""
try:
token = request.environ['HTTP_X_API_TOKEN']
except (KeyError, IndexError, TypeError):
raise BadRequest('Missing HTTP X-Api-Token header')
try:
data = jwt.decode(token, settings.SECRET_KEY)
data = json.loads(CRYPTO.decrypt(str(data['data'])))
user = User.objects.get(id=data['id'])
user.last_login = datetime.fromtimestamp(0)
user.save()
return {'message': 'Logged out'}
except (utils.CryptoException, KeyError, jwt.DecodeError,
jwt.ExpiredSignature, User.DoesNotExist):
raise BadRequest('Invalid token')
def work_todo_done(request, todo_id):
#import pdb; pdb.set_trace()
if request.method == "POST":
try:
todo = Commitment.objects.get(id=todo_id)
except Commitment.DoesNotExist:
todo = None
if todo:
todo.finished = True
todo.save()
event = todo.todo_event()
if not event:
event = create_event_from_todo(todo)
event.save()
next = request.POST.get("next")
return HttpResponseRedirect(next)
def work_todo_change(request, todo_id):
#import pdb; pdb.set_trace()
if request.method == "POST":
try:
todo = Commitment.objects.get(id=todo_id)
except Commitment.DoesNotExist:
todo = None
if todo:
agent = get_agent(request)
prefix = todo.form_prefix()
form = WorkTodoForm(data=request.POST, agent=agent, instance=todo, prefix=prefix)
if form.is_valid():
todo = form.save()
next = request.POST.get("next")
return HttpResponseRedirect(next)
def work_todo_time(request):
#import pdb; pdb.set_trace()
if request.method == "POST":
todo_id = request.POST.get("todoId")
try:
todo = Commitment.objects.get(id=todo_id)
except Commitment.DoesNotExist:
todo = None
if todo:
hours = request.POST.get("hours")
if hours:
qty = Decimal(hours)
else:
qty = Decimal("0.0")
event = todo.todo_event()
if event:
event.quantity = qty
event.save()
else:
event = create_event_from_todo(todo)
event.quantity = qty
event.save()
return HttpResponse("Ok", content_type="text/plain")
def work_todo_mine(request, todo_id):
#import pdb; pdb.set_trace()
if request.method == "POST":
try:
todo = Commitment.objects.get(id=todo_id)
except Commitment.DoesNotExist:
todo = None
if todo:
agent = get_agent(request)
todo.from_agent = agent
todo.save()
next = request.POST.get("next")
if next:
return HttpResponseRedirect(next)
return HttpResponseRedirect('/%s/'
% ('work/my-dashboard'))
def work_todo_description(request):
#import pdb; pdb.set_trace()
if request.method == "POST":
todo_id = request.POST.get("todoId")
try:
todo = Commitment.objects.get(id=todo_id)
except Commitment.DoesNotExist:
todo = None
if todo:
did = request.POST.get("did")
event = todo.todo_event()
if event:
event.description = did
event.save()
else:
event = create_event_from_todo(todo)
event.description = did
event.save()
return HttpResponse("Ok", content_type="text/plain")
def test_get_user_anonymous(self, user_mock):
user_mock.objects.get.side_effect = User.DoesNotExist
access = Access(
interlink_id=None,
request=None,
response=None,
exception=None,
time=None,
view=None,
user=self.access_user,
custom=None,
process=None,
)
self.assertIsNone(access.get_user())
def createThread(request, topic_title=None):
if topic_title:
try:
if request.method == 'POST':
topic = Topic.getTopic(topic_title)
threadForm = ThreadForm(request.POST, prefix='thread')
postForm = PostForm(request.POST, prefix='post')
if threadForm.is_valid() and postForm.is_valid():
thread = threadForm.save(commit=False)
post = postForm.save(commit=False)
thread.op = post
thread.topic = topic
thread.save()
if request.user.is_authenticated():
post.created_by = request.user
post.save()
return HttpResponseRedirect(thread.relativeUrl)
else:
threadForm = ThreadForm(prefix='thread')
postForm = PostForm(prefix='post')
context = dict(threadForm=threadForm, postForm=postForm)
return render(request, 'djeddit/create_thread.html', context)
except Topic.DoesNotExist:
pass
return redirect('topics')
def topicPage(request, topic_title):
try:
topic = Topic.getTopic(topic_title)
except Topic.DoesNotExist:
raise Http404()
# edit topic form
if request.method == 'POST':
if not request.user.is_superuser:
return HttpResponseForbidden()
form = TopicForm(request.POST, instance=topic)
if form.is_valid():
form.save()
return redirect('topicPage', topic.urlTitle)
showForm = True
else:
form = TopicForm(instance=topic)
showForm = False
threads = Thread.objects.filter(topic=topic)
context = dict(topic=topic, threads=threads, showCreatedBy=True, showTopic=False, topicForm=form, showForm=showForm)
return render(request, 'djeddit/topic.html', context)
def editPost(request, post_uid=''):
try:
post = Post.objects.get(uid=post_uid)
thread = post.thread
except (Post.DoesNotExist, Thread.DoesNotExist):
raise Http404
if thread.locked or (request.user != post.created_by and not request.user.is_superuser):
return HttpResponseForbidden()
if request.method == 'POST':
postForm = PostForm(request.POST, instance=post, prefix='post')
threadForm = ThreadForm(request.POST, instance=thread, prefix='thread')
if postForm.is_valid():
postForm.save()
if threadForm.is_valid():
threadForm.save()
return HttpResponseRedirect(thread.relativeUrl)
else:
postForm = PostForm(instance=post, prefix='post')
if request.user.is_superuser and thread.op == post:
threadForm = ThreadForm(instance=thread, prefix='thread')
else:
threadForm = None
postForm.fields['content'].label = ''
context = dict(postForm=postForm, threadForm=threadForm, post_uid=post.uid, thread=thread, post=post)
return render(request, 'djeddit/edit_post.html', context)
def userSummary(request, username):
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise Http404
threads = Thread.objects.filter(op__created_by=user)
for t in threads:
t.modified_on = t.op.modified_on
replies = Post.objects.filter(created_by=user).exclude(parent=None)
context = dict(items=sorted(list(threads) + list(replies), key=lambda n: n.modified_on, reverse=True),
tCount=threads.count(),
rCount=replies.count(),
tPoints=(sum(t.op.score for t in threads)),
rPoints=(sum(r.score for r in replies)),
pageUser=user)
return render(request, 'djeddit/user_summary.html', context)
def setUserStatus(request):
"""set user status to either active (is_active=True), banned(is_active=False), or admin(is_superuser=True)"""
try:
username = request.POST['username']
status = request.POST['status']
except KeyError:
return HttpResponseBadRequest()
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise Http404()
if status == 'active':
user.is_active = True
user.is_superuser = False
elif status == 'banned':
user.is_active = False
user.is_superuser = False
elif status == 'admin':
user.is_active = True
user.is_superuser = True
else:
return HttpResponseBadRequest()
user.save()
return HttpResponse()
def user_edit(form_data):
uid = form_data.get('uid', '')
first_name = form_data.get('first_name', '')
last_name = form_data.get('last_name', '')
email = form_data.get('email', '')
group = form_data.get('group', '')
permission = form_data.getlist('permission')
# uid = form_data['uid']
# first_name = form_data['first_name']
# last_name = form_data['last_name']
# email = form_data['email']
# group = form_data['group']
# permission = form_data.getlist['permission']
try:
u = User.objects.get(pk=uid)
u.first_name = first_name
u.last_name = last_name
u.email = email
u.groups = group
u.user_permissions = permission
u.save()
return True, '???' + str(uid) + '?????'
except User.DoesNotExist:
return False, '???' + str(uid) + '????'
def default_profile(name):
"""
Set the default flag on the named profile and clear the default
flag on all other profiles.
:param name: A server profile name
:return: None
"""
try:
ServerProfile.objects.get(name=name)
except ServerProfile.DoesNotExist:
log.error("Profile '%s' not found" % name)
sys.exit(-1)
ServerProfile.objects.update(default=False)
ServerProfile.objects.filter(name=name).update(default=True)
def get(self, request, uidb64, token):
try:
uid = force_text(urlsafe_base64_decode(uidb64))
user = User.objects.get(pk=uid)
except(TypeError, ValueError, OverflowError, User.DoesNotExist):
user = None
if user is not None and account_confirmation_token.check_token(user, token):
## set the Person to being approved
person = Person.objects.get(email_address=user.email)
person.approved_by_user = True
person.save()
# user.is_active = True
# user.save()
# login(request, user)
# return redirect('home')
return HttpResponse('Thank you for confirming your source profile.') # Now you can view the live entry {}.').format(live_url)
else:
return HttpResponse('Confirmation link is invalid!')
## see which token the user matches
return render(request, 'confirmation.html', context)
def handle(self, *args, **kwargs):
user = xform = None
if len(args) > 0:
username = args[0]
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise CommandError("User %s does not exist" % username)
if len(args) > 1:
id_string = args[1]
try:
xform = XForm.objects.get(user=user, id_string=id_string)
except XForm.DoesNotExist:
raise CommandError("Xform %s does not exist for user %s" %
(id_string, user.username))
remongo = kwargs["remongo"]
update_all = kwargs["update_all"]
report_string = mongo_sync_status(remongo, update_all, user, xform)
self.stdout.write(report_string)
def handle(self, *args, **options):
try:
username = args[0]
except IndexError:
raise CommandError(_("You must provide the username to publish the"
" form to."))
# make sure user exists
try:
User.objects.get(username=username)
except User.DoesNotExist:
raise CommandError(_("The user '%s' does not exist.") % username)
try:
input_file = args[1]
except IndexError:
raise CommandError(_("You must provide the path to restore from."))
else:
input_file = os.path.realpath(input_file)
num_instances, num_restored = restore_backup_from_zip(
input_file, username)
sys.stdout.write("Restored %d of %d submissions\n" %
(num_restored, num_instances))
def handle(self, *args, **kwargs):
if args.__len__() < 2:
raise CommandError(_(u"path(xform instances) username"))
path = args[0]
username = args[1]
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
raise CommandError(_(u"Invalid username %s") % username)
debug = False
if debug:
print (_(u"[Importing XForm Instances from %(path)s]\n")
% {'path': path})
im_count = len(glob.glob(os.path.join(IMAGES_DIR, '*')))
print _(u"Before Parse:")
print _(u" --> Images: %(nb)d") % {'nb': im_count}
print (_(u" --> Instances: %(nb)d")
% {'nb': Instance.objects.count()})
import_instances_from_zip(path, user)
if debug:
im_count2 = len(glob.glob(os.path.join(IMAGES_DIR, '*')))
print _(u"After Parse:")
print _(u" --> Images: %(nb)d") % {'nb': im_count2}
print (_(u" --> Instances: %(nb)d")
% {'nb': Instance.objects.count()})
def save(self, **kwargs):
if not self.presenter_id:
self.presenter = get_current_user()
if not self.submission.is_transient:
for x, org in (('submitter', 'submitter_organisation'), ('sponsor', 'sponsor_name')):
email = getattr(self, '{0}_email'.format(x))
if email:
try:
user = get_user(email)
except User.DoesNotExist:
user = create_phantom_user(email, role=x)
user.first_name = getattr(self, '{0}_contact_first_name'.format(x))
user.last_name = getattr(self, '{0}_contact_last_name'.format(x))
user.save()
profile = user.profile
profile.title = getattr(self, '{0}_contact_title'.format(x))
profile.gender = getattr(self, '{0}_contact_gender'.format(x)) or 'f'
profile.organisation = getattr(self, org)
profile.save()
setattr(self, x, user)
return super().save(**kwargs)
def save(self, **kwargs):
if self.email and not self.submission_form.submission.is_transient:
try:
user = get_user(self.email)
except User.DoesNotExist:
user = create_phantom_user(self.email, role='investigator')
user.first_name = self.contact_first_name
user.last_name = self.contact_last_name
user.save()
profile = user.profile
profile.title = self.contact_title
profile.gender = self.contact_gender
profile.organisation = self.organisation
profile.save()
self.user = user
return super().save(**kwargs)
def do_password_reset(request, token=None):
try:
email, timestamp = _password_reset_token_factory.parse_token(token)
except (signing.BadSignature, signing.SignatureExpired):
return render(request, 'users/password_reset/reset_token_invalid.html', {})
try:
user = get_user(email)
except User.DoesNotExist:
raise Http404()
profile = user.profile
timestamp = datetime.utcfromtimestamp(timestamp).replace(tzinfo=timezone.utc)
if profile.last_password_change and profile.last_password_change > timestamp:
return render(request, 'users/password_reset/token_already_used.html', {})
form = SetPasswordForm(user, request.POST or None)
if form.is_valid():
form.save()
profile.last_password_change = timezone.now()
profile.save()
return render(request, 'users/password_reset/reset_complete.html', {})
return render(request, 'users/password_reset/reset_form.html', {
'user': user,
'form': form,
})
def accept_invitation(request, invitation_uuid=None):
try:
invitation = Invitation.objects.new().get(uuid=invitation_uuid)
except Invitation.DoesNotExist:
raise Http404
user = invitation.user
form = SetPasswordForm(invitation.user, request.POST or None)
if form.is_valid():
form.save()
profile = user.profile
profile.last_password_change = timezone.now()
profile.is_phantom = False
profile.forward_messages_after_minutes = 5
profile.save()
invitation.is_accepted = True
invitation.save()
user = auth.authenticate(email=invitation.user.email, password=form.cleaned_data['new_password1'])
auth.login(request, user)
return redirect('ecs.users.views.edit_profile')
return render(request, 'users/invitation/set_password_form.html', {
'form': form,
})
def handle_action(self, action_type, userid, inputs):
log.debug("handle action: action_type: {} userid: {}, inputs: {}".format(action_type,
userid,
inputs))
try:
user = User.objects.get(pk=userid)
except User.DoesNotExist:
log.error("User does not exist")
return
try:
mail_account = MailAccount.objects.get(user=user)
except MailAccount.DoesNotExist:
log.error('No MailAccount corresponding to the user')
return
# action
if action_type == SEND_EMAIL:
log.debug('handle action: send_email called')
self.send_email(mail_account=mail_account,
subject=inputs['subject'],
body=inputs['body'])
else:
raise NotSupportedAction()
def user_follow(request):
user_id = request.POST.get('id')
action = request.POST.get('action')
if user_id and action:
try:
user = User.objects.get(id=user_id)
if action == 'follow':
Contact.objects.get_or_create(
user_from = request.user,
user_to=user)
create_action(request.user, 'is following', user)
else:
Contact.objects.filter(user_from=request.user,
user_to=user).delete()
return JsonResponse({'status': 'ok'})
except User.DoesNotExist:
return JsonResponse({'status': 'ko'})
return JsonResponse({'status': 'ko'})
def add_page(request, category_name_slug):
try:
category = Category.objects.get(slug=category_name_slug)
except Category.DoesNotExist:
category = None
form = PageForm()
if request.method == 'POST':
form = PageForm(request.POST)
if form.is_valid():
if category:
page = form.save(commit=False)
page.category = category
page.views = 0
page.save()
# probably better to use a redirect here.
return show_category(request, category_name_slug)
else:
print(form.errors)
context_dict = {'form':form, 'category': category}
return render(request, 'rango/add_page.html', context_dict)
def profile(request, username):
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
return redirect('index')
userprofile = UserProfile.objects.get_or_create(user=user)[0]
form = UserProfileForm({'website': userprofile.website, 'picture': userprofile.picture})
if request.method == 'POST':
form = UserProfileForm(request.POST, request.FILES, instance=userprofile)
if form.is_valid():
form.save(commit=True)
return redirect('profile', user.username)
else:
print(form.errors)
return render(request, 'rango/profile.html', {'userprofile': userprofile, 'selecteduser': user, 'form': form})
def get_total_score(pk):
""" Calculates the users total score
:param pk: primary key of an user
:return: the total score count.
"""
try:
user = User.objects.get(pk=pk)
except User.DoesNotExist:
raise ValueError("unknown private key")
count = 0
for challenge in user.challenge_set.all():
count += challenge.points
return count