def save(self, *args, **kwargs):
if self.parent and self.share_of:
raise ValueError("Can't be both a reply and a share!")
self.cache_data()
if self.parent:
self.content_type = ContentType.REPLY
# Ensure replies have sane values
self.visibility = self.parent.visibility
self.pinned = False
elif self.share_of:
self.content_type = ContentType.SHARE
if not self.pk:
if not self.guid:
self.guid = uuid4()
if self.pinned:
max_order = Content.objects.top_level().filter(author=self.author).aggregate(Max("order"))["order__max"]
if max_order is not None: # If max_order is None, there is likely to be no content yet
self.order = max_order + 1
self.fix_local_uploads()
super().save(*args, **kwargs)
self.cache_related_object_data()
python类Max()的实例源码
def search_shops_on_forum(force=False):
# Get member pages
step = 500
last_page = page_number = (Member.objects.aggregate(Max('page_number')) and not force) or 1
page_url = 'http://www.prestashop.com/forums/members/page__sort_key__members_display_name__sort_order__asc__max_results__%d__st__%d' % (step, (last_page-1)*step)
while page_url:
page = document_fromstring(urllib2.urlopen(page_url).read())
for member in page.cssselect('ul.members li h3.bar a:first'):
# member url
Member.objects.get_or_create(link=member.get('href'), defaults={'page_number':page_number})
page_url = page.cssselect('ul.pagination.left li.next a').get('href')
page_number+=1
for member in Member.objects.filter(page_number__gte=last_page):
member_page = document_fromstring(urllib2.urlopen(member.link).read())
for link in member_page.cssselect('div.general_box div.signature a'):
ShopLink.objects.get_or_create(link=link.get('href'), member=member)
def search_shops_on_rus_forum(force=False):
last_page = (MemberRus.objects.aggregate(Max('page_number')) and not force) or 1
for i in range(last_page, 4219):
page_url = 'http://prestadev.ru/forum/profile.php?u='+str(i)
page = document_fromstring(urllib2.urlopen(page_url).read())
messages = 0
try:
messages = int(page.cssselect('div.wttborder td strong')[2].text.strip())
except:
pass
try:
params = {'title': page.cssselect('#profilename')[0].text.strip(),
'messages': messages,
'page_number': i,
'home_page':page.cssselect('div.wttborder td.row1')[4]}
except IndexError:
continue
member = MemberRus.objects.get_or_create(**params)[0]
for link in page.cssselect('div.wgborder td.row1 a'):
ShopLinkRus.objects.get_or_create(link=link.get('href'), member=member)
def range_statistics(start, end):
""" Returns the statistics (totals) for a target date. Its month will be used. """
return DayStatistics.objects.filter(day__gte=start, day__lt=end).aggregate(
total_cost=Sum('total_cost'),
electricity1=Sum('electricity1'),
electricity1_cost=Sum('electricity1_cost'),
electricity1_returned=Sum('electricity1_returned'),
electricity2=Sum('electricity2'),
electricity2_cost=Sum('electricity2_cost'),
electricity2_returned=Sum('electricity2_returned'),
electricity_merged=Sum(models.F('electricity1') + models.F('electricity2')),
electricity_cost_merged=Sum(models.F('electricity1_cost') + models.F('electricity2_cost')),
electricity_returned_merged=Sum(models.F('electricity1_returned') + models.F('electricity2_returned')),
gas=Sum('gas'),
gas_cost=Sum('gas_cost'),
temperature_min=Min('lowest_temperature'),
temperature_max=Max('highest_temperature'),
temperature_avg=Avg('average_temperature'),
)
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
return queryset.annotate(
latest_activity_timestamp=Max('action_targets__timestamp'),
).annotate(
latest_activity_at=Case(
When(
latest_activity_timestamp__isnull=True,
then='created_at'
),
When(
latest_activity_timestamp__gt=F('created_at'),
then='latest_activity_timestamp'
),
default='created_at',
output_field=DateTimeField()
)
)
def max_pk():
"""Returns the sum of all the highest PKs for each submodel."""
return reduce(
lambda x, y: x + y,
[int(p.objects.aggregate(Max('pk')).values()[0] or 0)
for p in AbstractPage.__subclasses__()],
)
def get(self, request):
if request.user.is_authenticated():
histories = OnlineHistory.objects.filter(mac=request.user.username)\
.values('date').annotate(min_time=Min('time'), max_time=Max('time')).order_by("-date")
return render(request, "index.html", locals())
else:
verification_token = str(uuid.uuid4())
request.session["verification_token"] = verification_token
return render(request, "authentication.html", locals())
def get(self, request):
if request.user.has_perm("mobile_scanner.view_staffonlinehistory"):
histories = OnlineHistory.objects.filter()\
.values('date', 'user__last_name', 'user__first_name').annotate(min_time=Min('time'), max_time=Max('time')).order_by("-date")
return render(request, "staff.html", locals())
else:
msg = "?????"
return render(request, "msg.html", locals())
def get_queryset(self, request):
return (
super().get_queryset(request)
.annotate(minutes=Coalesce(Sum('entries__minutes'), 0))
.annotate(last_log=Max('entries__updated'))
)
def data(self):
return self.get_query(product=self.product).annotate(complete=Max('complete'))
#def wiki_registrations():
# return (
# MediaWikiUser.objects
# .annotate(year=Substr('registration', 1, 4))
# .order_by('year')
# .values('year')
# .annotate(
# users=Count('id', distinct=True)
# )
# )
#
#
#def wiki_revisions():
# return (
# Revision.objects
# .annotate(year=Substr('rev_timestamp', 1, 4))
# .order_by('year')
# .values('year')
# .annotate(
# revisions=Count('id')
# )
# )
#
#
#def wiki_registrations_year(year):
# return (
# MediaWikiUser.objects
# .annotate(
# year=Substr('registration', 1, 4),
# month=Substr('registration', 5, 2)
# )
# .filter(year=str(year).encode())
# .order_by('month')
# .values('month')
# .annotate(
# users=Count('id', distinct=True)
# )
# )
#
#
#def wiki_revisions_year(year):
# return (
# Revision.objects
# .annotate(
# year=Substr('rev_timestamp', 1, 4),
# month=Substr('rev_timestamp', 5, 2)
# )
# .filter(year=str(year).encode())
# .order_by('month')
# .values('month')
# .annotate(
# revisions=Count('id')
# )
# )
def handle(self, *args, **options):
"""
Send new message notifications
"""
# command to run: python manage.py tunga_send_customer_emails
min_date = datetime.datetime.utcnow() - relativedelta(minutes=1) # 5 minute window to read new messages
customer_channels = Channel.objects.filter(
type=CHANNEL_TYPE_SUPPORT,
created_by__isnull=True,
content_type=ContentType.objects.get_for_model(Inquirer)
).annotate(new_messages=Sum(
Case(
When(
~Q(action_targets__actor_content_type=F('content_type')) &
Q(action_targets__gt=F('last_read')) &
Q(action_targets__timestamp__lte=min_date) &
Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD]),
then=1
),
default=0,
output_field=IntegerField()
)
), latest_message=Max('action_targets__id')).filter(new_messages__gt=0)
for channel in customer_channels:
customer = channel.content_object
if customer.email:
activities = Action.objects.filter(
channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
).order_by('id')
messages = [activity.action_object for activity in activities]
if messages:
to = [customer.email]
subject = "[Tunga Support] Help"
ctx = {
'customer': customer,
'count': channel.new_messages,
'messages': messages,
'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
}
if send_mail(subject, 'tunga/email/unread_help_messages', to, ctx):
channel.last_read = channel.latest_message
channel.save()
def update_task_pm_updates(task):
task = clean_instance(task, Task)
target_task = task
if task.parent:
# for sub-tasks, create all pm updates on the project
target_task = task.parent
if target_task.closed or target_task.is_task or not target_task.approved or not target_task.pm:
# only request pm updates for project which are approved and not closed
return
if target_task.update_interval and target_task.update_interval_units:
periodic_start_date = ProgressEvent.objects.filter(
Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_PM
).aggregate(latest_date=Max('due_at'))['latest_date']
now = datetime.datetime.utcnow()
if periodic_start_date and periodic_start_date > now:
return
if not periodic_start_date:
periodic_start_date = datetime.datetime.utcnow()
if periodic_start_date:
last_update_at = clean_update_datetime(periodic_start_date, target_task)
while True:
last_update_day = last_update_at.weekday()
next_update_at = last_update_at
if last_update_day < 3:
# Last was before Thursday so schedule for Thursday
next_update_at += relativedelta(days=3 - last_update_day)
else:
# Last was on after Thursday so schedule for Monday
next_update_at += relativedelta(days=7 - last_update_day)
if next_update_at >= now:
future_by_18_hours = now + relativedelta(hours=18)
if next_update_at <= future_by_18_hours and (
not target_task.deadline or next_update_at < target_task.deadline):
num_updates_within_on_same_day = ProgressEvent.objects.filter(
task=target_task, type=PROGRESS_EVENT_TYPE_PM,
due_at__contains=next_update_at.date()
).count()
if num_updates_within_on_same_day == 0:
# Schedule at most one pm update for any day
ProgressEvent.objects.update_or_create(
task=target_task, type=PROGRESS_EVENT_TYPE_PM,
due_at=next_update_at, defaults={'title': 'PM Report'}
)
break
else:
last_update_at = next_update_at
def update_task_client_surveys(task):
task = clean_instance(task, Task)
target_task = task
if task.parent:
# for sub-tasks, create all surveys on the project
target_task = task.parent
if target_task.closed or not (
target_task.survey_client and target_task.approved and target_task.active_participants):
# only conduct survey for approved tasks that have been assigned devs and aren't closed
return
if target_task.update_interval and target_task.update_interval_units:
periodic_start_date = ProgressEvent.objects.filter(
Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_CLIENT
).aggregate(latest_date=Max('due_at'))['latest_date']
now = datetime.datetime.utcnow()
if periodic_start_date and periodic_start_date > now:
return
if not periodic_start_date:
periodic_start_date = datetime.datetime.utcnow()
if periodic_start_date:
last_update_at = clean_update_datetime(periodic_start_date, target_task)
while True:
last_update_day = last_update_at.weekday()
# Schedule next survey for Monday
next_update_at = last_update_at + relativedelta(days=7 - last_update_day)
if next_update_at >= now:
future_by_18_hours = now + relativedelta(hours=18)
if next_update_at <= future_by_18_hours and (
not target_task.deadline or next_update_at < target_task.deadline):
num_updates_on_same_day = ProgressEvent.objects.filter(
task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
due_at__contains=next_update_at.date()
).count()
if num_updates_on_same_day == 0:
# Schedule at most one survey for any day
ProgressEvent.objects.update_or_create(
task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
due_at=next_update_at, defaults={'title': 'Weekly Survey'}
)
break
else:
last_update_at = next_update_at