def first_entry(user_id):
"""Returns date of user's first entry as a datetime object."""
first_entry = db.session.query(Entry.date).filter(Entry.user_id == user_id).\
order_by('date').first()
return first_entry[0]
python类db()的实例源码
def last_entry(user_id):
"""Returns date of user's last entry as a datetime object."""
last_entry = db.session.query(Entry.date).filter(Entry.user_id == user_id).\
order_by(desc('date')).first()
return last_entry[0]
# NOTE: Generalize and rename function.
def calculate_avg_sleep(user_id, start_date, end_date):
"""Calculates user's average hours of sleep per night from start_date
to end_date."""
avg_sleep = db.session.query(func.avg(Entry.minutes_asleep)).filter(Entry.user_id\
== user_id, Entry.date >= start_date, Entry.date <= end_date)
avg_sleep = int(avg_sleep[0][0])/60.0
return avg_sleep
def calculate_avg(user_id, start_date, end_date, column_name):
"""Calculates average of field with column_name from start_date to end_date."""
avg = db.session.query(func.avg(column_name)).filter\
(Entry.user_id == user_id, Entry.date >= start_date, \
Entry.date <= end_date)
avg = avg[0][0]
return avg
def calculate_median_sleep(user_id, start_date, end_date):
"""Calculates user's all-time median hours of sleep per night."""
minutes_asleep_tups = db.session.query(Entry.minutes_asleep).filter\
(Entry.user_id == user_id, Entry.date >= start_date,\
Entry.date <= end_date).order_by('minutes_asleep').all()
minutes_asleep_lst = []
for item in minutes_asleep_tups:
minutes_asleep_lst.append(item[0])
median_sleep = median(minutes_asleep_lst) / 60
return median_sleep
def frequency_insomnia_type(user_id, start_date, end_date, insom_type):
"""Returns count of occurrence of insom_type from start_date to end_date."""
frequency_type = db.session.query(Entry.insom_type, db.func.count\
(Entry.insom_type)).filter(Entry.user_id == user_id,\
Entry.date >= start_date, Entry.date <= end_date, \
Entry.insom_type == insom_type).\
group_by(Entry.insom_type).all()
if len(frequency_type) != 0:
return frequency_type[0][1]
else:
return 0
def dashboard():
"""Display user's dashboard."""
# Retrieve form data.
user_id = session["user_id"]
date = datetime.now()
date = date.replace(hour=0, minute=0, second=0, microsecond=0)
minutes_asleep = int(request.form.get("hours_sleep")) * 60
insomnia = convert_to_boolean(request.form.get("insomnia"))
insom_type = request.form.get("insom_type")
insom_severity = int(request.form.get("insom_severity"))
alcohol = convert_to_boolean(request.form.get("alcohol"))
caffeine = convert_to_boolean(request.form.get("caffeine"))
menstruation = convert_to_boolean(request.form.get("menstruation"))
bedtime = datetime.strptime((request.form.get("bedtime")), '%H:%M')
stress_level = int(request.form.get("stress_level"))
activity_level = int(request.form.get("activity_level"))
# Create new record in db if no existing record with user_id and date;
# otherwise, update current record.
create_or_update_record(user_id, date, minutes_asleep, insomnia, insom_type,
insom_severity, alcohol, caffeine, menstruation,
bedtime, stress_level, activity_level)
# Pass calculated data to template
return render_template("dashboard.html")
##########################################################################
def sign_up_user():
"""Signs up new user"""
email = request.form.get("email")
password = request.form.get("password")
fname = request.form.get("fname")
lname = request.form.get("lname")
user = User.query.filter_by(email=email).first()
# if user already exists, checks password and logs them in if correct. If not, prompts
# for password again
if user:
if user.verify_password(password):
session['user'] = user.user_id
flash("You are now logged in")
return redirect('/')
# return redirect('/users/' + str(user.user_id))
else:
flash("Password incorrect - There is already a user with this email")
return redirect('/')
else:
#instantiates new user and passes user_id to session
user = User(email=email, password=password, fname=fname, lname=lname)
db.session.add(user)
db.session.commit()
session['user'] = user.user_id
flash("Your account has been created")
return redirect('/')
def add_new_group():
"""Allows user to create a new group"""
group_name = request.form.get('group')
user_id = session.get('user')
new_group = Group(group_name=group_name)
db.session.add(new_group)
db.session.commit()
new_user_group = UserGroup(group_id=new_group.group_id, user_id=user_id)
db.session.add(new_user_group)
db.session.commit()
return redirect('/')
def add_new_list():
"""Allows user to add a new list"""
list_name = request.form.get('list')
group_id = request.form.get('group_id')
new_list = List(list_name=list_name, group_id=group_id)
db.session.add(new_list)
db.session.commit()
return redirect('/lists/' + str(new_list.list_id))
def add_restaurant():
"""Allows user to add restaurant to a list"""
item_id = request.form.get('id')
restaurant_name = request.form.get('restaurant_name')
yelp_rating = request.form.get('yelp_rating')
latitude = request.form.get('latitude')
longitude = request.form.get('longitude')
list_id = request.form.get('list_id')
address = request.form.get('address')
categories = request.form.get('categories')
neighborhoods = request.form.get('neighborhoods')
link = request.form.get('url')
# check if restaurant already in db
get_restaurant = Restaurant.query.filter_by(restaurant_name=restaurant_name, latitude=latitude, longitude=longitude).first()
if get_restaurant:
# check if already part of list then add if no
get_restaurant_list = RestaurantList.query.filter_by(restaurant_id=get_restaurant.restaurant_id, list_id=list_id).first()
if get_restaurant_list:
flash("Restaurant is already part of list")
else:
new_restaurant_list = RestaurantList(restaurant_id=get_restaurant.restaurant_id, list_id=list_id)
db.session.add(new_restaurant_list)
db.session.commit()
# if restaurant is not already in db, add it and add to RestaurantList
else:
new_restaurant = Restaurant(restaurant_name=restaurant_name, yelp_rating=yelp_rating, latitude=latitude, longitude=longitude, address=address, categories=categories, neighborhoods=neighborhoods, link=link)
db.session.add(new_restaurant)
db.session.commit()
# need this line because we just added the restaurant to db and need to get the id to add to RestaurantList
restaurant_info = Restaurant.query.filter_by(restaurant_name=restaurant_name, latitude=latitude, longitude=longitude).first()
new_restaurant_list = RestaurantList(restaurant_id=restaurant_info.restaurant_id, list_id=list_id)
db.session.add(new_restaurant_list)
db.session.commit()
return jsonify(status='success', id=item_id, restaurant_name=restaurant_name, yelp_rating=yelp_rating, latitude=latitude, longitude=longitude)
def leave_group():
"""Allows a user to leave a group"""
group_id = request.form.get('group_id')
user_id = session.get('user')
get_user_group = UserGroup.query.filter_by(group_id=group_id, user_id=user_id).first()
db.session.delete(get_user_group)
db.session.commit()
flash("You have left the group")
return jsonify(result="success")
def mark_visited():
"""Marks visited as true for restaurants_lists"""
rest_id = request.form.get('rest_id')
list_id = request.form.get('list_id')
restaurant_list = RestaurantList.query.filter_by(restaurant_id=rest_id, list_id=list_id).first()
restaurant_list.visited = True
db.session.commit()
return jsonify(status="success", id=rest_id)
def profile_edit():
""" Edit profile information """
# Set the value of the user id of the user in the session
id = session.get('id')
# Query the database for the user
user_info = User.query.filter_by(id=id).first()
# Get information from the forms
name = request.form.get("profile-name")
email = request.form.get("profile-email")
password = request.form.get("new-password")
# Replace info in the database with new info
if name:
user_info.name = name
db.session.commit()
if password:
user_info.password = password
db.session.commit()
if email:
user_info.email = email
db.session.commit()
name_info = {
'name': name,
'email': email
}
# Return jsonified budget info to submit-new-account-info.js
return jsonify(name_info)
def remove_expenditure(id):
""" Remove an expenditure from the database """
# This is the expenditure object we are working with
expenditure_at_hand = Expenditure.query.filter_by(id=id).first()
# Deletes the expenditure item from the expenditure table
db.session.delete(expenditure_at_hand)
db.session.commit()
# Return jsonified id to delete-expenditure.js
return jsonify({"expenditure_id": id})
def _store_banner(data):
banner = Banner.query.filter_by(name=data.name).first()
# if banner in db, update record, if not add it
if banner:
banner.name = data.name
banner.image_url1 = data.image1
banner.image_url2 = data.image2
banner.text = data.text
banner.background = data.background
else:
banner = Banner(data)
db.session.add(banner)
db.session.commit()
def get_position_geohash(points):
""" This takes points and with these points find out what geohash each falls
under. Then we could get the crime_index and total_crimes
"""
# takes in a list as a parameter of [(lat, lng) ... (lat, lng)]
coords_data = [] # to store the dictionary generated
# do something like a for loop over here
for point in points:
geohash_sql = "SELECT * " + \
"FROM nyc_crimes_by_geohash " + \
"WHERE geohash=" + \
"ST_GeoHash(st_makepoint(%s, %s), 7);" % \
(point[0], point[1])
# execute the raw sql, and there should only be one result... so get that.
geohash_query = db.engine.execute(geohash_sql).fetchone()
if geohash_query is None:
# if the geohash isn't found, need to do something,
# query PostGIS for the geohash (not in db)
# then assume that there are no crimes in the area
geohash_of_point = "SELECT ST_GeoHash(geometry(Point(%s, %s)), 7);" \
% (point[0], point[1])
geohash_found = db.engine.execute(geohash_of_point).fetchone()
geohash_query = [0, geohash_found[0], 0, 0.0]
geohash_query_data = {
'geohash': geohash_query[1],
'total_crimes': geohash_query[2],
'crime_index': float(geohash_query[3]),
'point': point
}
coords_data.append(geohash_query_data)
# return something like [{dicte}, {dictw}], or {dict}, based on total pts
return coords_data
def process_registration():
"""Processes user registration form"""
# creates a new user instance
new_user = User(username=request.form.get('username'),
first_name=request.form.get('fname'),
last_name=request.form.get('lname'),
password=bcrypt.hashpw(request.form.get('password').encode('utf-8'), bcrypt.gensalt()),
email=request.form.get('email'),
image=request.form.get('image'),
phone=request.form.get('phone'),
confirmed_at=datetime.now())
# adds the new user instance to the database and saves
print new_user
db.session.add(new_user)
db.session.commit()
# logs new user in
session['logged_in'] = new_user.user_id
new_user = new_user.__dict__
if '_sa_instance_state' in new_user:
del new_user['_sa_instance_state']
# passes new user's id to angular
new_user['logged_in'] = session['logged_in']
return jsonify(new_user)
def add_plant_to_user():
"""Add a plant to a User's account."""
user_id = int(request.form.get('userId'))
plant_id = int(request.form.get('plantId'))
new_plantuser = PlantUser(user_id=user_id, plant_id=plant_id)
db.session.add(new_plantuser)
db.session.commit()
return 'ok'
def delete_reminder():
"""Deletes a watering reminder for a particular PlantUser"""
plant_id = int(request.form.getlist('plant_id')[0].encode('utf-8'))
user_id = int(request.form.getlist('user_id')[0].encode('utf-8'))
plant_user = PlantUser.query.filter(PlantUser.user_id == user_id, PlantUser.plant_id == plant_id).first()
plant_user.watering_schedule = ''
db.session.commit()
return 'ok'
# Plant Routes *********************************