def get_usable_luns(cls, queryset):
    """
    Return Luns that are not used by any Target and have enough VolumeNode
    configuration to become one.

    A Lun is usable when it has exactly one VolumeNode (no HA available,
    but the mount location is unambiguous) or when at least one of its
    VolumeNodes is flagged primary (one or more VolumeNodes exist and the
    primary mount location is known).
    """
    unused = cls.get_unused_luns(queryset)
    live = unused.filter(volumenode__host__not_deleted=True)
    annotated = live.annotate(
        has_primary=BoolOr('volumenode__primary'),
        num_volumenodes=Count('volumenode'),
    )
    # Usable: a single VolumeNode, or an explicit primary among several.
    return annotated.filter(Q(num_volumenodes=1) | Q(has_primary=True))
# Collected examples of Django Count() usage (scraped snippets)
def get_stats(self):
    """Group rows by status, count each group as ``amount``, and attach
    human-readable status names via ``with_status_name``."""
    grouped = self.values('status')
    counted = grouped.annotate(amount=Count('id'))
    return counted.with_status_name()
def get_count(self, using):
    """
    Perform a COUNT() query under the current filter constraints and
    return the result as an int (0 when the backend reports NULL).
    """
    clone = self.clone()
    clone.add_annotation(Count('*'), alias='__count', is_summary=True)
    result = clone.get_aggregation(using, ['__count'])['__count']
    # A NULL aggregate means zero matching rows.
    return result if result is not None else 0
def get_random_song_list(self, n):
    """
    Return a list of ``n`` rows drawn uniformly at random from the queryset.

    Draws are independent, so the same row may appear more than once.
    Avoids ``order_by('?')``, which is notoriously slow on large tables,
    by picking random offsets instead.

    :param n: number of rows to return
    :raises MusicLibrary.DoesNotExist: if rows are requested (n > 0) but
        the queryset is empty
    """
    count = self.aggregate(count=Count('id'))['count']
    # Original code caught randint's ValueError on every loop iteration;
    # check emptiness once up front instead. n == 0 on an empty queryset
    # still returns [] (matching the original loop's behavior).
    if n and not count:
        raise MusicLibrary.DoesNotExist
    return [self.all()[randint(0, count - 1)] for _ in range(n)]
def fix_results_value_count():
    """Backfill ``Result.value_count`` from the actual number of related
    timeseries values, saving each updated row."""
    annotated = Result.objects.annotate(
        number_of_values=Count('timeseriesresult__values'))
    for row in annotated:
        row.value_count = row.number_of_values
        row.save()
def random(self, user):
    """
    Return a random conjugation for *user*.

    Authenticated users who have MoodTense preferences are restricted to
    those mood/tenses; anonymous users (and users without preferences)
    draw from the whole queryset.

    :raises ValueError: if no conjugation is available (randint on an
        empty range — preserved from the original behavior)
    """
    conjugations = self
    if not user.is_anonymous():
        # FIX: the original ran MoodTense.objects.filter(users=user) twice;
        # build it once and reuse it. .exists() avoids a full COUNT().
        mood_tenses = MoodTense.objects.filter(users=user)
        if mood_tenses.exists():
            conjugations = conjugations.filter(mood_tense__in=mood_tenses)
    count = conjugations.aggregate(count=Count('id'))['count']
    random_index = random.randint(0, count - 1)
    return conjugations.order_by('id')[random_index]
def filter_popular(self, queryset, value):
    """
    Recommend content that is popular with all users.

    :param queryset: all content nodes for this channel
    :param value: id of currently logged in user, or none if user is anonymous
    :return: 10 most popular content nodes, or up to 25 random nodes when
        there is not yet enough session data
    """
    if ContentSessionLog.objects.count() < 50:
        # Not enough session logs: return up to 25 random content nodes.
        pks = queryset.values_list('pk', flat=True).exclude(kind=content_kinds.TOPIC)
        # .count scales with table size, so can get slow on larger channels
        count_cache_key = 'content_count_for_popular'
        count = cache.get(count_cache_key)
        if count is None:
            count = min(pks.count(), 25)
            # BUG FIX: the count was computed but never cached, so every
            # request paid the COUNT() cost. Cache it for 10 minutes.
            cache.set(count_cache_key, count, 60 * 10)
        pk_list = list(pks)
        # Clamp in case a cached count is stale and exceeds the current
        # number of rows (sample() raises ValueError otherwise).
        return queryset.filter(pk__in=sample(pk_list, min(count, len(pk_list))))
    cache_key = 'popular_content'
    # Read the cache once instead of twice as the original did.
    cached = cache.get(cache_key)
    if cached:
        return cached
    # get the most accessed content nodes
    content_counts_sorted = ContentSessionLog.objects \
        .values_list('content_id', flat=True) \
        .annotate(Count('content_id')) \
        .order_by('-content_id__count')
    most_popular = queryset.filter(content_id__in=list(content_counts_sorted[:10]))
    # cache the popular results queryset for 10 minutes, for efficiency
    cache.set(cache_key, most_popular, 60 * 10)
    return most_popular
def data(self):
    """Aggregate total minutes and distinct user count over the base query."""
    base = self.get_query()
    return base.annotate(
        users=Count('user', distinct=True),
        minutes=Sum('minutes'),
    )
def get_queryset(self, request):
    """Annotate the base queryset with distinct participant and message counts."""
    qs = super().get_queryset(request)
    qs = qs.annotate(participants_count=Count('participants', distinct=True))
    return qs.annotate(messages_count=Count('messages', distinct=True))
def get_queryset(self, request):
    """Annotate each row with the number of related application instances."""
    base = super().get_queryset(request)
    return base.annotate(applications_count=Count('instances'))
def get_queryset(self, request):
    """Annotate each row with the number of related terms."""
    base = super().get_queryset(request)
    return base.annotate(terms_count=Count('terms'))
def _get_unrecoverable_nodes(self, user):
    """
    Return Nodes (Workspaces) where *user* is the only member.

    :param user: the user being checked
    :return: QuerySet of Node
    """
    member_cls = get_model('accounts', 'Member')
    node_cls = get_model('nodes', 'Node')
    member_node_ids = member_cls.objects.filter(user=user).values_list(
        'node_id', flat=True)
    candidates = node_cls.objects.filter(pk__in=member_node_ids)
    annotated = candidates.annotate(is_recoverable=Count('membership'))
    # More than one member means someone else can recover the node.
    return annotated.exclude(is_recoverable__gt=1)
def random(self):
    """Return one row chosen uniformly at random.

    Raises ValueError when the queryset is empty (randint on an empty range).
    """
    total = self.aggregate(count=Count('id'))['count']
    return self.all()[randint(0, total - 1)]
def random(self):
    """Pick a uniformly random element of the queryset.

    Raises ValueError when the queryset is empty (randint on an empty range).
    """
    size = self.aggregate(count=Count('id'))['count']
    index = randint(0, size - 1)
    return self.all()[index]
def get_count(self, using):
    """
    Perform a COUNT() query under the current filter constraints and
    return it as an int; a NULL aggregate is reported as 0.
    """
    query = self.clone()
    query.add_annotation(Count('*'), alias='__count', is_summary=True)
    totals = query.get_aggregation(using, ['__count'])
    count = totals['__count']
    if count is None:
        count = 0
    return count
def top_hourly(self):
    """Keywords ranked by distinct-IP hits over the last hour, busiest first."""
    cutoff = timezone.now() - datetime.timedelta(hours=1)
    recent = self.filter(log_time__gte=cutoff)
    return recent.values('keyword').annotate(
        pv=Count('ip', distinct=True)).order_by('-pv')
def top_daily(self):
    """Keywords ranked by distinct-IP hits over the last 24 hours, busiest first."""
    cutoff = timezone.now() - datetime.timedelta(days=1)
    recent = self.filter(log_time__gte=cutoff)
    return recent.values('keyword').annotate(
        pv=Count('ip', distinct=True)).order_by('-pv')
def top_hourly(self):
    """hash_ids ranked by distinct-IP hits over the last hour, busiest first."""
    cutoff = timezone.now() - datetime.timedelta(hours=1)
    recent = self.filter(log_time__gte=cutoff)
    return recent.values('hash_id').annotate(
        pv=Count('ip', distinct=True)).order_by('-pv')
def top_daily(self):
    """hash_ids ranked by distinct-IP hits over the last 24 hours, busiest first."""
    cutoff = timezone.now() - datetime.timedelta(days=1)
    recent = self.filter(log_time__gte=cutoff)
    return recent.values('hash_id').annotate(
        pv=Count('ip', distinct=True)).order_by('-pv')
def get_count(self, using):
    """
    Run a COUNT() over the current filter constraints on the *using*
    database; a NULL result is normalized to 0.
    """
    counting = self.clone()
    counting.add_annotation(Count('*'), alias='__count', is_summary=True)
    value = counting.get_aggregation(using, ['__count'])['__count']
    return value if value is not None else 0
def get_count(self, using):
    """
    Perform a COUNT() query using the current filter constraints and
    return it as an int, mapping a NULL aggregate to 0.
    """
    clone = self.clone()
    clone.add_annotation(Count('*'), alias='__count', is_summary=True)
    aggregates = clone.get_aggregation(using, ['__count'])
    total = aggregates['__count']
    if total is None:
        return 0
    return total
def get_count(self, using):
    """
    Count rows matching the current filter constraints via an annotated
    aggregation; NULL (no rows) comes back as 0.
    """
    query = self.clone()
    query.add_annotation(Count('*'), alias='__count', is_summary=True)
    result = query.get_aggregation(using, ['__count'])['__count']
    return 0 if result is None else result
def get_count(self, using):
    """
    Perform a COUNT() query using the current filter constraints.

    :param using: database alias to run the aggregation against
    :return: int row count (0 when the aggregate is NULL)
    """
    counted = self.clone()
    counted.add_annotation(Count('*'), alias='__count', is_summary=True)
    number = counted.get_aggregation(using, ['__count'])['__count']
    return number if number is not None else 0
def get_count(self, using):
    """
    Run a COUNT() aggregation with the current filter constraints and
    return the result as an int; NULL is reported as 0.
    """
    cloned = self.clone()
    cloned.add_annotation(Count('*'), alias='__count', is_summary=True)
    count = cloned.get_aggregation(using, ['__count'])['__count']
    if count is None:
        count = 0
    return count
def data(self):
    """Annotate the product-scoped query with the maximum 'complete' value."""
    scoped = self.get_query(product=self.product)
    return scoped.annotate(complete=Max('complete'))
#def wiki_registrations():
# return (
# MediaWikiUser.objects
# .annotate(year=Substr('registration', 1, 4))
# .order_by('year')
# .values('year')
# .annotate(
# users=Count('id', distinct=True)
# )
# )
#
#
#def wiki_revisions():
# return (
# Revision.objects
# .annotate(year=Substr('rev_timestamp', 1, 4))
# .order_by('year')
# .values('year')
# .annotate(
# revisions=Count('id')
# )
# )
#
#
#def wiki_registrations_year(year):
# return (
# MediaWikiUser.objects
# .annotate(
# year=Substr('registration', 1, 4),
# month=Substr('registration', 5, 2)
# )
# .filter(year=str(year).encode())
# .order_by('month')
# .values('month')
# .annotate(
# users=Count('id', distinct=True)
# )
# )
#
#
#def wiki_revisions_year(year):
# return (
# Revision.objects
# .annotate(
# year=Substr('rev_timestamp', 1, 4),
# month=Substr('rev_timestamp', 5, 2)
# )
# .filter(year=str(year).encode())
# .order_by('month')
# .values('month')
# .annotate(
# revisions=Count('id')
# )
# )