def calculate_avg_sleep_over_time(user_id, start_date, end_date):
    """Compute the average sleep for consecutive 7-day windows.

    Windows begin at start_date and advance by seven days for as long as a
    window start is no later than ``end_date - 7 days``. Each window's
    average comes from ``calculate_avg_sleep``.

    Returns a tuple ``(averages, labels)``: the averages in window order and,
    for each window, a "month/day" string built from the date reached after
    advancing by one week (i.e. the window's end date).
    """
    week = timedelta(days=7)
    last_window_start = end_date - week

    weekly_averages = []
    labels = []
    window_start = start_date
    while window_start <= last_window_start:
        window_end = window_start + week
        weekly_averages.append(
            calculate_avg_sleep(user_id, window_start, window_end))
        # The label uses the already-advanced date, not the window start.
        labels.append("%s/%s" % (window_end.month, window_end.day))
        window_start = window_end
    return weekly_averages, labels
# NOTE: Collapse into calculate_median during Phase 3 refactor.
# Example source code using Python's avg()
def get_bikes_for_weekday(cls, dbsession, weekday, station_id):
    """Return hourly average availability for one station on one weekday.

    Averages ``available_bikes`` and ``available_bike_stands`` grouped by the
    hour of ``last_update``, restricted to rows for ``station_id`` whose
    ``last_update`` weekday equals ``weekday``.

    The result is a list starting with the header tuple
    ("Time", "Available Bikes", "Available Stands"), followed by one
    (hour, bikes, stands) tuple per hour group with the averages converted
    to float. When the query yields no rows, a single (0, 0, 0) placeholder
    follows the header instead.

    NOTE(review): weekday numbering is whatever the database's weekday()
    function uses -- confirm against the caller.
    """
    query = dbsession.query(
        func.hour(cls.last_update),
        func.avg(cls.available_bikes),
        func.avg(cls.available_bike_stands),
    )
    query = query.filter(
        cls.station_id == station_id,
        func.weekday(cls.last_update) == weekday,
    )
    rows = query.group_by(func.hour(cls.last_update)).all()

    header = ("Time", "Available Bikes", "Available Stands")
    if not rows:
        # No data for this station/weekday: emit a zero placeholder row.
        return [header, (0, 0, 0)]
    # Cast the averages to plain floats so the rows are easy to consume.
    return [header] + [
        (hour, float(bikes), float(stands)) for hour, bikes, stands in rows
    ]
def get_bikes_for_wetday(cls, dbsession, wetdate, station_id):
    """Return hourly average availability for one station on one date.

    Works like the weekday variant, except rows are selected by calendar
    date: only rows whose ``last_update`` date equals ``wetdate.date()`` are
    included (so ``wetdate`` must provide a ``date()`` method, e.g. a
    datetime).

    The result is a list starting with the header tuple
    ("Time", "Available Bikes", "Available Stands"), followed by one
    (hour, bikes, stands) tuple per hour group with the averages converted
    to float. When the query yields no rows, a single (0, 0, 0) placeholder
    follows the header instead.
    """
    query = dbsession.query(
        func.hour(cls.last_update),
        func.avg(cls.available_bikes),
        func.avg(cls.available_bike_stands),
    )
    query = query.filter(
        cls.station_id == station_id,
        func.date(cls.last_update) == wetdate.date(),
    )
    rows = query.group_by(func.hour(cls.last_update)).all()

    header = ("Time", "Available Bikes", "Available Stands")
    if not rows:
        # No data for this station/date: emit a zero placeholder row.
        return [header, (0, 0, 0)]
    # Cast the averages to plain floats so the rows are easy to consume.
    return [header] + [
        (hour, float(bikes), float(stands)) for hour, bikes, stands in rows
    ]
def get_bikes_for_week(cls, dbsession, station_id):
    """Return average available bikes per weekday for one station.

    Averages ``available_bikes`` grouped by the weekday of ``last_update``
    for rows matching ``station_id``.

    The result is a list starting with the header tuple
    ("Day", "Available Bikes"), followed by one (day, bikes) tuple per
    weekday group, where the day is looked up in the module-level ``days``
    mapping and the average is converted to float. When the query yields no
    rows, a single (0, 0) placeholder follows the header instead.
    """
    query = dbsession.query(
        func.weekday(cls.last_update),
        func.avg(cls.available_bikes),
    )
    query = query.filter(cls.station_id == station_id)
    rows = query.group_by(func.weekday(cls.last_update)).all()

    header = ("Day", "Available Bikes")
    if not rows:
        # No data for this station: emit a zero placeholder row.
        return [header, (0, 0)]
    # Translate the weekday index via ``days`` and cast the average to float.
    return [header] + [(days[day], float(bikes)) for day, bikes in rows]
def recalibrate_hero_values(session, league_id):
    """Recompute ``value`` for every hero in a league from its points.

    The league's average ``Hero.points`` is fed, together with each hero's
    own points, to ``calibrate_value``; the result is merged with the hero's
    current value by ``combine_calibrations`` and rounded to one decimal.

    If the average is NULL (for example, the league has no heroes) there is
    nothing to calibrate and the function returns without touching anything,
    instead of failing on ``float(None)``.

    The function only assigns ``hero.value``; it does not commit the session.
    """
    average_points = (
        session.query(func.avg(Hero.points))
        .filter(Hero.league == league_id)
        .scalar()
    )
    if average_points is None:
        # AVG over zero rows (or all-NULL points) yields NULL.
        return
    average_points = float(average_points)

    heroes = session.query(Hero).filter(Hero.league == league_id)
    for hero in heroes:
        new_calibration = calibrate_value(average_points, hero.points)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("new calbration: %s, from %s" % (new_calibration, hero.value))
        hero.value = round(
            combine_calibrations(hero.value, new_calibration), 1)
def top_rated_packages(cls, limit=10):
    """Return the best-rated public, active packages.

    Builds a select over ``package`` joined to ``rating``, grouped by package
    id and ordered by average rating, then by number of ratings (both
    descending), capped at ``limit`` rows.

    Returns a list of ``(package, average_rating, rating_count)`` tuples,
    where ``package`` is loaded via ``model.Session`` by its id.
    """
    # NB Not using sqlalchemy as sqla 0.4 doesn't work using both group_by
    # and apply_avg
    package = table('package')
    rating = table('rating')

    columns = [
        package.c.id,
        func.avg(rating.c.rating),
        func.count(rating.c.rating),
    ]
    query = select(columns, from_obj=[package.join(rating)])
    # "== False" is intentional: it builds a SQL expression, not a Python bool.
    query = query.where(
        and_(package.c.private == False, package.c.state == 'active'))
    query = query.group_by(package.c.id)
    query = query.order_by(
        func.avg(rating.c.rating).desc(),
        func.count(rating.c.rating).desc(),
    )
    query = query.limit(limit)

    rows = model.Session.execute(query).fetchall()
    results = []
    for pkg_id, avg, num in rows:
        pkg = model.Session.query(model.Package).get(unicode(pkg_id))
        results.append((pkg, avg, num))
    return results
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: 'average_age', 'average_wage', 'wage', 'jobs' and
    'average_establishment_size'. Any other name raises KeyError.
    """
    employee_count = func.count(cls.employee)
    establishment_count = func.count(distinct(cls.establishment))
    aggregations = {
        'average_age': func.avg(cls.age),
        'average_wage': func.avg(cls.wage),
        'wage': func.sum(cls.wage),
        'jobs': func.count(cls.employee),
        # Employees per distinct establishment.
        'average_establishment_size': employee_count / establishment_count,
    }
    return aggregations[value]
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: 'enrolleds', 'entrants', 'graduates' and 'average_age'.
    Any other name raises KeyError.
    """
    aggregations = {
        'enrolleds': func.count(),
        'entrants': func.sum(cls.entrant),
        'graduates': func.sum(cls.graduate),
        'average_age': func.avg(cls.age),
    }
    return aggregations[value]
def aggregate(cls, value):
    """Return the SQL aggregate expression registered under ``value``.

    Supported names: 'average_age', 'students', 'classes',
    'average_class_size' and 'schools'. Any other name raises KeyError.
    """
    student_count = func.count()
    class_count = func.count(distinct(cls.sc_class))
    aggregations = {
        'average_age': func.avg(cls.age),
        'students': func.count(),
        'classes': func.count(distinct(cls.sc_class)),
        # Row count per distinct class.
        'average_class_size': student_count / class_count,
        'schools': func.count(distinct(cls.sc_school)),
    }
    return aggregations[value]
def test_session_query(session, table):
    """Check that a grouped query with a HAVING clause returns rows.

    Selects the ``string`` column, a labelled concat of it, the average of
    ``integer`` and a count of rows whose ``boolean`` is true; groups by the
    first two and keeps only groups whose average ``integer`` exceeds 10.
    """
    string_col = table.c.string
    concat_col = func.concat(string_col).label('concat')
    # "== True" is intentional: it builds a SQL comparison for the CASE.
    true_count = func.sum(case([(table.c.boolean == True, 1)], else_=0))

    query = session.query(
        string_col,
        concat_col,
        func.avg(table.c.integer),
        true_count,
    )
    query = query.group_by(string_col, concat_col)
    query = query.having(func.avg(table.c.integer) > 10)

    rows = query.all()
    assert len(rows) > 0
def calculate_avg_sleep(user_id, start_date, end_date):
    """Return the user's average hours of sleep per night in a date range.

    Averages ``Entry.minutes_asleep`` over the user's entries whose date lies
    between start_date and end_date (both bounds inclusive) and converts the
    result from minutes to hours.

    Returns a float. When no entries match, the database average is NULL and
    0.0 is returned instead of raising TypeError on ``int(None)``.
    """
    avg_minutes = (
        db.session.query(func.avg(Entry.minutes_asleep))
        .filter(
            Entry.user_id == user_id,
            Entry.date >= start_date,
            Entry.date <= end_date,
        )
        .scalar()
    )
    if avg_minutes is None:
        # AVG over zero rows yields NULL; report it as zero hours.
        return 0.0
    # int() truncates to whole minutes (as before) ahead of the float division.
    return int(avg_minutes) / 60.0
def calculate_avg(user_id, start_date, end_date, column_name):
    """Return the average of one Entry field for a user over a date range.

    ``column_name`` is passed straight to ``func.avg``; the rows are limited
    to the user's entries dated between start_date and end_date (both bounds
    inclusive). The raw database value is returned unchanged.

    NOTE(review): presumably ``column_name`` is a column attribute such as
    ``Entry.minutes_asleep`` rather than a plain string -- confirm with
    callers.
    """
    conditions = (
        Entry.user_id == user_id,
        Entry.date >= start_date,
        Entry.date <= end_date,
    )
    query = db.session.query(func.avg(column_name)).filter(*conditions)
    # The aggregate is the first column of the first row.
    first_row = query[0]
    return first_row[0]