def user_profile(user_id):
"""Show user profile"""
user_info = User.query.filter_by(user_id=user_id).first()
account_created = user_info.account_created
account_created = str(account_created)[:11]
system_info = UserSystem.query.filter_by(user_id=user_id).all()
rating_info = (db.session.query(Game.name, Rating.score, Game.game_id)
.join(Rating).filter(Rating.user_id==user_id)
.all())
num_games = (db.session.query(func.count(Rating.user_id))
.filter(Rating.user_id == user_id)
.first())
num_games = int(num_games[0])
return render_template("user_profile.html", user_info=user_info,
system_info=system_info,
rating_info=rating_info,
account_created=account_created,
user_id=user_id,
num_games=num_games)
python类db()的实例源码
def game_rating():
"""Update or add game rating to database"""
rating = request.form.get("rating")
game_id = request.form.get("game_id")
current_user = session["user_id"]
existing_rating = Rating.query.filter_by(user_id=current_user, game_id=game_id).first()
if existing_rating is None:
new_rating = Rating(user_id=current_user, game_id=game_id,score=rating)
db.session.add(new_rating)
db.session.commit()
else:
existing_rating.score = rating
db.session.commit()
return jsonify({"rating": rating})
def load_apps():
"""load apps into myapps global dictionary"""
global myapps, default_app
# Connect to DB
result = db().select(apps.ALL)
myapps = {}
for row in result:
name = row['name']
appid = row['id']
preprocess = row['preprocess']
postprocess = row['postprocess']
input_format = row['input_format']
try:
print 'loading: %s (id: %s)' % (name, appid)
myapps[name] = app_instance(input_format, name, preprocess, postprocess)
myapps[name].appid = appid
myapps[name].input_format = input_format
except:
exc_type, exc_value, exc_traceback = sys.exc_info()
print traceback.print_exception(exc_type, exc_value, exc_traceback)
print 'ERROR: LOADING: %s (ID: %s) FAILED TO LOAD' % (name, appid)
default_app = name # simple soln - use last app read from DB
return True
def get_stats():
root.authorized()
params = {}
# number of jobs in queued, running, and completed states
params['nq'] = db(jobs.state=='Q').count()
params['nr'] = db(jobs.state=='R').count()
params['nc'] = db(jobs.state=='C').count()
params['cpu'] = psutil.cpu_percent()
params['vm'] = psutil.virtual_memory().percent
params['disk'] = psutil.disk_usage('/').percent
params['cid'] = request.query.cid
params['app'] = request.query.app
return template("stats", params)
def process_registration():
"""Add new user to database and log them in."""
email = request.form.get('email')
password = request.form.get('password')
# instantiate a user object with the information provided
new_user = User(email=email,
password=password)
# add user to db session and commit to db
db.session.add(new_user)
db.session.commit()
# add user to the session; redirect to homepage
session['user'] = new_user.user_id
session['waypoints'] = []
flash("You're logged in.")
return redirect('/')
def query_recurring_meetings():
"""Helper function used by query database for recurring meetings"""
dicto = {}
# labels = []
# data = []
meetings = Meeting.query.filter_by(recurring_id=1).all()
# meetings = db.session.query(
# Meeting.meeting_title,
# Meeting.meeting_time,
# # Rating.score).all()
# Rating.score).filter_by(Meeting.recurring_id=1)
# for mtg in meetings:
# print mtg.meeting_title
# for rating in mtg.rating:
# print rating.score
for mtg in meetings:
title = meeting_title
dicto[mtg.meeting_time] = dicto.get(mtg.rating.score)
return
def integer_type_data(user_id, start_date, end_date, column_name):
"""Returns a tuple of lists, one list of dates as strings, and one list of
data points as integers. column_name should be an object with attribute
column_name, e.g. Entry.stress_level."""
data_points = sorted(db.session.query(Entry.date, column_name).filter\
(Entry.user_id == user_id, Entry.date >= start_date,
Entry.date <= end_date).all())
dates = []
scores = []
for item in data_points:
date = "%s/%s" % (item[0].month, item[0].day)
dates.append(date)
score = item[1]
scores.append(score)
return dates, scores
def hours_sleep_data(user_id, start_date, end_date):
"""Returns a tuple of lists, one list of dates as strings,
and one list of hours_sleep data points as integers."""
data_points = sorted(db.session.query(Entry.date, Entry.minutes_asleep).filter
(Entry.user_id == user_id, Entry.date>= start_date, Entry.date <=
end_date).all())
dates = []
hours_sleep_list = []
for item in data_points:
date = "%s/%s" % (item[0].month, item[0].day)
dates.append(date)
hours_sleep = item[1] / 60
hours_sleep_list.append(hours_sleep)
return dates, hours_sleep_list
def insom_factors(user_id, column_name):
"""Returns the percentage co-occurrence between insomnia(T/F)
and another column with T/F values. column_name should be a table object
with attribute column_name."""
query_list = db.session.query(Entry.insomnia, column_name).filter\
(Entry.user_id == user_id).order_by('date').all()
insom_list = []
mens_list = []
for item in query_list:
insom_list.append(item[0])
mens_list.append(item[1])
co_occurrence = calculate_similarity(insom_list, mens_list)
return co_occurrence
def invite_user():
"""Allows user to invite other member to group by email"""
email = request.form.get('invite')
user = User.query.filter_by(email=email).first()
group_id = request.form.get('group_id')
group = Group.query.filter_by(group_id=group_id).first()
if user:
user_search = UserGroup.query.filter_by(user_id=user.user_id, group_id=group_id).first()
if user_search:
flash("User is already in this group")
else:
new_user = UserGroup(user_id=user.user_id, group_id=group_id)
db.session.add(new_user)
db.session.commit()
flash("Added new user " + email + " to " + group.group_name)
else:
flash("No such user")
return redirect('/')
def delete_list():
"""Allows user to remove a list from a group"""
list_id = request.form.get('list_id')
list_item = List.query.get(list_id)
restaurants_lists = RestaurantList.query.filter_by(list_id=list_id).all()
db.session.delete(list_item)
for rl in restaurants_lists:
db.session.delete(rl)
db.session.commit()
flash("Removed " + list_item.list_name)
return jsonify(result='success')
def add_restaurant_to_faves():
"""Adds a restaurant to a user's favorites list"""
restaurant_id = request.form.get('rest_id')
user_id = session.get('user')
get_fave_restaurant = Fave.query.filter_by(restaurant_id=restaurant_id, user_id=user_id).first()
if get_fave_restaurant:
Fave.query.filter_by(fave_id=get_fave_restaurant.fave_id).delete()
db.session.commit()
else:
fave_restaurant = Fave(restaurant_id=restaurant_id, user_id=user_id)
db.session.add(fave_restaurant)
db.session.commit()
return jsonify(result="success", id=restaurant_id)
def return_restaurants():
"""Gives response to browser of restaurants in db for given list"""
list_id = request.form.get('list_id')
item_id = request.form.get('id')
restaurants_lists = RestaurantList.query.filter_by(list_id=list_id, visited=False).all()
restaurants = []
for item in restaurants_lists:
restaurant = Restaurant.query.filter_by(restaurant_id=item.restaurant_id).first()
restaurants.append({'restaurant_name': restaurant.restaurant_name,
'yelp_rating': restaurant.yelp_rating,
'latitude': restaurant.latitude,
'longitude': restaurant.longitude,
'restaurant_id': restaurant.restaurant_id})
print restaurants
return jsonify(status="success", results=restaurants, id=item_id)
def remove_budget(id):
""" Remove a budget from the database """
# This is the budget object we are working with
budget_at_hand = Budget.query.filter_by(id=id).first()
# This is the user id of the user in the session
user_id = session.get('id')
# Check to make sure the budget is associated with the logged in user
if user_id == budget_at_hand.budget_userid:
# Deletes the budget item from the budget table
db.session.delete(budget_at_hand)
db.session.commit()
# Redirect the user to their dashboard
return redirect(url_for('dashboard', id=user_id))
def process_login():
"""Processes user input and either logs user in if input is in database"""
# gets the user input from the username field and looks it up in the database
username = request.form.get('username')
user = User.query.filter_by(username=username).first()
# if username entered exists in db, gets the password entered and compares
# it to the one in the database
if user:
# if password is correct, adds user to the current session and redirects to home page
if bcrypt.hashpw(request.form.get('password').encode('utf-8'), user.password.encode('utf-8')).decode() == user.password:
session['logged_in'] = user.user_id
print 'logged in'
return jsonify(session)
# if password is incorrect, redirects to login page
else:
return 'error'
# if username is not in the database, redirects to the registration form
else:
return 'error'
def update_user():
"""Saves updated user info."""
user_id = request.form.get('id')
user_to_update = User.query.get(int(user_id))
if bcrypt.hashpw(request.form.get('password').encode('utf-8'), user_to_update.password.encode('utf-8')).decode() == user_to_update.password:
if request.form.get('email'):
user_to_update.email = request.form.get('email')
if request.form.get('phone'):
user_to_update.phone = request.form.get('phone')
else:
return "bad password"
db.session.commit()
return "ok"
# PlantUser Routes *********************************
def add_reminder():
"""Adds a watering reminder for a particular PlantUser"""
user_id = int(request.form.getlist('user_id')[0].encode('utf-8'))
if User.query.get(user_id).phone:
plant_id = int(request.form.getlist('plant_id')[0].encode('utf-8'))
days = request.form.getlist('days')[0].encode('utf-8')
plant_user = PlantUser.query.filter(PlantUser.user_id == user_id, PlantUser.plant_id == plant_id).first()
plant_user.watering_schedule = days
db.session.commit()
return 'ok'
else:
return 'phone number missing'
def show_plant_details(plant_id):
"""Show individual plant's page"""
found_plant = Plant.query.get(plant_id)
if not found_plant.image:
found_plant.image = "/static/img/placeholder-image.png"
db.session.commit()
found_plant = found_plant.__dict__
if '_sa_instance_state' in found_plant:
del found_plant['_sa_instance_state']
return jsonify(found_plant)
def process_new_plant():
"""Gets the user input from new plant form and adds to the database"""
# if user did not add image url, get one from flickr
name = request.form.get('name')
image = request.form.get('image')
if not image:
image = get_flickr_image(name)
# gets plant info from angular's data passed in and creates new Plant instance
new_plant = Plant(name=name,
species=request.form.get('species'),
image=image,
water=request.form.get('water'),
sun=request.form.get('sun'),
humidity=request.form.get('humidity'),
temperature=request.form.get('temp'))
# adds plant to the database and saves
db.session.add(new_plant)
db.session.commit()
# returns plant ID to angular's callback
return str(new_plant.plant_id)
def update_plant():
"""Updates plant"""
plant = Plant.query.get(request.form.get('plant_id'))
plant.name = request.form.get('name')
plant.species = request.form.get('species')
plant.image = request.form.get('image')
plant.water = request.form.get('water')
plant.sun = request.form.get('sun')
plant.humidity = request.form.get('humidity')
plant.temperature = request.form.get('temperature')
db.session.commit()
return 'plant updated'
def get_team_events(selected_calendars, startdate, enddate):
"""Queries db for selected calendars and date range,
returns list of event objects."""
startdate = to_datetime(startdate)
enddate = to_datetime(enddate)
events = set()
evts = db.session.query(CalEvent, Event).join(Event).all()
for cal in selected_calendars:
for calevent, event in evts:
if cal.lower() in calevent.calendar_id and event.start > startdate and event.end < enddate:
events.add(event)
events = list(events)
return [event.serialize() for event in events]
def check_for_new_games():
"""Make an API call and compare new games with existing db"""
platforms = [130, 48, 49, 37, 46, 41, 5, 47, 56, 4, 21, 19, 18, 58, 20, 22, 33, 24,
87, 7, 8, 9, 38, 45, 11, 12, 36, 92, 14, 6, 13, 64, 29, 35, 32, 23, 34,
39, 86, 51, 15, 13 ,79, 80, 119, 120, 135, 136]
to_add = {}
# get the new list of games per platform
systems_json = igdb.platforms({
'ids':platforms,
'fields' : 'games'
})
print type(systems_json[0])
# get the existing list of games per platform
existing_file = open('seed_data/systemsfile2.json')
existing = existing_file.read()
existing = json.loads(existing)
# compare this list to the existing list
for system in systems_json:
for existing_systems in existing:
if system['id'] == existing_systems['id']:
for game in system['games']:
if game not in existing_systems['games']:
if system['id'] in to_add:
to_add[system['id']].append(game)
else:
to_add[system['id']] = [game]
else:
print "No differences"
get_new_games(to_add)
existing_file.close()
# overwrite the old file with the new information
with open('seed_data/systemsfile2.json', 'w') as fp:
json.dump(systems_json, fp)
def register_process():
"""Get information from registration form."""
username = request.form.get("username")
email = request.form.get("email")
password = request.form.get("password")
systems = request.form.getlist("systems")
account_created = datetime.now()
existing_username = User.query.filter_by(username=username).first()
existing_email = User.query.filter_by(email=email).first()
# check if the username is in use
if existing_username is None and existing_email is None:
#check if the email is in use
new_user = User(username=username, email=email, password=password,
account_created=account_created)
db.session.add(new_user)
db.session.commit()
get_user_rating(games)
for system in systems:
# add each system to the database for the specific user
system_id = db.session.query(System.system_id).filter(System.name==system).first()
new_user_system = UserSystem(user_id=new_user.user_id, system_id=system_id)
db.session.add(new_user_system)
db.session.commit()
flash("Successfully registered " + username + "!")
return redirect("/")
else:
flash("Username or email already in use")
# TODO probably handle this in AJAX on the form and be more specific
# as to whether it was the username or email that failed
return redirect("/")
def get_status(jid):
# following headers are needed because of CORS
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'
resp = db(jobs.id==jid).select(jobs.state).first()
if resp is None: return 'X'
else: return resp.state
def execute():
app = request.forms['app']
user = request.forms['user']
cid = request.forms['cid']
desc = request.forms['desc']
np = request.forms['np']
appmod = pickle.loads(request.forms['appmod'])
# remove the appmod key
del request.forms['appmod']
appmod.write_params(request.forms, user)
# if preprocess is set run the preprocessor
try:
if appmod.preprocess:
run_params, _, _ = appmod.read_params(user, cid)
base_dir = os.path.join(user_dir, user, app)
process.preprocess(run_params, appmod.preprocess, base_dir)
if appmod.preprocess == "terra.in":
appmod.outfn = "out"+run_params['casenum']+".00"
except:
return template('error', err="There was an error with the preprocessor")
# submit job to queue
try:
priority = db(users.user==user).select(users.priority).first().priority
uid = users(user=user).id
jid = sched.qsub(app, cid, uid, np, priority, desc)
return str(jid)
#redirect("http://localhost:"+str(config.port)+"/case?app="+str(app)+"&cid="+str(cid)+"&jid="+str(jid))
except OSError:
return "ERROR: a problem occurred"
def get_status(jid):
resp = db(jobs.id==jid).select(jobs.state).first()
if resp is None: return 'X'
else: return resp.state
def execute():
app = request.forms['app']
user = request.forms['user']
cid = request.forms['cid']
desc = request.forms['desc']
np = request.forms['np']
appmod = pickle.loads(request.forms['appmod'])
# remove the appmod key
del request.forms['appmod']
appmod.write_params(request.forms, user)
# if preprocess is set run the preprocessor
try:
if appmod.preprocess:
run_params, _, _ = appmod.read_params(user, cid)
base_dir = os.path.join(user_dir, user, app)
process.preprocess(run_params, appmod.preprocess, base_dir)
if appmod.preprocess == "terra.in":
appmod.outfn = "out"+run_params['casenum']+".00"
except:
return template('error', err="There was an error with the preprocessor")
# submit job to queue
try:
priority = db(users.user==user).select(users.priority).first().priority
uid = users(user=user).id
jid = sched.qsub(app, cid, uid, np, priority, desc)
return str(jid)
#redirect("http://localhost:"+str(config.port)+"/case?app="+str(app)+"&cid="+str(cid)+"&jid="+str(jid))
except OSError:
return "ERROR: a problem occurred"
def query_meetings():
"""Helper function used by query database for all meetings"""
sql_query = """SELECT * FROM meetings"""
# perform db query and get all results
cursor = db.session.execute(sql_query)
results = cursor.fetchall()
return results
def clear():
"""Clears flask session and SQL tables once user is done ."""
session.clear()
db.session.query(Track).delete()
db.session.query(Playlist).delete()
db.session.query(UserAlbum).delete()
db.session.query(Album).delete()
db.session.query(Artist).delete()
db.session.query(User).delete()
db.session.commit()
print "------User's session & data cleared from DB------"
return redirect('/')
def add_to_playlist():
"""Add album to user's Spotify playlist from dropdown menu."""
token = session['token']
user_id = session['user_id']
playlist_id = request.form.get('playlist_id')
album_id = request.form.get('album_id')
# Query SQL db for track URIs, which are needed to add entire album to playlist
tracks = db.session.query(Track).join(Track.albums).filter_by(album_id=album_id).all()
list_of_track_uris = []
for track in tracks:
list_of_track_uris.append(track.album_track_uri)
# uses the Spotipy method to add album to Spotify user's playlist
if token:
sp = spotipy.Spotify(auth=token)
sp.user_playlist_add_tracks(user_id, playlist_id, list_of_track_uris)
#The user selects a playlist to add an album to. This queries the name of the
# playlist to include in the flash-like message.
playlist = Playlist.query.filter_by(playlist_id=playlist_id).one()
playlist_name = str(playlist.playlist_name)
#The user selects a playlist to add an album to. This queries the name of the
# album to include in the flash-like message.
album = Album.query.filter_by(album_id=album_id).one()
album_name = album.album_name
# return playlist_name and album_name for flash-like message
return jsonify({'playlist_name': playlist_name, 'album_name': album_name})