sqlalchemy.py 文件源码

python
阅读 44 收藏 0 点赞 0 评论 0

项目:falcon-api 作者: Opentopic 项目源码 文件源码
def _build_total_expressions(self, queryset, totals):
        """
        Build the select statement that computes the requested totals (aggregates).

        Each item of ``totals`` is a mapping ``{aggregate: columns}`` where ``columns`` is a single
        column name, a list of column names, or an empty value:

        * ``self.AGGR_GROUPLIMIT`` - ``columns`` must be an integer; it limits how many rows are kept
          per partition (see below). A falsy limit (``0``) leaves the result unlimited.
        * ``self.AGGR_GROUPBY`` - ``columns`` names the columns to group by; at least one is required.
          Each group column is labelled with the name it was requested under.
        * any other key - used as the name passed to ``Function`` and applied to every listed column,
          or to the primary key(s) of ``self.objects_class`` when no column is given. Every such
          expression is labelled with the aggregate name.

        Column names are split on ``'__'`` and resolved through ``self._parse_tokens``; names for
        which it returns ``None`` are skipped.

        The statement is ordered positionally: group columns first, then every aggregate with ``DESC``.
        When a group limit is set, a ``row_number`` window column (partitioned by all group columns
        except the last, ordered by the aggregates descending) is added and the statement is wrapped
        in an outer select keeping rows with ``row_number <= group_limit``.

        :param queryset: base query handed to ``self._apply_joins``
        :param totals: iterable of ``{aggregate: columns}`` mappings as described above
        :return: tuple ``(statement, group_column_names)``
        :raises HTTPBadRequest: if the group limit is not an integer, a group by has no columns,
            or a column name is not a string
        """
        mapper = inspect(self.objects_class)
        primary_keys = mapper.primary_key
        # Shared state filled in by _parse_tokens and consumed by _apply_joins.
        relationships = {
            'aliases': {},
            'join_chains': [],
            'prefix': 'totals_',
        }
        aggregates = []
        group_cols = OrderedDict()
        group_by = []
        group_limit = None
        for total in totals:
            for aggregate, columns in total.items():
                if aggregate == self.AGGR_GROUPLIMIT:
                    # bool is a subclass of int, so it has to be rejected explicitly; otherwise
                    # ``true`` would end up compared against row_number in the generated SQL.
                    if isinstance(columns, bool) or not isinstance(columns, int):
                        raise HTTPBadRequest('Invalid attribute', 'Group limit option requires an integer value')
                    group_limit = columns
                    continue
                if not columns:
                    if aggregate == self.AGGR_GROUPBY:
                        raise HTTPBadRequest('Invalid attribute', 'Group by option requires at least one column name')
                    # No column given: aggregate over the primary key, wrapping a composite key in row().
                    if len(primary_keys) > 1:
                        aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
                    else:
                        aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
                    continue
                if not isinstance(columns, list):
                    columns = [columns]
                for column in columns:
                    try:
                        tokens = column.split('__')
                    except AttributeError:
                        # Non-string entries (numbers, nested lists, ...) are a client error,
                        # not a server failure.
                        raise HTTPBadRequest('Invalid attribute', 'Column names must be strings')
                    expression = self._parse_tokens(self.objects_class, tokens, None, relationships,
                                                    lambda c, n, v: n)
                    if expression is None:
                        continue
                    if aggregate == self.AGGR_GROUPBY:
                        group_cols[column] = expression.label(column)
                        group_by.append(expression)
                    else:
                        aggregates.append(Function(aggregate, expression).label(aggregate))
        agg_query = self._apply_joins(queryset, relationships, distinct=False)
        group_cols_expr = list(group_cols.values())
        select_columns = group_cols_expr + aggregates
        if group_limit:
            row_order = [expression.desc() for expression in aggregates]
            # Rank rows inside each partition formed by all group columns but the last one.
            select_columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
                                                         order_by=row_order).label('row_number'))
        # Positional ORDER BY built only from integers generated here (no user input):
        # group columns first, then aggregates descending. The row_number column is not ordered on.
        group_count = len(group_cols_expr)
        positions = [str(position) for position in range(1, group_count + 1)]
        positions += [str(position) + ' DESC'
                      for position in range(group_count + 1, group_count + len(aggregates) + 1)]
        order = ','.join(positions)
        # order_by(None) clears any ordering carried over from the original query.
        agg_query = agg_query.statement.with_only_columns(select_columns).order_by(None).order_by(order)
        if group_by:
            agg_query = agg_query.group_by(*group_by)
        if group_limit:
            subquery = agg_query.alias()
            agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
        return agg_query, list(group_cols.keys())
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号