def get_crashcnts_by_build(classes=None):
q = db.session.query(Build.build, db.func.count(Record.id)).join(Record).join(Classification)
if not classes:
classes = ['org.clearlinux/crash/clr']
q = q.filter(Classification.classification.in_(classes))
q = q.filter(Build.build.op('~')('^[0-9]+$'))
q = q.group_by(Build.build)
q = q.order_by(desc(cast(Build.build, db.Integer)))
q = q.limit(10)
return q.all()
python类desc()的实例源码
def get_machine_ids_for_guilty(id, most_recent=None):
q = db.session.query(Build.build, Record.machine_id, db.func.count(Record.id).label('total'), Record.guilty_id)
q = q.join(Record)
q = q.filter(Record.guilty_id == id)
q = q.filter(Record.os_name == 'clear-linux-os')
q = q.filter(Build.build.op('~')('^[0-9][0-9]+$'))
q = q.group_by(Build.build, Record.machine_id, Record.guilty_id)
q = q.order_by(desc(cast(Build.build, db.Integer)), desc('total'))
if most_recent:
interval_sec = 24 * 60 * 60 * int(most_recent)
current_time = time()
sec_in_past = current_time - interval_sec
q = q.filter(Record.tsp > sec_in_past)
return q.all()
def __query__(self):
if self.order == self.DESCENDANT:
query = desc(self.attribute)
elif self.order == self.ASCENDANT:
query = asc(self.attribute)
return query
def test_activation_mixin(self):
activated_student = Student()
activated_student.name = 'activated-student'
activated_student.activated_at = datetime.now()
DBSession.add(activated_student)
deactivated_student = Student()
deactivated_student.name = 'deactivated-student'
deactivated_student.activated_at = None
DBSession.add(deactivated_student)
DBSession.commit()
# Test ordering:
student_list = Student.query.order_by(desc(Student.is_active)).all()
self.assertIsNotNone(student_list[0].activated_at)
self.assertIsNone(student_list[-1].activated_at)
student_list = Student.query.order_by(asc(Student.is_active)).all()
self.assertIsNotNone(student_list[-1].activated_at)
self.assertIsNone(student_list[0].activated_at)
# Test filtering:
student_list = Student.query.filter(Student.is_active).all()
for student in student_list:
self.assertIsNotNone(student.activated_at)
student_list = Student.query.filter(not_(Student.is_active)).all()
for student in student_list:
self.assertIsNone(student.activated_at)
def actions_get(context, instance_uuid):
"""Get all instance actions for the provided uuid."""
actions = model_query(context, models.InstanceAction).\
filter_by(instance_uuid=instance_uuid).\
order_by(desc("created_at"), desc("id")).\
all()
return actions
def _action_get_last_created_by_instance_uuid(context, instance_uuid):
result = (model_query(context, models.InstanceAction).
filter_by(instance_uuid=instance_uuid).
order_by(desc("created_at"), desc("id")).
first())
return result
def action_events_get(context, action_id):
events = model_query(context, models.InstanceActionEvent).\
filter_by(action_id=action_id).\
order_by(desc("created_at"), desc("id")).\
all()
return events
def recent_update(self, days):
current = datetime.now()
# from one week ago
start_time = current - timedelta(days=days)
session = SessionManager.Session()
try:
result = session.query(Episode, Bangumi).\
join(Bangumi).\
filter(Episode.delete_mark == None).\
filter(Episode.status == Episode.STATUS_DOWNLOADED).\
filter(Episode.update_time >= start_time).\
filter(Episode.update_time <= current).\
order_by(desc(Episode.update_time))
episode_list = []
for eps, bgm in result:
episode = row2dict(eps)
episode['thumbnail'] = utils.generate_thumbnail_link(eps, bgm)
episode['bangumi'] = row2dict(bgm)
episode['bangumi']['cover'] = utils.generate_cover_link(bgm)
episode_list.append(episode)
return json_resp({'data': episode_list})
finally:
SessionManager.Session.remove()
def list_episode(self, page, count, sort_field, sort_order, status):
try:
session = SessionManager.Session()
query_object = session.query(Episode).\
filter(Episode.delete_mark == None)
if status is not None:
query_object = query_object.filter(Episode.status==status)
# count total rows
total = session.query(func.count(Episode.id)).filter(Episode.status==status).scalar()
else:
total = session.query(func.count(Episode.id)).scalar()
offset = (page - 1) * count
if sort_order == 'desc':
episode_list = query_object.\
order_by(desc(getattr(Episode, sort_field))).\
offset(offset).\
limit(count).\
all()
else:
episode_list = query_object.\
order_by(asc(getattr(Episode, sort_field))).\
offset(offset).limit(count).\
all()
episode_dict_list = [row2dict(episode) for episode in episode_list]
return json_resp({'data': episode_dict_list, 'total': total})
finally:
SessionManager.Session.remove()
def _build_total_expressions(self, queryset, totals):
mapper = inspect(self.objects_class)
primary_keys = mapper.primary_key
relationships = {
'aliases': {},
'join_chains': [],
'prefix': 'totals_',
}
aggregates = []
group_cols = OrderedDict()
group_by = []
group_limit = None
for total in totals:
for aggregate, columns in total.items():
if aggregate == self.AGGR_GROUPLIMIT:
if not isinstance(columns, int):
raise HTTPBadRequest('Invalid attribute', 'Group limit option requires an integer value')
group_limit = columns
continue
if not columns:
if aggregate == self.AGGR_GROUPBY:
raise HTTPBadRequest('Invalid attribute', 'Group by option requires at least one column name')
if len(primary_keys) > 1:
aggregates.append(Function(aggregate, func.row(*primary_keys)).label(aggregate))
else:
aggregates.append(Function(aggregate, *primary_keys).label(aggregate))
continue
if not isinstance(columns, list):
columns = [columns]
for column in columns:
expression = self._parse_tokens(self.objects_class, column.split('__'), None, relationships,
lambda c, n, v: n)
if expression is not None:
if aggregate == self.AGGR_GROUPBY:
group_cols[column] = expression.label(column)
group_by.append(expression)
else:
aggregates.append(Function(aggregate, expression).label(aggregate))
agg_query = self._apply_joins(queryset, relationships, distinct=False)
group_cols_expr = list(group_cols.values())
columns = group_cols_expr + aggregates
if group_limit:
row_order = list(map(lambda c: c.desc(), aggregates))
columns.append(func.row_number().over(partition_by=group_cols_expr[:-1],
order_by=row_order).label('row_number'))
order = ','.join(list(map(str, range(1, len(group_cols_expr) + 1)))
+ list(map(lambda c: str(c) + ' DESC', range(1 + len(group_cols_expr),
len(aggregates) + len(group_cols_expr) + 1))))
agg_query = agg_query.statement.with_only_columns(columns).order_by(None).order_by(order)
if group_by:
agg_query = agg_query.group_by(*group_by)
if group_limit:
subquery = agg_query.alias()
agg_query = select([subquery]).where(subquery.c.row_number <= group_limit)
return agg_query, list(group_cols.keys())
def list_bangumi(self, page, count, sort_field, sort_order, name, user_id, bangumi_type):
try:
session = SessionManager.Session()
query_object = session.query(Bangumi).\
options(joinedload(Bangumi.cover_image)).\
filter(Bangumi.delete_mark == None)
if bangumi_type != -1:
query_object = query_object.filter(Bangumi.type == bangumi_type)
if name is not None:
name_pattern = '%{0}%'.format(name.encode('utf-8'),)
logger.debug(name_pattern)
query_object = query_object.\
filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
# count total rows
total = session.query(func.count(Bangumi.id)).\
filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
scalar()
else:
total = session.query(func.count(Bangumi.id)).scalar()
if sort_order == 'desc':
query_object = query_object.\
order_by(desc(getattr(Bangumi, sort_field)))
else:
query_object = query_object.\
order_by(asc(getattr(Bangumi, sort_field)))
if count == -1:
bangumi_list = query_object.all()
else:
offset = (page - 1) * count
bangumi_list = query_object.offset(offset).limit(count).all()
bangumi_id_list = [bgm.id for bgm in bangumi_list]
favorites = session.query(Favorites).\
filter(Favorites.bangumi_id.in_(bangumi_id_list)).\
filter(Favorites.user_id == user_id).\
all()
bangumi_dict_list = []
for bgm in bangumi_list:
bangumi = row2dict(bgm)
bangumi['cover'] = utils.generate_cover_link(bgm)
utils.process_bangumi_dict(bgm, bangumi)
for fav in favorites:
if fav.bangumi_id == bgm.id:
bangumi['favorite_status'] = fav.status
bangumi_dict_list.append(bangumi)
return json_resp({'data': bangumi_dict_list, 'total': total})
finally:
SessionManager.Session.remove()
def list_bangumi(self, page, count, sort_field, sort_order, name, bangumi_type):
try:
session = SessionManager.Session()
query_object = session.query(Bangumi).\
options(joinedload(Bangumi.cover_image)).\
options(joinedload(Bangumi.created_by)).\
options(joinedload(Bangumi.maintained_by)).\
filter(Bangumi.delete_mark == None)
if bangumi_type != -1:
query_object = query_object.filter(Bangumi.type == bangumi_type)
if name is not None:
name_pattern = '%{0}%'.format(name.encode('utf-8'),)
logger.debug(name_pattern)
query_object = query_object.\
filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern)))
# count total rows
total = session.query(func.count(Bangumi.id)).\
filter(or_(Bangumi.name.ilike(name_pattern), Bangumi.name_cn.ilike(name_pattern))).\
scalar()
else:
total = session.query(func.count(Bangumi.id)).scalar()
if sort_order == 'desc':
query_object = query_object.\
order_by(desc(getattr(Bangumi, sort_field)))
else:
query_object = query_object.\
order_by(asc(getattr(Bangumi, sort_field)))
# we now support query all method by passing count = -1
if count == -1:
bangumi_list = query_object.all()
else:
offset = (page - 1) * count
bangumi_list = query_object.offset(offset).limit(count).all()
bangumi_dict_list = []
for bgm in bangumi_list:
bangumi = row2dict(bgm)
bangumi['cover'] = utils.generate_cover_link(bgm)
utils.process_bangumi_dict(bgm, bangumi)
self.__process_user_obj_in_bangumi(bgm, bangumi)
bangumi_dict_list.append(bangumi)
return json_resp({'data': bangumi_dict_list, 'total': total})
# raise ClientError('something happened')
finally:
SessionManager.Session.remove()