def add_tag(ticket_id, body, user):
    """Attach an existing tag to a ticket and record the action.

    ``body`` is expanded into keyword filters for ``Tag.objects.get``.

    Returns:
        A dict holding a confirmation message.

    Raises:
        BadRequest: when ``body`` matches several tags, or when the tag's
            ``tagType`` differs from the ticket's class name.
        NotFound: when the tag/ticket lookup or the update raises one of
            the exceptions listed in the second handler.
    """
    try:
        tag = Tag.objects.get(**body)
        ticket = Ticket.objects.get(id=ticket_id)

        # A tag may only be attached to the model type it was declared for.
        ticket_kind = ticket.__class__.__name__
        if ticket_kind != tag.tagType:
            raise BadRequest('Invalid tag for ticket')

        ticket.tags.add(tag)
        ticket.save()

        log_details = {
            'ticket': ticket,
            'action': 'add_tag',
            'user': user,
            'tag_name': tag.name,
        }
        database.log_action_on_ticket(**log_details)
    except MultipleObjectsReturned:
        raise BadRequest('Please use tag id')
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Tag or ticket not found')

    return {'message': 'Tag successfully added'}
python类MultipleObjectsReturned()的实例源码
def add_tag(report_id, body):
    """Attach an existing tag to a report.

    ``body`` is expanded into keyword filters for ``Tag.objects.get``.

    Returns:
        A dict holding a confirmation message.

    Raises:
        BadRequest: when ``body`` matches several tags, or when the tag's
            ``tagType`` differs from the report's class name.
        NotFound: when the tag/report lookup or the update raises one of
            the exceptions listed in the second handler.
    """
    try:
        tag = Tag.objects.get(**body)
        report = Report.objects.get(id=report_id)

        # A tag may only be attached to the model type it was declared for.
        report_kind = report.__class__.__name__
        if report_kind != tag.tagType:
            raise BadRequest('Invalid tag for report')

        report.tags.add(tag)
        report.save()
    except MultipleObjectsReturned:
        raise BadRequest('Please use tag id')
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Report or tag not found')

    return {'message': 'Tag successfully added'}
def get(self, request, course_id, lesson_id, *args, **kwargs):
    """Render the quiz form for the lesson identified by ``lesson_id``.

    ``get_object_or_404`` is used both to load the lesson and to make sure
    at least one ``PracticalTest`` is attached to it. ``course_id`` is
    accepted but not used in this method.
    """
    lesson = get_object_or_404(Lesson, id=lesson_id)

    # Existence probe only: no test -> 404; several tests are acceptable.
    try:
        get_object_or_404(PracticalTest, theoretical_part=lesson)
    except MultipleObjectsReturned:
        pass

    # Map each question text to the queryset of its answer options.
    quiz_data = {
        test.question: AnswerOption.objects.filter(practical_test=test)
        for test in PracticalTest.objects.filter(theoretical_part=lesson)
    }

    context = {'form': QuizForm(extra=quiz_data)}
    return render(request, self.template_name, context)
def get(self, request, *args, **kwargs):
    """Activate the user whose ``hash_username`` equals the ``key`` query parameter.

    Returns:
        A 200 ``Response`` carrying the user's pk, a detail message and the
        index URL, rendered with ``member/activate.html``.

    Raises:
        Http404: no user matches the key.
        ValidationError: the key is missing from the query string, or it
            matches more than one user.
    """
    try:
        activation_key = request.query_params['key']
        user = MomoUser.objects.get(hash_username=activation_key)
        user.is_active = True
        user.save()
    except MomoUser.DoesNotExist:
        raise Http404
    except (MultipleObjectsReturned, MultiValueDictKeyError):
        # Both failure modes report the same validation message.
        raise ValidationError(detail="?? ?? url? ?? ?????.")

    payload = {
        "user_pk": user.pk,
        "detail": "user? ????????.",
        "url": reverse('index', request=request),
    }
    return Response(
        payload,
        status=status.HTTP_200_OK,
        template_name='member/activate.html',
    )
def get_detail(self, request, **kwargs):
    """
    Return one serialized resource as an HTTP response.

    The object is fetched through ``cached_obj_get`` using ``kwargs`` after
    they have passed through ``remove_api_resource_names``; the result is
    then bundled, dehydrated and handed to ``create_response``.

    Returns ``http.HttpNotFound`` when the lookup raises
    ``ObjectDoesNotExist`` and ``http.HttpMultipleChoices`` when it raises
    ``MultipleObjectsReturned``.
    """
    lookup_bundle = self.build_bundle(request=request)

    try:
        lookup = self.remove_api_resource_names(kwargs)
        found = self.cached_obj_get(bundle=lookup_bundle, **lookup)
    except ObjectDoesNotExist:
        return http.HttpNotFound()
    except MultipleObjectsReturned:
        return http.HttpMultipleChoices("More than one resource is found at this URI.")

    # Build the response bundle step by step: wrap, dehydrate, post-process.
    detail_bundle = self.build_bundle(obj=found, request=request)
    detail_bundle = self.full_dehydrate(detail_bundle)
    detail_bundle = self.alter_detail_data_to_serialize(request, detail_bundle)
    return self.create_response(request, detail_bundle)
def obj_get(self, bundle, **kwargs):
    """
    Fetch exactly one object matching ``kwargs`` from the resource's object list.

    The object list for ``bundle.request`` is filtered with ``kwargs``. The
    single match is stored on ``bundle.obj``, passed through
    ``authorized_read_detail`` and returned.

    Raises the object class's ``DoesNotExist`` when nothing matches,
    ``MultipleObjectsReturned`` when several objects match, and ``NotFound``
    when a ``ValueError`` surfaces while performing the lookup.
    """
    try:
        matches = self.get_object_list(bundle.request).filter(**kwargs)
        described = ', '.join("%s=%s" % (k, v) for k, v in kwargs.items())
        found = len(matches)

        if found <= 0:
            raise self._meta.object_class.DoesNotExist(
                "Couldn't find an instance of '%s' which matched '%s'."
                % (self._meta.object_class.__name__, described)
            )
        if found > 1:
            raise MultipleObjectsReturned(
                "More than '%s' matched '%s'."
                % (self._meta.object_class.__name__, described)
            )

        bundle.obj = matches[0]
        self.authorized_read_detail(matches, bundle)
        return bundle.obj
    except ValueError:
        raise NotFound("Invalid resource lookup data provided (mismatched type).")
def test_get_one_does_not_iterate_long_sequence_indefinitely(self):
    """get_one must fail fast instead of consuming its whole input.

    Retrieving every object is a typical performance pitfall; in rare
    failure cases there may be a large number of them.
    """

    class InfinityException(Exception):
        """Iteration went on indefinitely."""

    def infinite_sequence():
        """Generator: count to infinity (more or less), then fail."""
        for value in range(3):
            yield value
        raise InfinityException()

    # MultipleObjectsReturned is expected; reaching InfinityException would
    # mean get_one iterated past the point where it could already decide.
    with self.assertRaises(MultipleObjectsReturned):
        get_one(infinite_sequence())
def delete_model(self, request, obj):
    """Move ``obj`` into the bin, or delete it for real if it is already binned.

    The bin is the page titled ``BIN_PAGE_NAME`` (created on demand). A page
    counts as "in the bin" when it, or any page up its ``parent`` chain,
    carries that title. Pages outside the bin are moved under a bucket page
    named after the current date/time (``BIN_BUCKET_NAMING``), which is
    created under the bin when missing.

    Duplicate bin or bucket pages are tolerated: the first match is used.
    """
    # Retrieve the bin page or create it
    try:
        bin_page = Page.objects.get(title_set__title=BIN_PAGE_NAME)
    except ObjectDoesNotExist:
        bin_page = api.create_page(
            BIN_PAGE_NAME, constants.TEMPLATE_INHERITANCE_MAGIC, BIN_PAGE_LANGUAGE)
    except MultipleObjectsReturned:
        bin_page = Page.objects.filter(title_set__title=BIN_PAGE_NAME).first()

    # Is the page already under the bin folder? Walk up the parent chain.
    is_in_bin = False
    ancestor = obj
    while ancestor:
        if ancestor.title_set.filter(title=BIN_PAGE_NAME).exists():
            is_in_bin = True
            break
        ancestor = ancestor.parent

    # If yes -> delete it
    if is_in_bin:
        obj.delete()
        bin_page.fix_tree()
        return

    # Else -> move it to the bin folder. The bin contents are split into
    # buckets (too many children will slow the javascript down).
    bucket_title = datetime.datetime.now().strftime(BIN_BUCKET_NAMING)
    try:
        bucket = Page.objects.get(title_set__title=bucket_title)
    except ObjectDoesNotExist:
        bucket = api.create_page(
            bucket_title, constants.TEMPLATE_INHERITANCE_MAGIC,
            BIN_PAGE_LANGUAGE, parent=bin_page)
    except MultipleObjectsReturned:
        # Same tolerance as for the bin page: reuse the first duplicate
        # instead of failing the whole delete.
        bucket = Page.objects.filter(title_set__title=bucket_title).first()

    obj.move_page(bucket)
    bin_page.fix_tree()
    obj.fix_tree()
    bucket.fix_tree()
def get_tree(self, request):
    """
    Return the rendered admin rows for a page's children, or for the root
    pages when no ``pageId`` is given.

    Query parameters read from ``request.GET``:
      * ``pageId`` - page whose children are listed (404 if it does not exist).
      * ``site``   - site id; falls back to the current site when missing,
                     not an integer, or not resolving to exactly one Site.

    Per the original note, this is used for lazy loading pages in
    cms.pagetree.js, and permission checks happen further down in the
    admin_utils rendering helpers (not visible in this method).
    """
    page_id = request.GET.get('pageId', None)
    raw_site_id = request.GET.get('site', None)

    try:
        site = Site.objects.get(id=int(raw_site_id))
    except (TypeError, ValueError, MultipleObjectsReturned, ObjectDoesNotExist):
        site = get_current_site(request)

    if page_id:
        parent_page = get_object_or_404(self.model, pk=int(page_id))
        pages = parent_page.get_children()
    else:
        pages = Page.get_root_nodes().filter(site=site, publisher_is_draft=True)

    pages = pages.select_related('parent', 'publisher_public', 'site')
    pages = pages.prefetch_related('children')

    rows_html = render_admin_rows(request, pages, site=site, filtered=False)
    return HttpResponse(rows_html)
def test_get_social_auth(self):
    """
    get_social_auth returns the user's edX social auth object, and raises
    MultipleObjectsReturned once the user has a second social auth object.
    """
    actual = get_social_auth(self.user)
    expected = self.user.social_auth.get(provider=EdxOrgOAuth2.name)
    assert actual == expected

    # A second social auth row for the same user makes the lookup ambiguous.
    UserSocialAuthFactory.create(user=self.user, uid='other name')
    self.assertRaises(MultipleObjectsReturned, get_social_auth, self.user)
def get_or_create_service(service_infos):
    """
    Return the Service matching ``service_infos``, creating it if needed.

    Only keys listed in ``SERVICE_FIELDS`` are used for the lookup/creation;
    every other key of ``service_infos`` is ignored.

    When several services already match, one of the existing matches is
    returned instead of raising.
    """
    # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
    valid_infos = dict(
        (key, value)
        for key, value in service_infos.items()
        if key in SERVICE_FIELDS
    )

    try:
        service, _ = Service.objects.get_or_create(**valid_infos)
    except MultipleObjectsReturned:
        # Reuse the same filters that proved ambiguous: this is guaranteed
        # to be non-empty and does not depend on a 'name' key being present.
        service = Service.objects.filter(**valid_infos)[0]
    return service
def get(self, **kwargs):
    """
    Similar to the Django ORM's QuerySet.get

    When the effective filters (ignoring ``select_related``) consist of the
    primary key only, the single-resource URL is requested directly.
    Otherwise the collection URL is queried with ``limit=1`` and the
    reported ``count`` decides the outcome.

    :param kwargs: it supports the same kwargs as filter does
    :raises DoesNotExist: (the model's) when the reported count is 0
    :raises MultipleObjectsReturned: when the reported count is 2 or more
    :return: a model instance flagged as persisted
    """
    query = self.params.copy()
    query.update(kwargs)
    query = self._preprocess_filter_params(query)
    pk_name = self.model._primary_key()

    lookup_by_pk_only = set(query) - {'select_related'} == {pk_name}
    if lookup_by_pk_only:
        url = self.model._resource_url(query.pop(pk_name))
        # Only forward ``params`` when something is left besides the pk.
        call_kwargs = {'params': query} if query else {}
        result = self.model._rest_call(url, **call_kwargs)
    else:
        url = self.model._resources_url()
        query['limit'] = 1
        payload = self.model._rest_call(url, params=query)
        total = payload['count']
        if total == 0:
            raise self.model.DoesNotExist(
                "%s matching query does not exist." % self.model.__name__
            )
        if total >= 2:
            raise MultipleObjectsReturned(
                "get() returned more than one %s -- it returned %d!" % (self.model.__name__, total)
            )
        result = payload['results'][0]

    instance = self.model(**result)
    instance._persisted = True
    return instance
def get(self, **kwargs):
    """Return the single item of this queryset that matches ``kwargs``.

    Raises ``MultipleObjectsReturned`` when the filtered result holds more
    than one item. An empty result is not handled specially: indexing the
    first element raises whatever the result container raises.
    """
    narrowed = self.filter(**kwargs)
    items = narrowed.get_result()
    if len(items) <= 1:
        return items[0]
    raise MultipleObjectsReturned(
        'get() returned more than one result, it returned {}'.format(narrowed.count())
    )
def test_get_call(self):
    """get() on a 10-item response raises, and the queryset stays usable.

    ``requests.get`` is patched so every call returns a fake payload
    announcing 10 results. ``get(a=123)`` must raise
    MultipleObjectsReturned, the unfiltered queryset must still iterate
    over all ten results, and the filtered request must have been issued
    with ``params={'a': 123}``.
    """
    # Build real lists: on Python 3 a ``range`` never compares equal to a list.
    fake_response = MagicMock(
        json=lambda: {'count': 10, 'results': list(range(10))}
    )
    with patch('rest_framework_queryset.queryset.requests.get',
               return_value=fake_response) as mock_get:
        qs = RestFrameworkQuerySet('/api/')
        with self.assertRaises(MultipleObjectsReturned):
            qs.get(a=123)
        self.assertEqual(list(qs), list(range(10)))
        mock_get.assert_any_call('/api/', params={'a': 123})
def _create_item_transfers(self, sum_item_amounts, error_items=[]):
"""
Create ItemTransfers to alter inventory amounts
"""
transfers = []
for item, amount in sum_item_amounts.items():
try:
amount_symbol = '{:~}'.format(amount['amount']).split(' ')[1]
measure = AmountMeasure.objects.get(symbol=amount_symbol)
amount['amount'] = amount['amount'].magnitude
except:
measure = AmountMeasure.objects.get(symbol='item')
# Look up to see if there is a matching ItemTransfer already and then
# use this instead
try:
transfer = ItemTransfer.objects.get(item=item,
barcode=amount.get('barcode', None),
coordinates=amount.get('coordinates', None))
# At this point need to subtract amount from available in existing
# transfer! Need to mark in some way not completed though so can put
# back if the trasnfer is cancelled
transfer.amount_to_take = amount['amount']
except (ObjectDoesNotExist, MultipleObjectsReturned):
transfer = ItemTransfer(
item=item,
barcode=amount.get('barcode', None),
coordinates=amount.get('coordinates', None),
amount_taken=amount['amount'],
amount_measure=measure)
if item in error_items:
transfer.is_valid = False
transfers.append(transfer)
return transfers
chapter_29_example_06.py 文件源码
项目:two-scoops-of-django-1.11
作者: twoscoops
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def get_object_or_403(model, **kwargs):
    """Return the single ``model`` instance matching ``kwargs``.

    Raises ``PermissionDenied`` both when no object matches and when more
    than one object matches.
    """
    try:
        found = model.objects.get(**kwargs)
    except (ObjectDoesNotExist, MultipleObjectsReturned):
        raise PermissionDenied
    return found
def clean_email(self):
    """Validate that no existing user already uses the submitted email.

    The comparison is case-insensitive (``email__iexact``). A missing
    ``email`` entry is treated as the empty string.

    Returns the email unchanged; raises ``forms.ValidationError`` when one
    or more users already have that address.
    """
    email = self.cleaned_data.get("email", "")
    # A single EXISTS query covers both the "one match" and the
    # "several matches" case without using exceptions for control flow.
    if auth.models.User.objects.filter(email__iexact=email).exists():
        raise forms.ValidationError(_("A user using that email address already exists."))
    return email
def post(self, request):
    """
    User registration JSON API endpoint.
    ---
    request_serializer: UserRegisterSerializer
    """
    serializer = UserRegisterSerializer(data=request.data)
    if not serializer.is_valid():
        return serializer_invalid_response(serializer)

    data = serializer.data
    if not Captcha(request).check(data["captcha"]):
        return error_response(u"?????")

    # Reject a username that is already taken.
    try:
        User.objects.get(username=data["username"])
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????")

    # Reject an email that is already taken.
    try:
        User.objects.get(email=data["email"])
    except MultipleObjectsReturned:
        # Several rows sharing the email still mean "taken"
        # (presumably tolerates legacy duplicate data -- confirm).
        return error_response(u"??????????????????")
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????????????????")

    user = User.objects.create(
        username=data["username"],
        real_name=data["real_name"],
        email=data["email"],
    )
    user.set_password(data["password"])
    user.save()
    UserProfile.objects.create(
        user=user, school=data["school"], student_id=data["student_id"])
    return success_response(u"?????")
def clean(self):
    """Run the layer 2 trace between the two submitted hosts.

    Builds an ``L2TraceQuery`` from ``host_from`` / ``host_to``, stores it
    on ``self.l2tracer`` and executes it. Returns the cleaned data; raises
    ``forms.ValidationError`` when the trace reports an ambiguous match
    (``MultipleObjectsReturned``).
    """
    cleaned_data = super(L2TraceForm, self).clean()

    source_host = cleaned_data.get('host_from')
    target_host = cleaned_data.get('host_to')
    self.l2tracer = L2TraceQuery(source_host, target_host)

    try:
        self.l2tracer.trace()
    except MultipleObjectsReturned:
        raise forms.ValidationError(u"Input was ambiguous, matching multiple hosts")

    return cleaned_data
def subscribe(request):
    """
    General entry point for subscribers to enter drip.

    Only valid POST submissions do anything; every other request is
    redirected to the index. A new subscriber is created when the
    lower-cased email is unknown, after which the user is redirected to the
    session's ``redirect_to`` value when set, else to the index. An email
    that already exists (once or several times) only produces an info
    message.
    """
    if request.method != 'POST':
        return HttpResponseRedirect(reverse_lazy("index"))

    form = DripSubscriberForm(request.POST)
    if not form.is_valid():
        return HttpResponseRedirect(reverse_lazy("index"))

    try:
        DripSubscriber.objects.get(email=form.cleaned_data["email"].lower())
    except MultipleObjectsReturned:
        # Duplicates count as "already subscribed" as well.
        pass
    except DripSubscriber.DoesNotExist:
        DripSubscriber.objects.create(
            email=form.cleaned_data["email"].lower(),
            funnel_entry_point=form.cleaned_data["funnel_entry_point"]
        )
        messages.success(request, "subscription-success")
        target = request.session.get('redirect_to', False)
        if target:
            return HttpResponseRedirect(target)
        return HttpResponseRedirect(reverse_lazy('index'))

    messages.info(request, "subscription-exists")
    return HttpResponseRedirect(reverse_lazy('index'))
def post(self, request):
    """
    User registration JSON API endpoint.
    ---
    request_serializer: UserRegisterSerializer
    """
    serializer = UserRegisterSerializer(data=request.data)
    if not serializer.is_valid():
        return serializer_invalid_response(serializer)

    data = serializer.data
    if not Captcha(request).check(data["captcha"]):
        return error_response(u"?????")

    # Reject a username that is already taken.
    try:
        User.objects.get(username=data["username"])
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????")

    # Reject an email that is already taken.
    try:
        User.objects.get(email=data["email"])
    except MultipleObjectsReturned:
        # Several rows sharing the email still mean "taken"
        # (presumably tolerates legacy duplicate data -- confirm).
        return error_response(u"??????????????????")
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????????????????")

    user = User.objects.create(
        username=data["username"],
        real_name=data["real_name"],
        email=data["email"],
    )
    user.set_password(data["password"])
    user.save()
    UserProfile.objects.create(
        user=user, school=data["school"], student_id=data["student_id"])
    return success_response(u"?????")
def clean(self):
    """Resolve the submitted VLAN reference and attach it to the instance.

    When a VLAN ID is given, the VLAN is looked up by site, VID and either
    the named group or "no group", then stored on ``self.instance.vlan``.
    Raises ``forms.ValidationError`` when no VLAN or several VLANs match.
    Nothing happens when no VLAN ID was submitted.
    """
    super(PrefixCSVForm, self).clean()

    site = self.cleaned_data.get('site')
    vlan_group = self.cleaned_data.get('vlan_group')
    vlan_vid = self.cleaned_data.get('vlan_vid')

    # Validate VLAN -- without a VID there is nothing to look up.
    if not vlan_vid:
        return

    if vlan_group:
        try:
            self.instance.vlan = VLAN.objects.get(site=site, group__name=vlan_group, vid=vlan_vid)
        except VLAN.DoesNotExist:
            if site:
                raise forms.ValidationError("VLAN {} not found in site {} group {}".format(
                    vlan_vid, site, vlan_group
                ))
            raise forms.ValidationError("Global VLAN {} not found in group {}".format(vlan_vid, vlan_group))
        except MultipleObjectsReturned:
            raise forms.ValidationError(
                "Multiple VLANs with VID {} found in group {}".format(vlan_vid, vlan_group)
            )
    else:
        try:
            self.instance.vlan = VLAN.objects.get(site=site, group__isnull=True, vid=vlan_vid)
        except VLAN.DoesNotExist:
            if site:
                raise forms.ValidationError("VLAN {} not found in site {}".format(vlan_vid, site))
            raise forms.ValidationError("Global VLAN {} not found".format(vlan_vid))
        except MultipleObjectsReturned:
            raise forms.ValidationError("Multiple VLANs with VID {} found".format(vlan_vid))
def resource_from_data(self, fk_resource, data, request=None, related_obj=None, related_name=None):
    """
    Given a dictionary-like structure is provided, a fresh related
    resource is created using that data.

    When ``data`` carries unique keys (``pk`` or a field of ``fk_resource``
    flagged ``unique``) and the related resource allows updates, an existing
    object is updated instead: first matched on all of ``data``, then on the
    unique keys only. If no single object can be found that way, a freshly
    hydrated (and validated) bundle is returned.
    """
    # Try to hydrate the data provided.
    data = dict_strip_unicode_keys(data)
    fk_bundle = fk_resource.build_bundle(
        data=data,
        request=request
    )

    if related_obj:
        fk_bundle.related_obj = related_obj
        fk_bundle.related_name = related_name

    # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
    unique_keys = dict(
        (k, v) for k, v in data.items()
        if k == 'pk' or (hasattr(fk_resource, k) and getattr(fk_resource, k).unique)
    )

    # If we have no unique keys, we shouldn't go look for some resource that
    # happens to match other kwargs. In the case of a create, it might be the
    # completely wrong resource.
    # We also need to check to see if updates are allowed on the FK resource.
    if unique_keys and fk_resource.can_update():
        try:
            return fk_resource.obj_update(fk_bundle, skip_errors=True, **data)
        except (NotFound, TypeError):
            try:
                # Attempt lookup by primary key
                return fk_resource.obj_update(fk_bundle, skip_errors=True, **unique_keys)
            except NotFound:
                pass
        except MultipleObjectsReturned:
            pass

    # If we shouldn't update a resource, or we couldn't find a matching
    # resource we'll just return a populated bundle instead
    # of mistakenly updating something that should be read-only.
    fk_bundle = fk_resource.full_hydrate(fk_bundle)
    fk_resource.is_valid(fk_bundle)
    return fk_bundle
def post(self, request):
    """
    Bulk account provisioning JSON API endpoint.

    ``request.data["updata"]`` is split on whitespace and read as
    consecutive ``username password`` pairs (a trailing unpaired token is
    ignored). For each distinct pair, an existing user with that username
    gets the password reset; a user is then resolved through the generated
    ``<username>@acmoj.temp`` email and created, together with a profile,
    when missing. Pairs whose email matches several users are skipped.

    The response content is the rendered output template listing the
    processed users, each carrying the submitted password as ``tps``.
    """
    tokens = request.data["updata"].split()
    credentials = set(zip(tokens[0::2], tokens[1::2]))

    output = []
    for username, password in credentials:
        email = username + "@acmoj.temp"

        # Reset the password of an already existing account, if any.
        try:
            user = User.objects.get(username=username)
            user.set_password(password)
            user.save()
        except User.DoesNotExist:
            pass

        try:
            user = User.objects.get(email=email)
        except MultipleObjectsReturned:
            continue
        except User.DoesNotExist:
            user = User.objects.create(username=username, real_name=username, email=email)
            user.set_password(password)
            user.save()
            UserProfile.objects.create(user=user, school="unknown", student_id=9527)

        user.tps = password
        output.append(user)

    rendered = render(request, "oj/account/output.html", {"output": output})
    return success_response({"content": rendered.content})
def post(self, request):
    """
    User registration JSON API endpoint.
    ---
    request_serializer: UserRegisterSerializer
    """
    serializer = UserRegisterSerializer(data=request.data)
    if not serializer.is_valid():
        return serializer_invalid_response(serializer)

    data = serializer.data
    if not Captcha(request).check(data["captcha"]):
        return error_response(u"?????")

    # Reject a username that is already taken.
    try:
        User.objects.get(username=data["username"])
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????")

    # Reject an email that is already taken.
    try:
        User.objects.get(email=data["email"])
    except MultipleObjectsReturned:
        # Several rows sharing the email still mean "taken"
        # (presumably tolerates legacy duplicate data -- confirm).
        return error_response(u"??????????????????")
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????????????????")

    user = User.objects.create(
        username=data["username"],
        real_name=data["real_name"],
        email=data["email"],
    )
    user.set_password(data["password"])
    user.save()
    UserProfile.objects.create(
        user=user, school=data["school"], student_id=data["student_id"])
    return success_response(u"?????")
def test_get_one_raises_model_error_if_query_result_is_too_big(self):
    """A two-item FakeQueryResult makes get_one raise the model's own error."""
    oversized_result = FakeQueryResult(FakeModel, list(range(2)))
    with self.assertRaises(FakeModel.MultipleObjectsReturned):
        get_one(oversized_result)
def test_get_one_raises_generic_error_if_other_sequence_is_too_big(self):
    """A plain two-item list makes get_one raise the generic error."""
    with self.assertRaises(MultipleObjectsReturned):
        get_one(list(range(2)))
def free_task(self):
    """Free the task.

    Moves the task to the FREE state through the private lock helper,
    keeping the current ``worker_id``; accepted starting states are FREE,
    ASSIGNED and CREATED. Raises ``Exception`` when the lock helper raises
    MultipleObjectsReturned or ObjectDoesNotExist.
    """
    allowed_from = (
        TASK_STATES["FREE"],
        TASK_STATES["ASSIGNED"],
        TASK_STATES["CREATED"],
    )
    try:
        self.__lock(self.worker_id, new_state=TASK_STATES["FREE"], initial_states=allowed_from)
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot free task %d, state is %s" % details)
def assign_task(self, worker_id=None):
    """Assign the task to a worker identified by worker_id.

    Falls back to the task's current ``worker_id`` when none is given.
    Accepted starting states are FREE and CREATED. Raises ``Exception``
    when the private lock helper raises MultipleObjectsReturned or
    ObjectDoesNotExist.
    """
    target_worker = self.worker_id if worker_id is None else worker_id
    allowed_from = (TASK_STATES["FREE"], TASK_STATES["CREATED"])
    try:
        self.__lock(target_worker, new_state=TASK_STATES["ASSIGNED"], initial_states=allowed_from)
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        raise Exception("Cannot assign task %d" % self.id)
def open_task(self, worker_id=None):
    """Open the task on a worker identified by worker_id.

    Falls back to the task's current ``worker_id`` when none is given.
    Accepted starting states are FREE and ASSIGNED. Raises ``Exception``
    when the private lock helper raises MultipleObjectsReturned or
    ObjectDoesNotExist.
    """
    target_worker = self.worker_id if worker_id is None else worker_id
    allowed_from = (TASK_STATES["FREE"], TASK_STATES["ASSIGNED"])
    try:
        self.__lock(target_worker, new_state=TASK_STATES["OPEN"], initial_states=allowed_from)
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot open task %d, state is %s" % details)