def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
python类QuerySet()的实例源码
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def process_rhs(self, compiler, connection):
value = self.rhs
if self.bilateral_transforms:
if self.rhs_is_direct_value():
# Do not call get_db_prep_lookup here as the value will be
# transformed before being used for lookup
value = Value(value, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
# Due to historical reasons there are a couple of different
# ways to produce sql here. get_compiler is likely a Query
# instance, _as_sql QuerySet and as_sql just something with
# as_sql. Finally the value can of course be just plain
# Python value.
if hasattr(value, 'get_compiler'):
value = value.get_compiler(connection=connection)
if hasattr(value, 'as_sql'):
sql, params = compiler.compile(value)
return '(' + sql + ')', params
if hasattr(value, '_as_sql'):
sql, params = value._as_sql(connection=connection)
return '(' + sql + ')', params
else:
return self.get_db_prep_lookup(value, connection)
test_querysetsequence.py 文件源码
项目:django-querysetsequence
作者: percipient
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_queryset_multiple(self):
"""
When using multiple paramters to filter they get ANDed together. Ensure
this works when filtering by QuerySet.
"""
qss = self._get_qss().filter(**{'#__gt': 0, 'title__gt': 'Django Rocks'})
data = [it.title for it in qss]
expected = [
# Some of the Articles and the BlogPosts.
'Some Article',
'Post',
]
self.assertEqual(data, expected)
# This would only look at Articles and BlogPosts, but neither of those
# have a title > "Some Article."
qss = self._get_qss().filter(**{'#__gt': 0, 'title__gt': 'Some Article'})
# Only the articles are here because it's the second queryset.
data = [it.title for it in qss]
self.assertEqual(data, [])
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
danceschool_tags.py 文件源码
项目:django-danceschool
作者: django-danceschool
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_item_by_key(passed_list, key, value):
'''
This one allows us to get one or more items from a list of
dictionaries based on the value of a specified key, where
both the key and the value can be variable names. Does
not work with None or null string passed values.
'''
if value in [None,'']:
return
if type(passed_list) in [QuerySet, PolymorphicQuerySet]:
sub_list = passed_list.filter(**{key: value})
else:
sub_list = [x for x in passed_list if x.get(key) == value]
if len(sub_list) == 1:
return sub_list[0]
return sub_list
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def resolve_connection(cls, connection, default_manager, args, iterable):
if iterable is None:
iterable = default_manager
iterable = maybe_queryset(iterable)
if isinstance(iterable, QuerySet):
if iterable is not default_manager:
default_queryset = maybe_queryset(default_manager)
iterable = cls.merge_querysets(default_queryset, iterable)
_len = iterable.count()
else:
_len = len(iterable)
connection = connection_from_list_slice(
iterable,
args,
slice_start=0,
list_length=_len,
list_slice_length=_len,
connection_type=connection,
edge_type=connection.Edge,
pageinfo_type=PageInfo,
)
connection.iterable = iterable
connection.length = _len
return connection
def process_rhs(self, compiler, connection):
value = self.rhs
if self.bilateral_transforms:
if self.rhs_is_direct_value():
# Do not call get_db_prep_lookup here as the value will be
# transformed before being used for lookup
value = Value(value, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
# Due to historical reasons there are a couple of different
# ways to produce sql here. get_compiler is likely a Query
# instance, _as_sql QuerySet and as_sql just something with
# as_sql. Finally the value can of course be just plain
# Python value.
if hasattr(value, 'get_compiler'):
value = value.get_compiler(connection=connection)
if hasattr(value, 'as_sql'):
sql, params = compiler.compile(value)
return '(' + sql + ')', params
if hasattr(value, '_as_sql'):
sql, params = value._as_sql(connection=connection)
return '(' + sql + ')', params
else:
return self.get_db_prep_lookup(value, connection)
def get_searched_queryset(self, qs):
model = self.model
term = self.GET["term"]
try:
term = model.autocomplete_term_adjust(term)
except AttributeError:
pass
search_fields = get_autocomplete_search_fields(self.model)
if search_fields:
for word in term.split():
search = [models.Q(**{smart_text(item): smart_text(word)}) for item in search_fields]
search_qs = QuerySet(model)
search_qs.query.select_related = qs.query.select_related
search_qs = search_qs.filter(reduce(operator.or_, search))
qs &= search_qs
else:
qs = model.objects.none()
return qs
def add_fail_count_to_tasklist(tasklist):
tid_list=[i.tid for i in tasklist]
buff={}
for i in tasklist:
buff[i.tid]=i
if len(tid_list)==0:
pass
return True
for i in tasklist:
i.count=0
if len(tid_list)==1:
tid_list*=2
tid_tuple=tuple(tid_list)
# _query= Runlog.objects.filter(tid__in=tid_list).query
#_query.group_by=["tid"]
#_buff=QuerySet(query=_query,model=Runlog)
#_result=_buff.annotate(count=Count("rid"))
cursor=connection.cursor()
cursor.execute("SELECT `task_runlog`.`rid`, `task_runlog`.`tid`,COUNT(`task_runlog`.`rid`) AS `count` FROM `task_runlog` WHERE `task_runlog`.`tid` IN %s GROUP BY tid;",(tid_tuple,))
_result=cursor.fetchall()
for _rid,_tid,_count in _result:
buff[_tid].count=int(_count)
return True
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def process_rhs(self, compiler, connection):
value = self.rhs
if self.bilateral_transforms:
if self.rhs_is_direct_value():
# Do not call get_db_prep_lookup here as the value will be
# transformed before being used for lookup
value = Value(value, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
# Due to historical reasons there are a couple of different
# ways to produce sql here. get_compiler is likely a Query
# instance, _as_sql QuerySet and as_sql just something with
# as_sql. Finally the value can of course be just plain
# Python value.
if hasattr(value, 'get_compiler'):
value = value.get_compiler(connection=connection)
if hasattr(value, 'as_sql'):
sql, params = compiler.compile(value)
return '(' + sql + ')', params
if hasattr(value, '_as_sql'):
sql, params = value._as_sql(connection=connection)
return '(' + sql + ')', params
else:
return self.get_db_prep_lookup(value, connection)
def content(self, value):
workbook = None
if not bool(value) or not len(value): # Short-circuit to protect against empty querysets/empty lists/None, etc
self._container = []
return
elif isinstance(value, list):
workbook = self._serialize_list(value)
elif isinstance(value, QuerySet):
workbook = self._serialize_queryset(value)
if django.VERSION < (1, 9):
if isinstance(value, ValuesQuerySet):
workbook = self._serialize_values_queryset(value)
if workbook is None:
raise ValueError('ExcelResponse accepts the following data types: list, dict, QuerySet, ValuesQuerySet')
if self.force_csv:
self['Content-Type'] = 'text/csv; charset=utf8'
self['Content-Disposition'] = 'attachment;filename={}.csv'.format(self.output_filename)
workbook.seek(0)
workbook = self.make_bytes(workbook.getvalue())
else:
self['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
self['Content-Disposition'] = 'attachment; filename={}.xlsx'.format(self.output_filename)
workbook = save_virtual_workbook(workbook)
self._container = [self.make_bytes(workbook)]
def serialize_objects(django_objects,class_models,*args):
'''
models ???
:param django_objects:
:param args:
:param class_models:name of models class
:return:json
'''
serialize_list = []
if isinstance(django_objects,QuerySet):
for django_object in django_objects:
data_dic = {}
for column in args:
data_dic[column]= str(eval('django_object.%s'%column))
serialize_list.append(data_dic)
if isinstance(django_objects,class_models):
data_dic = {}
for column in args:
data_dic[column]= str(eval('django_objects.%s'%column))
serialize_list.append(data_dic)
return serialize_list
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
klass__name = klass.__name__ if isinstance(klass, type) \
else klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def get_object_or_none(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
return None
def queue(users, label, extra_context=None, on_site=True, sender=None):
"""
Queue the notification in NoticeQueueBatch. This allows for large amounts
of user notifications to be deferred to a seperate process running outside
the webserver.
"""
if extra_context is None:
extra_context = {}
if isinstance(users, QuerySet):
users = [row["pk"] for row in users.values("pk")]
else:
users = [user.pk for user in users]
notices = []
for user in users:
notices.append((user, label, extra_context, on_site, sender))
NoticeQueueBatch(pickled_data=pickle.dumps(notices).encode("base64")).save()
def _get_queryset(klass):
"""
Returns a QuerySet from a Model, Manager, or QuerySet. Created to make
get_object_or_404 and get_list_or_404 more DRY.
Raises a ValueError if klass is not a Model, Manager, or QuerySet.
"""
if isinstance(klass, QuerySet):
return klass
elif isinstance(klass, Manager):
manager = klass
elif isinstance(klass, ModelBase):
manager = klass._default_manager
else:
if isinstance(klass, type):
klass__name = klass.__name__
else:
klass__name = klass.__class__.__name__
raise ValueError("Object is of type '%s', but must be a Django Model, "
"Manager, or QuerySet" % klass__name)
return manager.all()
def get_object_or_404(klass, *args, **kwargs):
"""
Uses get() to return an object, or raises a Http404 exception if the object
does not exist.
klass may be a Model, Manager, or QuerySet object. All other passed
arguments and keyword arguments are used in the get() query.
Note: Like with get(), an MultipleObjectsReturned will be raised if more than one
object is found.
"""
queryset = _get_queryset(klass)
try:
return queryset.get(*args, **kwargs)
except queryset.model.DoesNotExist:
raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def process_rhs(self, compiler, connection):
value = self.rhs
if self.bilateral_transforms:
if self.rhs_is_direct_value():
# Do not call get_db_prep_lookup here as the value will be
# transformed before being used for lookup
value = Value(value, output_field=self.lhs.output_field)
value = self.apply_bilateral_transforms(value)
value = value.resolve_expression(compiler.query)
# Due to historical reasons there are a couple of different
# ways to produce sql here. get_compiler is likely a Query
# instance, _as_sql QuerySet and as_sql just something with
# as_sql. Finally the value can of course be just plain
# Python value.
if hasattr(value, 'get_compiler'):
value = value.get_compiler(connection=connection)
if hasattr(value, 'as_sql'):
sql, params = compiler.compile(value)
return '(' + sql + ')', params
if hasattr(value, '_as_sql'):
sql, params = value._as_sql(connection=connection)
return '(' + sql + ')', params
else:
return self.get_db_prep_lookup(value, connection)
def filter_by_ids(self, query, ids=None):
"""Filter `query` result set by system_id values.
:param query: A QuerySet of Nodes.
:type query: django.db.models.query.QuerySet_
:param ids: Optional set of ids to filter by. If given, nodes whose
system_ids are not in `ids` will be ignored.
:type param_ids: Sequence
:return: A filtered version of `query`.
.. _django.db.models.query.QuerySet: https://docs.djangoproject.com/
en/dev/ref/models/querysets/
"""
if ids is None:
return query
else:
return query.filter(system_id__in=ids)
def get_bmc_accessible_nodes(self):
"""Return `QuerySet` of nodes that this rack controller can access.
This looks at the IP address assigned to all BMC's and filters out
only the BMC's this rack controller can access. Returning all nodes
connected to those BMCs.
"""
subnet_ids = set()
for interface in self.interface_set.all().prefetch_related(
"ip_addresses"):
for ip_address in interface.ip_addresses.all():
if ip_address.ip and ip_address.subnet_id is not None:
subnet_ids.add(ip_address.subnet_id)
nodes = Node.objects.filter(
bmc__ip_address__ip__isnull=False,
bmc__ip_address__subnet_id__in=subnet_ids).distinct()
return nodes
def test_creates_handler_with_options(self):
handler = make_handler(
"TestHandler", abstract=True, allowed_methods=["list"],
handler_name="testing", queryset=Node.objects.all(),
pk="system_id", fields=["hostname", "distro_series"],
exclude=["system_id"], list_fields=["hostname"],
list_exclude=["hostname"], non_changeable=["system_id"],
form=sentinel.form)
self.assertThat(handler._meta, MatchesStructure(
abstract=Is(True), allowed_methods=Equals(["list"]),
handler_name=Equals("testing"), object_class=Is(Node),
queryset=IsInstance(QuerySet), pk=Equals("system_id"),
fields=Equals(["hostname", "distro_series"]),
exclude=Equals(["system_id"]), list_fields=Equals(["hostname"]),
list_exclude=Equals(["hostname"]),
non_changeable=Equals(["system_id"]),
form=Is(sentinel.form)))