def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
    """Return hourly average availability for one station on one calendar day.

    Counterpart of get_bikes_for_weekday, except that the day is selected by
    an exact date (``wetdate``) rather than by a weekday.

    :param dbsession: session used to run the query.
    :param wetdate: datetime-like object; only its ``.date()`` is used.
    :param station_id: station whose rows are averaged.
    :returns: a list whose first element is the header tuple
        ("Time", "Available Bikes", "Available Stands"), followed by one
        ``(hour, avg_bikes, avg_stands)`` tuple per hour that has data, or by
        a single ``(0, 0, 0)`` placeholder when nothing matched.
    """
    # Rows are grouped by hour of last_update, so there is at most one row
    # per hour of the day. Relies on the database providing hour()/date().
    hour_of_update = func.hour(cls.last_update)
    hourly_rows = (
        dbsession.query(
            hour_of_update,
            func.avg(cls.available_bikes),
            func.avg(cls.available_bike_stands),
        )
        .filter(
            cls.station_id == station_id,
            func.date(cls.last_update) == wetdate.date(),
        )
        .group_by(hour_of_update)
        .all()
    )

    header = ("Time", "Available Bikes", "Available Stands")
    if not hourly_rows:
        # Keep the shape stable for callers even when no data matched.
        return [header, (0, 0, 0)]

    # The averages are converted to plain floats for the caller.
    return [header] + [
        (hour, float(bikes), float(stands))
        for hour, bikes, stands in hourly_rows
    ]
python类date()的实例源码
def get_current_station_info(cls, dbsession):
    """Return the most recent usage row recorded for every station.

    "Most recent" is taken to be the row with the highest ``UsageData.id``
    within each ``station_id`` group.

    :param dbsession: session used to run the query.
    :returns: list of ``(last_update, available_bike_stands,
        available_bikes)`` rows, one per station.
    """
    # Step 1: highest row id per station.
    newest_ids = (
        dbsession.query(
            UsageData.station_id,
            func.max(UsageData.id).label('max_update'),
        )
        .group_by(UsageData.station_id)
        .subquery()
    )

    # Step 2: keep only the usage rows whose id is one of those maxima.
    is_newest_row = and_(newest_ids.c.max_update == UsageData.id)
    latest = dbsession.query(
        UsageData.last_update,
        UsageData.available_bike_stands,
        UsageData.available_bikes,
    )
    return latest.join(newest_ids, is_newest_row).all()
def weekly_checkins(date, user):
    """Report on which days of the week containing ``date`` ``user`` checked in.

    :param date: any date inside the week of interest; passed to week_magic.
    :param user: the user whose check-ins are inspected.
    :returns: list of five booleans indexed by ``weekday()`` (0 = Monday),
        True where the user has at least one check-in on that day.
    """
    # presumably week_magic returns the Monday-Friday dates of that week --
    # the five-slot result indexed by weekday() depends on it; verify.
    week = week_magic(date)

    # Run the query exactly once and keep only the calendar dates. Iterating
    # the lazy query inside a per-day loop would re-issue the SQL for every
    # day of the week. Ordering is irrelevant for a membership test, so no
    # ORDER BY is requested.
    checkin_dates = {
        checkin.checkin_timestamp.date()
        for checkin in Checkin.query.filter(
            Checkin.user == user,
            func.date(Checkin.checkin_timestamp) >= week[0],
        )
    }

    user_week = [False, False, False, False, False]
    for day in week:
        if day in checkin_dates:
            user_week[day.weekday()] = True
    return user_week
def admin_weekly_checkins(date, grade=None):
    """Build the query for all check-ins in the week containing ``date``.

    :param date: any date inside the week of interest; passed to week_magic.
    :param grade: when given, only check-ins whose user has this grade are
        selected; ``None`` selects every user.
    :returns: the (not yet executed) Checkin query, restricted to the days
        ``week[0]`` through ``week[4]`` inclusive and ordered by check-in
        timestamp.
    """
    week = week_magic(date)

    checkin_day = func.date(Checkin.checkin_timestamp)
    conditions = [checkin_day >= week[0], checkin_day <= week[4]]
    if grade is not None:
        # The grade restriction goes first, as in the hand-written variant.
        conditions.insert(0, Checkin.user.has(grade=grade))

    # Order by the mapped column rather than a raw string: plain-string
    # ORDER BY expressions are rejected by newer SQLAlchemy releases.
    return Checkin.query.filter(and_(*conditions)).order_by(
        Checkin.checkin_timestamp)
def monthly_checkins(year_num, month_num, user):
    """Report on which days of a given month ``user`` checked in.

    :param year_num: calendar year, e.g. 2017.
    :param month_num: calendar month, 1-12.
    :param user: the user whose check-ins are inspected.
    :returns: list with one boolean per day of the month (index 0 is the
        1st), True where the user has at least one check-in on that day.
    """
    days_in_month = calendar.monthrange(year_num, month_num)[1]
    month = [False] * days_in_month

    start_of_month = datetime.date(year_num, month_num, 1)
    # Last representable instant of the month. Bounding the query on both
    # sides avoids loading every later check-in just to discard it below.
    end_of_month = datetime.datetime.combine(
        datetime.date(year_num, month_num, days_in_month),
        datetime.time.max,
    )

    # Order by the mapped column rather than a raw string: plain-string
    # ORDER BY expressions are rejected by newer SQLAlchemy releases.
    checkins = Checkin.query.filter(
        Checkin.user == user,
        Checkin.checkin_timestamp >= start_of_month,
        Checkin.checkin_timestamp <= end_of_month,
    ).order_by(Checkin.checkin_timestamp)

    for checkin in checkins:
        stamp = checkin.checkin_timestamp.date()
        # Defensive re-check so an unexpected row can never index outside
        # the month list.
        if stamp.year == year_num and stamp.month == month_num:
            month[stamp.day - 1] = True
    return month
def checkin_user(user):
    """Record a check-in for ``user`` unless one already exists for today.

    :param user: object exposing a ``checkins`` collection and a
        ``checkedin`` flag.
    :returns: False (and nothing is changed) when the user already has a
        check-in dated today; otherwise a new Checkin is attached to the
        user, ``user.checkedin`` is set, the session is committed and True
        is returned.
    """
    today = datetime.date.today()
    already_checked_in = any(
        existing.checkin_timestamp.date() == today
        for existing in user.checkins
    )
    if already_checked_in:
        return False

    new_checkin = Checkin()
    user.checkins.append(new_checkin)
    db.session.add(new_checkin)
    user.checkedin = True
    db.session.commit()
    return True
def __init__(self, date):
    """Load the check-ins for each weekday of the ISO week containing ``date``.

    Populates ``monday_checkins`` ... ``friday_checkins`` with the lists of
    Checkin rows whose ``checkin_week`` equals the ISO week number of
    ``date`` and whose ``checkin_day`` is 1 (Monday) through 5 (Friday).
    Both columns are compared against strings.

    NOTE(review): only the week number is matched, not the year -- confirm
    whether rows from the same week number of other years can exist.
    """
    iso_week = str(date.isocalendar()[1])

    def checkins_on(day_number):
        # One query per weekday; day numbers are compared as strings.
        return Checkin.query.filter(
            Checkin.checkin_week == iso_week,
            Checkin.checkin_day == str(day_number),
        ).all()

    self.monday_checkins = checkins_on(1)
    self.tuesday_checkins = checkins_on(2)
    self.wednesday_checkins = checkins_on(3)
    self.thursday_checkins = checkins_on(4)
    self.friday_checkins = checkins_on(5)
def get_bad_replicas_summary(rse_expression=None, from_date=None, to_date=None, session=None):
    """
    List the bad file replicas summary. Method used by the rucio-ui.

    :param rse_expression: The RSE expression.
    :param from_date: The start date (exclusive).
    :param to_date: The end date (exclusive).
    :param session: The database session in use.
    :returns: A list of dictionaries, one per (rse, day, reason) combination.
              Each maps str(state) to the number of bad replicas in that
              state and additionally carries the keys 'rse', 'created_at'
              (the day bucket) and 'reason'.
    """
    rse_clause = []
    if rse_expression:
        rse_clause = [models.RSE.rse == rse['rse']
                      for rse in parse_expression(expression=rse_expression, session=session)]

    # Truncate created_at to the day; every backend spells this differently.
    dialect = session.bind.dialect.name
    if dialect == 'oracle':
        to_days = func.trunc(models.BadReplicas.created_at, str('DD'))
    elif dialect == 'mysql':
        to_days = func.date(models.BadReplicas.created_at)
    elif dialect == 'postgresql':
        to_days = func.date_trunc('day', models.BadReplicas.created_at)
    else:
        # SQLite's strftime() takes the format string first and the time
        # value second; with the arguments swapped it evaluates to NULL.
        to_days = func.strftime('%Y-%m-%d', models.BadReplicas.created_at)

    query = session.query(func.count(), to_days, models.RSE.rse, models.BadReplicas.state, models.BadReplicas.reason)\
        .filter(models.RSE.id == models.BadReplicas.rse_id)
    # To be added : HINTS
    if rse_clause:
        query = query.filter(or_(*rse_clause))
    if from_date:
        query = query.filter(models.BadReplicas.created_at > from_date)
    if to_date:
        query = query.filter(models.BadReplicas.created_at < to_date)
    summary = query.group_by(to_days, models.RSE.rse, models.BadReplicas.reason, models.BadReplicas.state).all()

    # Fold the per-state rows into one dictionary per (rse, day, reason).
    incidents = {}
    for count, day, rse, state, reason in summary:
        incidents.setdefault((rse, day, reason), {})[str(state)] = count

    result = []
    for (rse, day, reason), res in incidents.items():
        res['rse'] = rse
        res['created_at'] = day
        res['reason'] = reason
        result.append(res)
    return result
def list_bad_replicas_status(state=BadFilesStatus.BAD, rse=None, younger_than=None, older_than=None, limit=None, list_pfns=False, session=None):
    """
    List the bad file replicas history states. Method used by the rucio-ui.

    :param state: The state of the file (SUSPICIOUS or BAD).
    :param rse: The RSE name.
    :param younger_than: datetime object to select bad replicas younger than this date.
    :param older_than: datetime object to select bad replicas older than this date.
    :param limit: The maximum number of replicas returned.
    :param list_pfns: If True, return the PFNs of the matching replicas
                      instead of their history records.
    :param session: The database session in use.
    :returns: With list_pfns False, a list of dictionaries with the keys
              scope, name, rse, state, created_at and updated_at.
              With list_pfns True, a list of PFNs without duplicates.
    """
    result = []
    rse_id = None
    if rse:
        rse_id = get_rse_id(rse, session=session)

    query = session.query(models.BadReplicas.scope, models.BadReplicas.name, models.RSE.rse, models.BadReplicas.state,
                          models.BadReplicas.created_at, models.BadReplicas.updated_at)
    if state:
        query = query.filter(models.BadReplicas.state == state)
    if rse_id:
        query = query.filter(models.BadReplicas.rse_id == rse_id)
    if younger_than:
        query = query.filter(models.BadReplicas.created_at >= younger_than)
    if older_than:
        query = query.filter(models.BadReplicas.created_at <= older_than)
    query = query.filter(models.RSE.id == models.BadReplicas.rse_id)
    if limit:
        query = query.limit(limit)

    for badfile in query.yield_per(1000):
        if list_pfns:
            # Only what list_replicas needs to resolve the file.
            result.append({'scope': badfile.scope, 'name': badfile.name, 'type': DIDType.FILE})
        else:
            result.append({'scope': badfile.scope, 'name': badfile.name, 'rse': badfile.rse, 'state': badfile.state,
                           'created_at': badfile.created_at, 'updated_at': badfile.updated_at})

    if list_pfns:
        reps = []
        seen = set()

        def add_unique(pfn):
            # Keep first-seen order while dropping repeats. The previous
            # `list(set(reps))` discarded its result, so duplicates leaked
            # into the returned list.
            if pfn not in seen:
                seen.add(pfn)
                reps.append(pfn)

        for rep in list_replicas(result, schemes=['srm', ], unavailable=False, request_id=None,
                                 ignore_availability=True, all_states=True, session=session):
            if rse in rep['rses'] and rep['rses'][rse]:
                # Replica present on the requested RSE: take its first PFN.
                pfn = rep['rses'][rse][0]
                if pfn:
                    add_unique(pfn)
            else:
                # No usable entry for that RSE: take every PFN on every RSE.
                for row in rep['rses'].values():
                    for item in row:
                        add_unique(item)
        result = reps
    return result
def get_streaks(s: sqlalchemy.orm.session.Session,
                active: Optional[bool]=None,
                limit: Optional[int]=None,
                max_age: Optional[int]=None,
                ) \
        -> Sequence[Streak]:
    """Fetch streaks of more than one game, longest first.

    Parameters:
        s: database session to query with
        active: when not None, keep only streaks whose active flag equals it
        limit: when not None, return at most this many streaks
        max_age: when not None, keep only streaks whose latest game end is
            less than this many days old

    Returns:
        List of Streak objects matching the filters, ordered by length.
    """
    # Roughly equivalent SQL:
    #   SELECT streaks.*, count(games.streak_id) AS streak_length
    #   FROM streaks JOIN games ON (streaks.id = games.streak_id)
    #   GROUP BY streaks.id
    #   HAVING streak_length > 1
    #   ORDER BY streak_length DESC
    length = func.count(Game.streak_id).label('streak_length')
    query = (
        s.query(Streak, length)
        .join(Streak.games)
        .group_by(Streak.id)
        .having(length > 1)
    )

    if max_age is not None:
        last_activity = func.max(Game.end).label('streak_last_activity')
        # NOTE(review): date('now', '-N day') looks like SQLite's modifier
        # syntax -- confirm against the database actually in use.
        cutoff = func.date('now', '-%s day' % max_age)
        query = query.having(last_activity > cutoff)

    query = query.order_by(length.desc())

    if active is not None:
        wanted_flag = sqlalchemy.true() if active else sqlalchemy.false()
        query = query.filter(Streak.active == wanted_flag)

    if limit is not None:
        query = query.limit(limit)

    # Each result row is (Streak, streak_length). Callers only want the
    # Streak itself, so the extra length column is dropped here.
    return [streak for streak, _length in query.all()]