def as_sql(self, bare_lookup, fallback=True):
    '''
    Build the SQL expression that yields this virtual field's value.

    For the default language the localized lookup is returned as-is.
    With ``fallback`` disabled, the lookup for the active language is cast
    to ``self.output_field()``.  Otherwise the lookups for the active
    language and for every language in its fallback chain are coalesced,
    in that order.
    '''
    language = self.get_language()

    if language == DEFAULT_LANGUAGE:
        return self._localized_lookup(language, bare_lookup)

    if not fallback:
        return Cast(
            self._localized_lookup(language, bare_lookup),
            self.output_field(),
        )

    # The active language takes precedence, followed by its fallbacks.
    candidates = [language] + list(get_fallback_chain(language))
    expressions = [
        self._localized_lookup(code, bare_lookup) for code in candidates
    ]
    return Coalesce(*expressions, output_field=self.output_field())
python类Coalesce()的实例源码
def for_user(self, user, start, end):
    """Return the employments of a user that overlap a time frame.

    An employment whose end date is NULL in the database is treated as
    ending today.

    :param User user: The user of the searched employments
    :param datetime.date start: start of time frame
    :param datetime.date end: end of time frame
    :returns: queryset of employments
    """
    today = models.Value(date.today())
    # An employment is outside the frame when it ended before the frame
    # starts or begins after the frame ends.
    outside_frame = models.Q(end__lt=start) | models.Q(start_date__gt=end)

    with_end = self.annotate(end=functions.Coalesce('end_date', today))
    return with_end.filter(user=user).exclude(outside_frame)
def count_votes(self):
    """
    Annotate every question with ``num_votes``.

    ``num_votes`` is the sum of ``votes__vote``; 0 is used when that sum
    is NULL.
    """
    vote_total = Coalesce(models.Sum('votes__vote'), 0)
    return self.annotate(num_votes=vote_total)
def count_votes(self):
    """
    Return the questions with a ``num_votes`` annotation attached.

    The annotation sums ``votes__vote`` and falls back to 0 when the sum
    is NULL.
    """
    summed_votes = models.Sum('votes__vote')
    annotated = self.annotate(num_votes=Coalesce(summed_votes, 0))
    return annotated
##########################################################################
## Answer Manager
##########################################################################
def output_field(self):
    '''
    Create a new field instance of the original field's class, to be used
    as the target type for Cast/Coalesce.

    CharField cannot be instantiated here without max_length until
    https://github.com/django/django/pull/8758 is merged, so that value is
    copied from the original field.
    '''
    source = self.original_field
    field_class = source.__class__

    if not isinstance(source, fields.CharField):
        return field_class()
    return field_class(max_length=source.max_length)
def filter_date(self, queryset, name, value):
    """Keep the rows whose ``start_date``..``end`` range contains ``value``.

    ``end`` is the row's ``end_date``, or today's date when ``end_date``
    is NULL.  ``name`` is accepted but not used.
    """
    today = Value(date.today())
    return queryset.annotate(
        end=Coalesce('end_date', today)
    ).filter(
        start_date__lte=value,
        end__gte=value,
    )
def validate(self, data):
    """Validate the employment as a whole.

    Checks that the end date lies after the start date and that the
    employment does not overlap any other employment of the same user.
    Values missing from ``data`` are taken from the existing instance,
    if there is one.

    :throws: django.core.exceptions.ValidationError
    :return: validated data
    :rtype: dict
    """
    instance = self.instance

    def current(field):
        # Submitted data wins; otherwise use the stored instance value.
        return data.get(field, instance and getattr(instance, field))

    start_date = current('start_date')
    end_date = current('end_date')
    if end_date and start_date >= end_date:
        raise ValidationError(_(
            'The end date must be after the start date'
        ))

    user = current('user')
    others = models.Employment.objects.filter(user=user)

    # end date not set means employment is ending today
    end_date = end_date or date.today()
    others = others.annotate(
        end=Coalesce('end_date', Value(date.today()))
    )

    # The employment being edited must not be compared with itself.
    if instance:
        others = others.exclude(id=instance.id)

    overlaps = [
        other.start_date <= end_date and start_date <= other.end
        for other in others
    ]
    if any(overlaps):
        raise ValidationError(_(
            'A user can\'t have multiple employments at the same time'
        ))

    return data
def get_queryset(self):
    """Return the parent queryset annotated with ``score``.

    ``score`` is the average of ``vote__vote``, or 0 when that average
    is NULL.
    """
    average_vote = Coalesce(Avg('vote__vote'), 0)
    return super().get_queryset().annotate(score=average_vote)
def query_sum(queryset, field):
    """Return the sum of ``field`` over ``queryset``, or 0 if the sum is NULL."""
    totals = queryset.aggregate(s=Coalesce(Sum(field), 0))
    return totals['s']
dashboard_forms.py 文件源码
项目:CommunityCellularManager
作者: facebookincubator
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def __init__(self, *args, **kwargs):
    """Build the tower-selection form and its crispy-forms helper.

    The ``tower`` choice field is created here, per instance, so that
    changes to the network values are picked up without restarting the
    server.  Each choice is ``(tower.id, label)`` where the label shows the
    tower's nickname (if any), the first five characters of its uuid and
    the email of the user profile attached to the tower's network.

    NOTE(review): the profile is fetched with ``.get()``, so this presumably
    expects exactly one UserProfile per tower network -- confirm.
    """
    super(SelectTowerForm, self).__init__(*args, **kwargs)
    # We create a convoluted tower queryset so that towers that have never
    # synced (last_active = None) sort after active and inactive towers:
    # they are given a timestamp roughly ten years in the past.
    the_past = datetime.datetime.now() - datetime.timedelta(days=10*365)
    all_towers = models.BTS.objects.all().annotate(
        new_last_active=Coalesce('last_active', Value(the_past))).order_by(
            '-new_last_active')
    choices = []
    for tower in all_towers:
        # select_related('user') loads the profile's user in the same
        # query, instead of a further query per tower for ``.user.email``.
        user_profile = models.UserProfile.objects.select_related(
            'user').get(network=tower.network)
        abbreviated_uuid = tower.uuid[0:5]
        if tower.nickname:
            prefix = 'Tower "%s" - %s..' % (
                tower.nickname, abbreviated_uuid)
        else:
            prefix = 'Tower %s..' % abbreviated_uuid
        display = '%s (%s)' % (prefix, user_profile.user.email)
        choices.append((tower.id, display))
    self.fields['tower'] = forms.ChoiceField(
        label="Tower", choices=choices, required=False)
    # Set layout attributes.
    self.helper = FormHelper()
    self.helper.form_id = 'select-tower-form'
    self.helper.form_method = 'post'
    self.helper.form_action = '/dashboard/staff/tower-monitoring'
    self.helper.add_input(Submit('submit', 'Select'))
    self.helper.layout = Layout('tower')
def get(self, request):
    """Handle GET requests for the staff tower listing.

    Users that are not staff get an empty 404 response.  For staff, every
    tower is listed (never-synced towers last), each one carrying a
    comma-separated ``user_email`` attribute built from the user profiles
    attached to the tower's network, and the page is rendered from
    'dashboard/staff/towers.html'.
    """
    user_profile = models.UserProfile.objects.get(user=request.user)
    if not user_profile.user.is_staff:
        return response.Response('', status=status.HTTP_404_NOT_FOUND)
    # We create a convoluted queryset so that towers that have never synced
    # (last_active = None) sort after active and inactive towers.
    the_past = datetime.datetime.now() - datetime.timedelta(days=10*365)
    towers = models.BTS.objects.all().annotate(
        new_last_active=Coalesce('last_active', Value(the_past))).order_by(
            '-new_last_active')
    # Attach UserProfiles to each tower in the queryset.
    for tower in towers:
        # select_related('user') fetches each profile's user together with
        # the profile, avoiding one extra query per profile for the email.
        tower_user_profiles = models.UserProfile.objects.filter(
            network=tower.network).select_related('user')
        for tower_user_profile in tower_user_profiles:
            if hasattr(tower, 'user_email'):
                tower.user_email += ',' + tower_user_profile.user.email
            else:
                tower.user_email = tower_user_profile.user.email
    # Configure the table of towers.
    tower_table = django_tables.StaffTowerTable(list(towers))
    towers_per_page = 8
    # Pagination is only switched on when there is more than one page.
    paginate = False
    if towers.count() > towers_per_page:
        paginate = {'per_page': towers_per_page}
    tables.RequestConfig(request, paginate=paginate).configure(
        tower_table)
    context = {
        'networks': get_objects_for_user(
            request.user, 'view_network', klass=models.Network),
        'user_profile': user_profile,
        'towers': towers,
        'tower_table': tower_table,
    }
    # Render the template.
    towers_template = template.loader.get_template(
        'dashboard/staff/towers.html')
    html = towers_template.render(context, request)
    return http.HttpResponse(html)
def calculate_products_cost(self):
    """Return the total cost of the products in ``self.records``.

    The total is the sum of ``order_price * count`` over the records,
    or 0 when that sum is NULL.
    """
    line_cost = models.F('order_price') * models.F('count')
    total_cost = Coalesce(
        models.Sum(line_cost, output_field=ValuteField()),
        0,
    )
    result = self.records.aggregate(cost=total_cost)
    return result['cost']
def get_rating():
    """Return the number of rating votes and their average rating.

    Both figures are computed in a single aggregate query, so they are
    taken from the same state of the table and cost one database round
    trip instead of two.

    :returns: dict with keys ``count`` (number of RatingVote rows) and
        ``avg`` (average of ``rating``, or 0 when the average is NULL)
    """
    stats = RatingVote.objects.aggregate(
        count=models.Count('pk'),
        avg=Coalesce(models.Avg('rating'), 0),
    )
    return {
        'count': stats['count'],
        'avg': stats['avg'],
    }
def save(self, *args, **kwargs):
    """Save the item, initialising bookkeeping fields on first save.

    When the object has no primary key yet, its ``self_type`` content
    type and ``created`` timestamp are set, and ``sort_order`` becomes one
    more than the highest ``sort_order`` among the gallery's items
    (1 when that maximum is NULL).
    """
    if not self.pk:
        self.self_type = ContentType.objects.get_for_model(
            type(self), for_concrete_model=False
        )
        self.created = now()
        next_position = Coalesce(models.Max('sort_order'), 0) + 1
        positions = self.gallery.items.aggregate(max=next_position)
        self.sort_order = positions['max']
    super().save(*args, **kwargs)
def get_queryset(self, request):
    """Return the parent queryset with ``minutes`` and ``last_log`` added.

    ``minutes`` is the sum of ``entries__minutes`` (0 when NULL) and
    ``last_log`` is the maximum of ``entries__updated``.
    """
    queryset = super().get_queryset(request)
    total_minutes = Coalesce(Sum('entries__minutes'), 0)
    queryset = queryset.annotate(minutes=total_minutes)
    return queryset.annotate(last_log=Max('entries__updated'))
def get_total_price(cls, year, month):
    """Return the sum of ``price`` for objects created in the given month.

    Objects are selected by the year and month of ``created_at``; the
    result is 0 when the sum is NULL.
    """
    created_in_month = cls.objects.filter(
        created_at__year=year,
        created_at__month=month,
    )
    summary = created_in_month.aggregate(total=Coalesce(Sum('price'), 0))
    return summary['total']