python类desc()的实例源码

model.py 文件源码 项目:telemetrics-backend 作者: clearlinux 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_crashcnts_by_build(classes=None):
        q = db.session.query(Build.build, db.func.count(Record.id)).join(Record).join(Classification)
        if not classes:
            classes = ['org.clearlinux/crash/clr']
        q = q.filter(Classification.classification.in_(classes))
        q = q.filter(Build.build.op('~')('^[0-9]+$'))
        q = q.group_by(Build.build)
        q = q.order_by(desc(cast(Build.build, db.Integer)))
        q = q.limit(10)
        return q.all()
model.py 文件源码 项目:telemetrics-backend 作者: clearlinux 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_machine_ids_for_guilty(id, most_recent=None):
        q = db.session.query(Build.build, Record.machine_id, db.func.count(Record.id).label('total'), Record.guilty_id)
        q = q.join(Record)
        q = q.filter(Record.guilty_id == id)
        q = q.filter(Record.os_name == 'clear-linux-os')
        q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
        q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
        q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
        if most_recent:
            interval_sec = 24 * 60 * 60 * int(most_recent)
            current_time = time()
            sec_in_past = current_time - interval_sec
            q = q.filter(Record.tsp > sec_in_past)
        return q.all()
condition.py 文件源码 项目:dodotable 作者: spoqa 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __query__(self):
        if self.order == self.DESCENDANT:
            query = desc(self.attribute)
        elif self.order == self.ASCENDANT:
            query = asc(self.attribute)
        return query
test_mixins.py 文件源码 项目:restfulpy 作者: Carrene 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_activation_mixin(self):
        activated_student = Student()
        activated_student.name = 'activated-student'
        activated_student.activated_at = datetime.now()
        DBSession.add(activated_student)

        deactivated_student = Student()
        deactivated_student.name = 'deactivated-student'
        deactivated_student.activated_at = None
        DBSession.add(deactivated_student)

        DBSession.commit()

        # Test ordering:
        student_list = Student.query.order_by(desc(Student.is_active)).all()
        self.assertIsNotNone(student_list[0].activated_at)
        self.assertIsNone(student_list[-1].activated_at)

        student_list = Student.query.order_by(asc(Student.is_active)).all()
        self.assertIsNotNone(student_list[-1].activated_at)
        self.assertIsNone(student_list[0].activated_at)

        # Test filtering:
        student_list = Student.query.filter(Student.is_active).all()
        for student in student_list:
            self.assertIsNotNone(student.activated_at)

        student_list = Student.query.filter(not_(Student.is_active)).all()
        for student in student_list:
            self.assertIsNone(student.activated_at)
api.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def actions_get(context, instance_uuid):
    """Get all instance actions for the provided uuid."""
    actions = model_query(context, models.InstanceAction).\
                          filter_by(instance_uuid=instance_uuid).\
                          order_by(desc("created_at"), desc("id")).\
                          all()
    return actions
api.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _action_get_last_created_by_instance_uuid(context, instance_uuid):
    result = (model_query(context, models.InstanceAction).
                     filter_by(instance_uuid=instance_uuid).
                     order_by(desc("created_at"), desc("id")).
                     first())
    return result
api.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def action_events_get(context, action_id):
    events = model_query(context, models.InstanceActionEvent).\
                         filter_by(action_id=action_id).\
                         order_by(desc("created_at"), desc("id")).\
                         all()

    return events
bangumi.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recent_update(self, days):

        current = datetime.now()

        # from one week ago
        start_time = current - timedelta(days=days)

        session = SessionManager.Session()
        try:
            result = session.query(Episode, Bangumi).\
                join(Bangumi).\
                filter(Episode.delete_mark == None).\
                filter(Episode.status == Episode.STATUS_DOWNLOADED).\
                filter(Episode.update_time >= start_time).\
                filter(Episode.update_time <= current).\
                order_by(desc(Episode.update_time))

            episode_list = []

            for eps, bgm in result:
                episode = row2dict(eps)
                episode['thumbnail'] = utils.generate_thumbnail_link(eps, bgm)
                episode['bangumi'] = row2dict(bgm)
                episode['bangumi']['cover'] = utils.generate_cover_link(bgm)
                episode_list.append(episode)

            return json_resp({'data': episode_list})
        finally:
            SessionManager.Session.remove()
admin.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def list_episode(self, page, count, sort_field, sort_order, status):
        try:

            session = SessionManager.Session()
            query_object = session.query(Episode).\
                filter(Episode.delete_mark == None)

            if status is not None:
                query_object = query_object.filter(Episode.status==status)
                # count total rows
                total = session.query(func.count(Episode.id)).filter(Episode.status==status).scalar()
            else:
                total = session.query(func.count(Episode.id)).scalar()

            offset = (page - 1) * count

            if sort_order == 'desc':
                episode_list = query_object.\
                    order_by(desc(getattr(Episode, sort_field))).\
                    offset(offset).\
                    limit(count).\
                    all()
            else:
                episode_list = query_object.\
                    order_by(asc(getattr(Episode, sort_field))).\
                    offset(offset).limit(count).\
                    all()

            episode_dict_list = [row2dict(episode) for episode in episode_list]

            return json_resp({'data': episode_dict_list, 'total': total})
        finally:
            SessionManager.Session.remove()
sqlalchemy.py 文件源码 项目:falcon-api 作者: Opentopic 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _build_total_expressions(self, queryset, totals):
        mapper = inspect(self.objects_class)
        primary_keys = mapper.primary_key
        relationships = {
            'aliases': {},
            'join_chains': [],
            'prefix': 'totals_',
        }
        aggregates = []
        group_cols = OrderedDict()
        group_by = []
        group_limit = None
        for total in totals:
            for aggregate, columns in total.items():
                if aggregate == self.AGGR_GROUPLIMIT:
                    if not isinstance(columns, int):
                        raise HTTPBadRequest('Invalid attribute', 'Group limit option requires an integer value')
                    group_limit = columns
                    continue
                if not columns:
                    if aggregate == self.AGGR_GROUPBY:
                        raise HTTPBadRequest('Invalid attribute', 'Group by option requires at least one column name')
                    if len(primary_keys) > 1:
                        aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
                    else:
                        aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
                    continue
                if not isinstance(columns, list):
                    columns = [columns]
                for column in columns:
                    expression = self._parse_tokens(self.objects_class, column.split('__'), None, relationships,
                                                    lambda c, n, v: n)
                    if expression is not None:
                        if aggregate == self.AGGR_GROUPBY:
                            group_cols[column] = expression.label(column)
                            group_by.append(expression)
                        else:
                            aggregates.append(Function(aggregate, expression).label(aggregate))
        agg_query = self._apply_joins(queryset, relationships, distinct=False)
        group_cols_expr = list(group_cols.values())
        columns = group_cols_expr + aggregates
        if group_limit:
            row_order = list(map(lambda c: c.desc(), aggregates))
            columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
                                                  order_by=row_order).label('row_number'))
        order = ','.join(list(map(str, range(1, len(group_cols_expr) + 1)))
                         + list(map(lambda c: str(c) + ' DESC', range(1 + len(group_cols_expr),
                                                                      len(aggregates) + len(group_cols_expr) + 1))))
        agg_query = agg_query.statement.with_only_columns(columns).order_by(None).order_by(order)
        if group_by:
            agg_query = agg_query.group_by(*group_by)
        if group_limit:
            subquery = agg_query.alias()
            agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
        return agg_query, list(group_cols.keys())
bangumi.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def list_bangumi(self, page, count, sort_field, sort_order, name, user_id, bangumi_type):
        try:

            session = SessionManager.Session()
            query_object = session.query(Bangumi).\
                options(joinedload(Bangumi.cover_image)).\
                filter(Bangumi.delete_mark == None)

            if bangumi_type != -1:
                query_object = query_object.filter(Bangumi.type == bangumi_type)

            if name is not None:
                name_pattern = '%{0}%'.format(name.encode('utf-8'),)
                logger.debug(name_pattern)
                query_object = query_object.\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
                # count total rows
                total = session.query(func.count(Bangumi.id)).\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
                    scalar()
            else:
                total = session.query(func.count(Bangumi.id)).scalar()

            if sort_order == 'desc':
                query_object = query_object.\
                    order_by(desc(getattr(Bangumi, sort_field)))

            else:
                query_object = query_object.\
                    order_by(asc(getattr(Bangumi, sort_field)))

            if count == -1:
                bangumi_list = query_object.all()
            else:
                offset = (page - 1) * count
                bangumi_list = query_object.offset(offset).limit(count).all()

            bangumi_id_list = [bgm.id for bgm in bangumi_list]

            favorites = session.query(Favorites).\
                filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
                filter(Favorites.user_id == user_id).\
                all()

            bangumi_dict_list = []
            for bgm in bangumi_list:
                bangumi = row2dict(bgm)
                bangumi['cover'] = utils.generate_cover_link(bgm)
                utils.process_bangumi_dict(bgm, bangumi)
                for fav in favorites:
                    if fav.bangumi_id == bgm.id:
                        bangumi['favorite_status'] = fav.status
                bangumi_dict_list.append(bangumi)

            return json_resp({'data': bangumi_dict_list, 'total': total})
        finally:
            SessionManager.Session.remove()
admin.py 文件源码 项目:Albireo 作者: lordfriend 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def list_bangumi(self, page, count, sort_field, sort_order, name, bangumi_type):
        try:
            session = SessionManager.Session()
            query_object = session.query(Bangumi).\
                options(joinedload(Bangumi.cover_image)).\
                options(joinedload(Bangumi.created_by)).\
                options(joinedload(Bangumi.maintained_by)).\
                filter(Bangumi.delete_mark == None)

            if bangumi_type != -1:
                query_object = query_object.filter(Bangumi.type == bangumi_type)

            if name is not None:
                name_pattern = '%{0}%'.format(name.encode('utf-8'),)
                logger.debug(name_pattern)
                query_object = query_object.\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
                # count total rows
                total = session.query(func.count(Bangumi.id)).\
                    filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
                    scalar()
            else:
                total = session.query(func.count(Bangumi.id)).scalar()

            if sort_order == 'desc':
                query_object = query_object.\
                    order_by(desc(getattr(Bangumi, sort_field)))
            else:
                query_object = query_object.\
                    order_by(asc(getattr(Bangumi, sort_field)))

            # we now support query all method by passing count = -1
            if count == -1:
                bangumi_list = query_object.all()
            else:
                offset = (page - 1) * count
                bangumi_list = query_object.offset(offset).limit(count).all()

            bangumi_dict_list = []
            for bgm in bangumi_list:
                bangumi = row2dict(bgm)
                bangumi['cover'] = utils.generate_cover_link(bgm)
                utils.process_bangumi_dict(bgm, bangumi)
                self.__process_user_obj_in_bangumi(bgm, bangumi)
                bangumi_dict_list.append(bangumi)

            return json_resp({'data': bangumi_dict_list, 'total': total})
            # raise ClientError('something happened')
        finally:
            SessionManager.Session.remove()


问题


面经


文章

微信
公众号

扫码关注公众号