def annotate_running_scans_count(self) -> 'ScanListQuerySet':
    """Annotate every row with the number of scans that have not ended.

    Adds ``running_scans__count``, a correlated subquery that counts rows
    of the Scan table whose ``end`` is NULL and whose ``site_id`` appears
    in the Site/ScanList relation table for the scan list of the outer row.

    Only table names taken from the models' ``_meta.db_table`` are
    interpolated into the SQL text; no query parameters are passed.
    """
    table_names = {
        'Scan': Scan._meta.db_table,
        'Site_ScanLists': Site.scan_lists.through._meta.db_table,
        'ScanList': ScanList._meta.db_table,
    }
    sql_template = (
        '\n'
        'SELECT COUNT("{Scan}"."id")\n'
        'FROM "{Scan}"\n'
        'WHERE\n'
        '"{Scan}"."end" IS NULL AND\n'
        '"{Scan}"."site_id" IN\n'
        '(SELECT "{Site_ScanLists}"."site_id"\n'
        'FROM "{Site_ScanLists}"\n'
        'WHERE "{Site_ScanLists}"."scanlist_id" = "{ScanList}"."id"\n'
        'GROUP BY "{Site_ScanLists}"."site_id")\n'
    )
    running_scans = RawSQL(sql_template.format(**table_names), ())
    return self.annotate(running_scans__count=running_scans)
# Example source code using the Python class RawSQL()
def annotate_most_recent_scan_error_count(self) -> 'ScanListQuerySet':
    """Annotate every row with the number of errors of its last scan.

    Adds ``last_scan__error_count``, a correlated subquery that counts the
    rows of the ScanError table whose ``scan_id`` equals the
    ``last_scan_id`` column of the Site table in the outer query.

    Only table names taken from the models' ``_meta.db_table`` are
    interpolated into the SQL text; no query parameters are passed.
    """
    # NOTE(review): the subquery correlates on the Site table, which
    # suggests this is meant to run on a Site queryset although the return
    # annotation names ScanListQuerySet - confirm against the owning class.
    sql_template = (
        '\n'
        'SELECT COUNT("id")\n'
        'FROM "{ScanError}"\n'
        'WHERE\n'
        '"{ScanError}"."scan_id" = "{Site}"."last_scan_id"\n'
    )
    # Only the two placeholders used by the template are supplied; the
    # previously passed ``Scan`` table name was never referenced.
    error_count = RawSQL(
        sql_template.format(
            Site=Site._meta.db_table,
            ScanError=ScanError._meta.db_table,
        ),
        (),
    )
    return self.annotate(last_scan__error_count=error_count)
def products_changed_handler(sender, **kwargs):
"""
Handler for the "products_changed" signal.

Normalises ``kwargs['categories']`` (a ShopCategory instance, a single
primary key, a collection of primary keys, or a ShopCategory QuerySet)
to a QuerySet, recomputes ``product_count`` of those categories as the
number of visible products in each one, copies that value into
``total_product_count`` and sends ``categories_changed`` for the same
categories.

Raises TypeError when ``categories`` is none of the accepted kinds.

NOTE(review): this copy of the code has lost its indentation, so it is
not visible here whether the ``categories_changed.send`` call sits inside
or after the ``transaction.atomic()`` block - confirm before restructuring.
"""
categories = kwargs.get('categories')
if isinstance(categories, ShopCategory):
# a single category instance
categories = ShopCategory.objects.filter(pk=categories.pk)
elif isinstance(categories, (int, str)):
# an int or a string: treated as the primary key of one category
categories = ShopCategory.objects.filter(pk=categories)
elif isinstance(categories, (list, tuple, set, ValuesListQuerySet)):
# a collection of ints or strings: treated as category primary keys
categories = ShopCategory.objects.filter(pk__in=categories)
elif isinstance(categories, QuerySet) and categories.model is ShopCategory:
# already a QuerySet of categories: used as is
pass
else:
raise TypeError('Invalid categories for signal "products_changed"')
with transaction.atomic():
# product_count := number of visible products directly in the category
categories.update(
product_count=RawSQL(
'(SELECT COUNT(*) '
'FROM shop_shopproduct AS ssp '
'WHERE ssp.category_id = shop_shopcategory.id '
'AND ssp.is_visible = TRUE)',
()
)
)
# start total_product_count from the category's own product_count
categories.update(
total_product_count=F('product_count')
)
categories_changed.send(ShopCategory, categories=categories)
def priority_ordering(self, request, queryset):
    """Order rows by how many of the requested topics they are linked to.

    Reads a comma-separated list of topic ids from the ``topics__id__in``
    query parameter. Entries that are not plain decimal numbers are
    ignored. When at least one id remains, the queryset is annotated with
    ``kw_count`` (the number of requested topics found for the row in
    ``reqs_requirement_topics``) and ordered by ``-kw_count`` and then
    ``req_id``. Otherwise the queryset is returned unchanged.

    :param request: object exposing ``query_params`` with a ``get`` method.
    :param queryset: queryset to annotate and order.
    :return: the ordered queryset, or ``queryset`` itself when no valid
        topic id was supplied.
    """
    raw_ids = request.query_params.get('topics__id__in', '')
    # ``str.isdigit`` also accepts characters such as superscript digits
    # ('²') which ``int`` rejects with ValueError. ``str.isdecimal`` only
    # accepts characters ``int`` can parse, so malformed input is skipped
    # instead of crashing the request.
    topics = tuple(int(kw) for kw in raw_ids.split(',') if kw.isdecimal())
    if not topics:
        return queryset

    sql = (
        '\n'
        'SELECT count(*) FROM (\n'
        'SELECT topic_id FROM reqs_requirement_topics\n'
        'WHERE topic_id IN %s\n'
        'AND requirement_id = reqs_requirement.id\n'
        'GROUP BY topic_id\n'
        ') AS subq\n'
    )
    # The ids are passed as a single query parameter, never interpolated
    # into the SQL text.
    annotated = queryset.annotate(kw_count=RawSQL(sql, (topics,)))
    return annotated.order_by('-kw_count', 'req_id')
def order_by_json_path(self, json_path, language_code=None, order='asc'):
    """
    Order the queryset by a value inside the ``translations`` jsonb field.

    The value is addressed with the ``#>>`` operator using the path
    ``{<language>,<json_path>}``. The language is the first truthy value
    among ``language_code``, ``self._language_code`` and
    ``self.get_language_key(language_code)``.

    Passing ``order='desc'`` sorts descending; any other value keeps the
    default ordering of the expression.

    References:
        https://www.postgresql.org/docs/current/static/functions-json.html
        https://docs.djangoproject.com/en/dev/ref/models/expressions/#raw-sql-expressions

    Example:
        MyModel.objects.language('en_us').filter(is_active=True).order_by_json_path('title')
    """
    lang = language_code or self._language_code
    if not lang:
        lang = self.get_language_key(language_code)

    # The path is sent as a query parameter, not formatted into the SQL.
    path_literal = '{%s,%s}' % (lang, json_path)
    expression = RawSQL("translations#>>%s", (path_literal,))

    ordering = expression.desc() if order == 'desc' else expression
    return self.order_by(ordering)
def annotate_most_recent_scan_start(self) -> 'SiteQuerySet':
    """Annotate every row with the ``start`` of its most recent scan.

    Adds ``last_scan__start``, a correlated subquery over the Scan table
    restricted to the site of the outer row. Scans are ordered by ``end``
    descending with NULLs first, so a scan without an end time is
    preferred and otherwise the one with the latest ``end`` is used.

    Only table names taken from the models' ``_meta.db_table`` are
    interpolated into the SQL text; no query parameters are passed.
    """
    # The Site table name is double-quoted like every other interpolated
    # identifier, so a table name that needs quoting cannot break the query.
    sql_template = (
        '\n'
        'SELECT DISTINCT ON (site_id) "start"\n'
        'FROM "{Scan}"\n'
        'WHERE\n'
        'site_id="{Site}"."id"\n'
        'ORDER BY "site_id", "end" DESC NULLS FIRST\n'
        'LIMIT 1\n'
    )
    # Only the placeholders used by the template are supplied; the relation
    # table name passed previously was never referenced.
    last_start = RawSQL(
        sql_template.format(
            Scan=Scan._meta.db_table,
            Site=Site._meta.db_table,
        ),
        (),
    )
    return self.annotate(last_scan__start=last_start)
def annotate_most_recent_scan_end_or_null(self) -> 'SiteQuerySet':
    """Annotate every row with the ``end`` of its most recent scan.

    Adds ``last_scan__end_or_null``, a correlated subquery over the Scan
    table restricted to the site of the outer row. Scans are ordered by
    ``end`` descending with NULLs first, so the annotation is NULL when a
    scan without an end time exists and the latest ``end`` otherwise.

    Only table names taken from the models' ``_meta.db_table`` are
    interpolated into the SQL text; no query parameters are passed.
    """
    # The Site table name is double-quoted like every other interpolated
    # identifier, so a table name that needs quoting cannot break the query.
    sql_template = (
        '\n'
        'SELECT DISTINCT ON (site_id) "end"\n'
        'FROM "{Scan}"\n'
        'WHERE\n'
        'site_id="{Site}"."id"\n'
        'ORDER BY "site_id", "end" DESC NULLS FIRST\n'
        'LIMIT 1\n'
    )
    # Only the placeholders used by the template are supplied; the relation
    # table name passed previously was never referenced.
    last_end = RawSQL(
        sql_template.format(
            Scan=Scan._meta.db_table,
            Site=Site._meta.db_table,
        ),
        (),
    )
    return self.annotate(last_scan__end_or_null=last_end)
def annotate_most_recent_scan_result(self) -> 'SiteQuerySet':
    """Annotate every row with the result of its last scan.

    Adds ``last_scan__result``, a correlated subquery that selects the
    ``result`` column of one ScanResult row whose ``scan_id`` equals the
    ``last_scan_id`` of the Site row in the outer query.

    Only table names taken from the models' ``_meta.db_table`` are
    interpolated into the SQL text; no query parameters are passed.
    """
    sql_template = (
        '\n'
        'SELECT "{ScanResult}"."result"\n'
        'FROM "{ScanResult}"\n'
        'WHERE\n'
        '"{ScanResult}"."scan_id"="{Site}"."last_scan_id"\n'
        'LIMIT 1\n'
    )
    last_result = RawSQL(
        sql_template.format(
            ScanResult=ScanResult._meta.db_table,
            Site=Site._meta.db_table,
        ),
        (),
    )
    return self.annotate(last_scan__result=last_result)
def send_reminder_messages():
    """Send a "still open" reminder for every open task that is overdue.

    A task qualifies when no reminder has been sent yet, it has a reminder
    timeout, and ``created_at + reminder_message_timeout`` (computed in
    the database) lies before the current time. After the message is sent
    the task's ``reminder_message_sent_at`` is set to that same timestamp
    and only that field is saved.
    """
    now = timezone.now()
    pending = Task.objects.open().filter(
        reminder_message_sent_at=None,
        reminder_message_timeout__isnull=False,
    )
    deadline = RawSQL('created_at + reminder_message_timeout', ())
    overdue = pending.annotate(deadline=deadline).filter(deadline__lt=now)

    for task in overdue:
        send_task_message(task, _('{task} still open'), 'still_open.txt')
        # Record the reminder so the task is not selected again.
        task.reminder_message_sent_at = now
        task.save(update_fields=('reminder_message_sent_at',))
def get_queryset(self):
    """Return bands annotated with membership and size information.

    Annotations added to every band:

    * ``user_joined`` - ``1`` when ``api_member`` holds a row linking the
      requesting user to the band, NULL otherwise.
    * ``compositions_count`` - distinct count of ``compositions``.
    * ``members_count`` - distinct count of ``members``.

    ``leader__member__user`` is fetched with ``select_related``.
    """
    membership_sql = (
        'SELECT 1 FROM api_member '
        'WHERE api_member.band_id = api_band.id AND api_member.user_id = %s '
        'LIMIT 1'
    )
    # The user id is bound as a query parameter.
    bands = Band.objects.annotate(
        user_joined=RawSQL(membership_sql, (self.request.user.id,)),
    )
    bands = bands.annotate(
        compositions_count=Count('compositions', distinct=True),
    )
    bands = bands.annotate(
        members_count=Count('members', distinct=True),
    )
    return bands.select_related('leader__member__user')
def get_queryset(self):
    """Limit log entries to those without a schema or in the current one.

    Keeps rows whose ``object_schema_id`` is NULL or equal to the value
    the database returns for ``current_schema()``.
    """
    base = super(LogEntryManager, self).get_queryset()
    without_schema = Q(object_schema_id=None)
    in_current_schema = Q(
        object_schema_id=expressions.RawSQL('current_schema()', []),
    )
    return base.filter(without_schema | in_current_schema)
def get_queryset(self):
    """
    Return IP addresses ordered by family and then by host address.

    PostgreSQL orders INET values with shorter prefix lengths ahead of
    those with longer ones, which is not meaningful for a list of IPs.
    Each row is therefore annotated with ``host``: the address with its
    mask removed via HOST() and cast back to INET (in effect a /32 or
    /128), and the queryset is ordered by ``family`` and that annotation.
    """
    base = super(IPAddressManager, self).get_queryset()
    host_only = RawSQL('INET(HOST(ipam_ipaddress.address))', [])
    return base.annotate(host=host_only).order_by('family', 'host')
def categories_changed_handler(sender, **kwargs):
    """
    Handler for the "categories_changed" signal.

    Normalises ``kwargs['categories']`` (a ShopCategory instance, a single
    primary key, a collection of primary keys, or a ShopCategory QuerySet)
    to a QuerySet, then recomputes ``total_product_count`` for every
    visible ancestor of those categories as the category's own
    ``product_count`` plus the ``total_product_count`` of its visible
    direct children.

    ``kwargs['include_self']`` (default True) is forwarded to
    ``get_ancestors``.

    Raises TypeError when ``categories`` is none of the accepted kinds.
    """
    include_self = kwargs.get('include_self', True)
    target = kwargs.get('categories')

    if isinstance(target, ShopCategory):
        # a single category instance
        queryset = ShopCategory.objects.filter(pk=target.pk)
    elif isinstance(target, (int, str)):
        # an int or a string: treated as the primary key of one category
        queryset = ShopCategory.objects.filter(pk=target)
    elif isinstance(target, (list, tuple, set, ValuesListQuerySet)):
        # a collection of ints or strings: treated as category primary keys
        queryset = ShopCategory.objects.filter(pk__in=target)
    elif isinstance(target, QuerySet) and target.model is ShopCategory:
        # already a QuerySet of categories: used as is
        queryset = target
    else:
        raise TypeError('Invalid categories for signal "categories_changed"')

    # Ordered by descending level within each tree - presumably so that
    # deeper categories are recalculated before their parents; confirm
    # against the tree implementation in use.
    visible_ancestors = queryset.get_ancestors(include_self=include_self)
    visible_ancestors = visible_ancestors.filter(is_visible=True)
    ancestor_ids = visible_ancestors.order_by(
        'tree_id', '-level'
    ).values_list('id', flat=True)

    total_sql = (
        'SELECT shop_shopcategory.product_count + '
        'COALESCE(SUM(ssc.total_product_count), 0) '
        'FROM shop_shopcategory AS ssc '
        'WHERE ssc.parent_id = shop_shopcategory.id '
        'AND ssc.is_visible = TRUE'
    )
    with transaction.atomic():
        # One UPDATE per category, issued in the order computed above.
        for category_id in ancestor_ids:
            category = ShopCategory.objects.filter(pk=category_id)
            category.update(total_product_count=RawSQL(total_sql, ()))
def leaderboard(request, challenge_phase_split_id):
    """Returns leaderboard for a corresponding Challenge Phase Split.

    Responds with HTTP 400 when the challenge phase split does not exist,
    is not public, or its leaderboard schema has no ``default_order_by``
    key. Otherwise the public, unflagged leaderboard entries are ranked by
    the schema's default metric (highest first), reduced to the best entry
    per participant team, and returned as a paginated response.

    :param request: the incoming request, forwarded to the paginator.
    :param challenge_phase_split_id: primary key of the ChallengePhaseSplit.
    :return: a ``Response`` with an error payload, or the paginated
        leaderboard response.
    """
    # check if the challenge exists or not
    try:
        challenge_phase_split = ChallengePhaseSplit.objects.get(
            pk=challenge_phase_split_id)
    except ChallengePhaseSplit.DoesNotExist:
        response_data = {'error': 'Challenge Phase Split does not exist'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # Check if the Challenge Phase Split is publicly visible or not
    if challenge_phase_split.visibility != ChallengePhaseSplit.PUBLIC:
        response_data = {'error': 'Sorry, leaderboard is not public yet for this Challenge Phase Split!'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # Get the leaderboard associated with the Challenge Phase Split
    board = challenge_phase_split.leaderboard

    # Get the default order by key to rank the entries on the leaderboard.
    # Only a missing key or a schema that is not subscriptable is treated
    # as "key not found"; a bare ``except`` would also swallow unrelated
    # failures such as KeyboardInterrupt or SystemExit.
    try:
        default_order_by = board.schema['default_order_by']
    except (KeyError, TypeError):
        response_data = {'error': 'Sorry, Default filtering key not found in leaderboard schema!'}
        return Response(response_data, status=status.HTTP_400_BAD_REQUEST)

    # Get all the successful submissions related to the challenge phase split
    leaderboard_data = LeaderboardData.objects.filter(
        challenge_phase_split=challenge_phase_split,
        submission__is_public=True,
        submission__is_flagged=False).order_by('created_at')
    # The metric name is bound as a query parameter of the JSON lookup.
    leaderboard_data = leaderboard_data.annotate(
        filtering_score=RawSQL('result->>%s', (default_order_by, ), output_field=FloatField())).values(
            'id', 'submission__participant_team__team_name',
            'challenge_phase_split', 'result', 'filtering_score', 'leaderboard__schema')

    # Highest score first; the sort is stable, so ties keep creation order.
    sorted_leaderboard_data = sorted(
        leaderboard_data,
        key=lambda entry: float(entry['filtering_score']),
        reverse=True)

    # Keep only the first (best ranked) entry of every team. A set gives
    # constant-time membership checks instead of rescanning a list.
    distinct_sorted_leaderboard_data = []
    seen_teams = set()
    for data in sorted_leaderboard_data:
        team_name = data['submission__participant_team__team_name']
        if team_name in seen_teams:
            continue
        seen_teams.add(team_name)
        distinct_sorted_leaderboard_data.append(data)

    # Replace each result mapping by its values in schema label order.
    leaderboard_labels = board.schema['labels']
    for item in distinct_sorted_leaderboard_data:
        item['result'] = [item['result'][index.lower()] for index in leaderboard_labels]

    paginator, result_page = paginated_queryset(
        distinct_sorted_leaderboard_data,
        request,
        pagination_class=StandardResultSetPagination())
    response_data = result_page
    return paginator.get_paginated_response(response_data)
def from_search_query(self, search_query):
    """
    Return a queryset of the objects in ``search_query.hits``, in hit order.

    EXPERIMENTAL: a single QuerySet is returned, so this only works for
    results from a single index with a single doc_type.

    Relevance ordering is computed by the search engine and cannot be
    reproduced in the database, so every row is annotated from the hits
    themselves using SQL built by ``self._raw_sql``:

    * ``search_score`` - the ``score`` of the matching hit
    * ``search_rank``  - the 0-based position of the matching hit

    The queryset is filtered to the hit ids and ordered by ``search_rank``.
    According to the original notes the generated clause has the form
    ``SELECT CASE {{model}}.id WHEN {{id}} THEN {{value}} END`` with one
    ``WHEN`` per hit, giving a statement such as:

        SELECT "freelancer_freelancerprofile"."id",
            (SELECT CASE freelancer_freelancerprofile.id
                WHEN 25 THEN 1.0
                WHEN 26 THEN 1.0
                [...]
                ELSE 0
            END) AS "search_score"
        FROM "freelancer_freelancerprofile"
        WHERE "freelancer_freelancerprofile"."id" IN (25, 26, [...])
        ORDER BY "search_score" DESC

    This is brittle and assumes the hits are a page of results rather than
    the whole database, since the statement grows with every hit.
    """
    hits = search_query.hits

    score_pairs = [(hit['id'], hit['score']) for hit in hits]
    score_sql = self._raw_sql(score_pairs)

    rank_pairs = [(hit['id'], position) for position, hit in enumerate(hits)]
    rank_sql = self._raw_sql(rank_pairs)

    matching = self.get_queryset().filter(pk__in=[hit['id'] for hit in hits])
    # add the query relevance score
    matching = matching.annotate(search_score=RawSQL(score_sql, ()))
    # add the ordering number (0-based)
    matching = matching.annotate(search_rank=RawSQL(rank_sql, ()))
    return matching.order_by('search_rank')
def order_naturally(self, method=IFACE_ORDERING_POSITION):
    """
    Order interfaces by components parsed out of their ``name`` column.

    Names are expected to look like

        {type}{slot}/{subslot}/{position}/{subposition}:{channel}.{vc}

    and each row is annotated with one value per component, extracted in
    the database with ``SUBSTRING(... FROM <regex>)``:

        _type         leading run of non-digit characters
        _id           digits of a name that is only text followed by digits
        _slot         first number that is followed by a slash
        _subslot      number after the first slash (0 if absent)
        _position     number after the second slash (0 if absent)
        _subposition  number after the third slash (0 if absent)
        _channel      number after ':' near the end of the name (0 if absent)
        _vc           number after a trailing '.' (0 if absent)

    ``method`` selects the sort precedence: IFACE_ORDERING_POSITION sorts
    by the slot/position numbers before the type, IFACE_ORDERING_NAME
    sorts by the type first. Both end with ``_vc``, ``_id`` and the raw
    ``name`` as a fallback. Any other ``method`` raises KeyError.
    """
    sql_col = '{}.name'.format(self.model._meta.db_table)

    numeric_keys = ('_slot', '_subslot', '_position', '_subposition', '_channel')
    trailing_keys = ('_vc', '_id', 'name')
    ordering_by_method = {
        IFACE_ORDERING_POSITION: numeric_keys + ('_type',) + trailing_keys,
        IFACE_ORDERING_NAME: ('_type',) + numeric_keys + trailing_keys,
    }
    ordering = ordering_by_method[method]

    # (annotation name, SQL template); ``{}`` receives the qualified name
    # column and doubled braces are literal regex quantifiers.
    templates = (
        ('_type', r"SUBSTRING({} FROM '^([^0-9]+)')"),
        ('_id', r"CAST(SUBSTRING({} FROM '^(?:[^0-9]+)([0-9]+)$') AS integer)"),
        ('_slot', r"CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?([0-9]+)\/') AS integer)"),
        ('_subslot', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/)([0-9]+)') AS integer), 0)"),
        ('_position', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/){{2}}([0-9]+)') AS integer), 0)"),
        ('_subposition', r"COALESCE(CAST(SUBSTRING({} FROM '^(?:[^0-9]+)?(?:[0-9]+\/){{3}}([0-9]+)') AS integer), 0)"),
        ('_channel', r"COALESCE(CAST(SUBSTRING({} FROM ':([0-9]+)(\.[0-9]+)?$') AS integer), 0)"),
        ('_vc', r"COALESCE(CAST(SUBSTRING({} FROM '\.([0-9]+)$') AS integer), 0)"),
    )
    fields = {
        alias: RawSQL(template.format(sql_col), [])
        for alias, template in templates
    }
    return self.annotate(**fields).order_by(*ordering)