def get_family(self):
    """
    Builds a ``QuerySet`` matching this node's ancestors, the node itself
    and its descendants, restricted to this node's tree.

    No ordering is applied here; results come back in whatever order
    ``self._tree_manager`` gives its querysets.
    """
    meta = self._mptt_meta
    left_name = meta.left_attr
    right_name = meta.right_attr
    node_left = getattr(self, left_name)
    node_right = getattr(self, right_name)
    tree_id = self._mpttfield('tree_id')

    # Rows whose [left, right] interval encloses this node's interval
    # (the node itself satisfies this too).
    enclosing = {
        "%s__lte" % left_name: node_left,
        "%s__gte" % right_name: node_right,
        meta.tree_id_attr: tree_id,
    }
    # Rows whose left value falls inside this node's [left, right] interval.
    enclosed = {
        "%s__gte" % left_name: node_left,
        "%s__lte" % left_name: node_right,
        meta.tree_id_attr: tree_id,
    }
    return self._tree_manager.filter(Q(**enclosing) | Q(**enclosed))
# Example source code using the Python class Q
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def insertion_target_filters(self, instance, order_insertion_by):
    """
    Builds a ``Q`` filter selecting rows that sort after ``instance``
    according to the field names listed in ``order_insertion_by``.

    A field name prefixed with ``-`` is compared with ``__lt`` instead of
    ``__gt``. For an ``order_insertion_by`` of
    ``['field1', 'field2', 'field3']`` the filter corresponds to::

        field1 > %s
        OR (field1 = %s AND field2 > %s)
        OR (field1 = %s AND field2 = %s AND field3 > %s)

    Values are read from ``instance``; when an attribute is ``None`` the
    value is taken from the field's ``pre_save(instance, True)`` instead.
    """
    # (field_name, value) pairs for the fields already handled; each later
    # alternative requires equality on all of them.
    pinned = []
    alternatives = []
    for field_name in order_insertion_by:
        if field_name[0] == '-':
            field_name = field_name[1:]
            comparison = '__lt'
        else:
            comparison = '__gt'

        value = getattr(instance, field_name)
        if value is None:
            # node isn't saved yet. get the insertion value from pre_save.
            value = instance._meta.get_field(field_name).pre_save(instance, True)

        beyond = Q(**{field_name + comparison: value})
        clauses = [Q(**{name: pinned_value}) for name, pinned_value in pinned]
        clauses.append(beyond)
        alternatives.append(reduce(operator.and_, clauses))
        pinned.append((field_name, value))
    return reduce(operator.or_, alternatives)
def get_ordered_insertion_target(self, node, parent):
    """
    Looks up the row that ``node`` should be inserted before, among the
    children of ``parent`` (or among rows with no parent when ``parent``
    is ``None``), so that the ordering given by the node's
    ``order_insertion_by`` option is kept.

    Returns ``None`` when no such row is found.
    """
    # Optimisation - a parent without descendants has no children to
    # compare against, so there is nothing to look up.
    if parent is not None and not parent.get_descendant_count() > 0:
        return None

    opts = node._mptt_meta
    ordering = opts.order_insertion_by[:]
    criteria = self.insertion_target_filters(node, ordering)
    if parent:
        # Ties between child nodes fall back on tree ordering.
        scope, tie_breaker = parent, opts.left_attr
    else:
        # Ties between root nodes fall back on tree id ordering.
        scope, tie_breaker = None, opts.tree_id_attr
    criteria = criteria & Q(**{opts.parent_attr: scope})
    ordering.append(tie_breaker)

    manager = node.__class__._tree_manager.db_manager(node._state.db)
    candidates = manager.filter(criteria).order_by(*ordering)
    if node.pk:
        # A node that already has a pk must not be its own target.
        candidates = candidates.exclude(pk=node.pk)
    try:
        return candidates[:1][0]
    except IndexError:
        # No suitable right sibling could be found
        return None
def _filter_or_exclude(self, negate, *args, **kwargs):
    """
    Override of the internal Django method that all filter member
    functions go through, adding polymorphic filter translation.

    Positional arguments (the Q objects) and keyword arguments are each
    handed to their translation helper first. Whatever the keyword helper
    returns is appended to the positional arguments before delegating to
    the parent implementation.
    """
    # NOTE(review): presumably rewrites the Q objects in place, since the
    # return value is not used - confirm against the helper.
    translate_polymorphic_filter_definitions_in_args(self.model, args)
    extra_args = translate_polymorphic_filter_definitions_in_kwargs(self.model, kwargs)
    positional = list(args) + extra_args
    parent = super(PolymorphicQuerySet, self)
    return parent._filter_or_exclude(negate, *positional, **kwargs)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def get_permitted_content_types(self):
    """
    Method for getting a queryset of permitted content types for the model form

    Two settings are consulted:

    * ``OMNI_FORMS_CONTENT_TYPES`` (default ``None``): when truthy, the
      result is limited to content types matching any of its entries and
      the exclusions below are not applied.
    * ``OMNI_FORMS_EXCLUDED_CONTENT_TYPES`` (default
      ``[{'app_label': 'omniforms'}]``): otherwise, each entry is excluded
      from the full set of content types.

    Every entry is converted to lookup kwargs by ``self._query_filters``.

    :return: QuerySet of ContentType model instances
    """
    allowed = getattr(settings, 'OMNI_FORMS_CONTENT_TYPES', None)
    excluded = getattr(settings, 'OMNI_FORMS_EXCLUDED_CONTENT_TYPES', [{'app_label': 'omniforms'}])
    permitted = ContentType.objects.all()

    if not allowed:
        if excluded:
            for entry in excluded:
                permitted = permitted.exclude(**self._query_filters(entry))
        return permitted

    # OR together one Q per allowed entry.
    combined = None
    for entry in allowed:
        condition = Q(**self._query_filters(entry))
        combined = condition if combined is None else combined | condition
    return permitted.filter(combined)
def section(self, value, queryset=None):
    """
    Restrict ``queryset`` (``self.published()`` when omitted) to one section.

    A falsy ``value`` returns the queryset untouched. Otherwise ``value``
    is converted with ``self.model.section_idx``; a ``KeyError`` from that
    conversion is re-raised as ``InvalidSection``. The result keeps rows
    in the requested section as well as rows in the section named by
    ``settings.PINAX_BLOG_ALL_SECTION_NAME``.
    """
    base = self.published() if queryset is None else queryset
    if not value:
        return base

    try:
        wanted_idx = self.model.section_idx(value)
    except KeyError:
        raise InvalidSection

    in_all_sections = Q(section=self.model.section_idx(settings.PINAX_BLOG_ALL_SECTION_NAME))
    return base.filter(in_all_sections | Q(section=wanted_idx))
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs):
    """
    Return the first row after (``is_next`` true) or before (``is_next``
    false) this instance when rows are ordered by ``field`` and then by pk.

    Extra ``kwargs`` are applied as an additional ``filter()`` on the
    candidate rows. The query runs on the database recorded in
    ``self._state.db``.

    Raises ``ValueError`` when ``self.pk`` is falsy, and
    ``self.DoesNotExist`` when no row matches.
    """
    if not self.pk:
        raise ValueError("get_next/get_previous cannot be used on unsaved objects.")

    if is_next:
        lookup, direction = 'gt', ''
    else:
        lookup, direction = 'lt', '-'

    current = force_text(getattr(self, field.attname))
    # Either strictly beyond the current field value...
    beyond = Q(**{'%s__%s' % (field.name, lookup): current})
    # ...or tied on the field value and beyond this row's pk.
    tied = Q(**{field.name: current, 'pk__%s' % lookup: self.pk})

    manager = self.__class__._default_manager.using(self._state.db)
    candidates = manager.filter(**kwargs).filter(beyond | tied).order_by(
        '%s%s' % (direction, field.name), '%spk' % direction
    )
    try:
        return candidates[0]
    except IndexError:
        raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name)
def _process_aggregate_args(self, args, kwargs):
    """for aggregate and annotate kwargs: allow ModelX___field syntax for kwargs, forbid it for args.

    Positional arguments are only checked: an ``AssertionError`` is raised
    if any of them contains a ``___`` model lookup. Keyword argument
    values are modified in place: their lookup / field name is passed
    through ``translate_polymorphic_field_path`` (or, for ``Q`` objects,
    ``translate_polymorphic_Q_object``).

    Which attribute carries the lookup depends on the Django version:
    ``a.lookup`` before 1.8, and from 1.8 on either a ``Q`` object, an
    expression exposing ``get_source_expressions()`` (walked recursively),
    or an object with a ``name`` attribute.

    Returns ``None``.
    """
    lookup_assert_msg = 'PolymorphicModel: annotate()/aggregate(): ___ model lookup supported for keyword arguments only'

    def patch_lookup_lt_18(a):
        a.lookup = translate_polymorphic_field_path(self.model, a.lookup)

    def patch_lookup_gte_18(a):
        # With Django > 1.8, the field on which the aggregate operates is
        # stored inside a complex query expression.
        if isinstance(a, Q):
            translate_polymorphic_Q_object(self.model, a)
        elif hasattr(a, 'get_source_expressions'):
            for source_expression in a.get_source_expressions():
                patch_lookup_gte_18(source_expression)
        else:
            a.name = translate_polymorphic_field_path(self.model, a.name)

    # NOTE: the checks below deliberately stay ``assert`` statements so the
    # exception type seen by callers (AssertionError) does not change; be
    # aware that they are skipped when Python runs with -O.
    def test___lookup_for_args_lt_18(a):
        assert '___' not in a.lookup, lookup_assert_msg

    def assert_no___lookup_in_q(node):
        " process all children of this Q node "
        for child in node.children:
            if isinstance(child, tuple):
                # this Q object child is a tuple => a kwarg like Q( instance_of=ModelB )
                assert '___' not in child[0], lookup_assert_msg
            else:
                # this Q object child is another Q object, recursively process this as well
                assert_no___lookup_in_q(child)

    def test___lookup_for_args_gte_18(a):
        """ *args might be complex expressions too in django 1.8 so
        the testing for a '___' is rather complex on this one """
        if isinstance(a, Q):
            assert_no___lookup_in_q(a)
        elif hasattr(a, 'get_source_expressions'):
            for source_expression in a.get_source_expressions():
                test___lookup_for_args_gte_18(source_expression)
        else:
            assert '___' not in a.name, lookup_assert_msg

    # The Django version cannot change mid-call, so pick the helpers once
    # instead of once per argument.
    if django.VERSION < (1, 8):
        test___lookup = test___lookup_for_args_lt_18
        patch_lookup = patch_lookup_lt_18
    else:
        test___lookup = test___lookup_for_args_gte_18
        patch_lookup = patch_lookup_gte_18

    for a in args:
        test___lookup(a)
    for a in six.itervalues(kwargs):
        patch_lookup(a)