def assignResults(user, assign_id, sort_id):
    """Render the posts submitted for one assignment.

    Admin users see every post for the assignment; ``sort_id`` picks the
    ordering (0: by author last name, then first name; anything else:
    newest first).  Other users only see their own posts, newest first.

    :param user: current user; ``user.admin`` and ``user.id`` are read
    :param assign_id: id of the assignment whose posts are listed
    :param sort_id: ordering selector, only honoured for admins
    :return: the rendered ``assignResults.html`` template
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if user.admin:
        if sort_id == 0:
            posts = session.query(Post).join(Post.user).filter(
                Post.assignment_id == assign_id).order_by(
                    User.l_name, User.f_name)
        else:
            # Covers sort_id == 1 and any unrecognised value, so ``posts``
            # is always bound before it reaches the template.
            posts = session.query(Post).filter(
                Post.assignment_id == assign_id).order_by(
                    Post.created.desc())
    else:
        posts = session.query(Post).filter(and_(
            Post.assignment_id == assign_id,
            Post.user_id == user.id)).order_by(desc(Post.created)).all()
    return render_template('assignResults.html',
                           user=user,
                           posts=posts,
                           assign=assign,
                           sort_id=sort_id)
# python类query()的实例源码 (example source code using Python's query())
def login():
    """Show the login form or check submitted credentials.

    GET: redirect to ``main`` when ``check_for_user()`` finds a user,
    otherwise render ``login.html``.
    Other methods: look the user up by the submitted username and compare
    the stored password with the SHA-512 hex digest of the submitted
    password concatenated with the user's salt.  A match returns
    ``setCookie(user)``; anything else re-renders the form with an error.
    """
    if request.method == 'GET':
        if check_for_user():
            return redirect(url_for('main'))
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']
    account = session.query(User).filter(User.username == username).first()
    if account:
        digest = hashlib.sha512(password + account.salt).hexdigest()
        if account.password == digest:
            return setCookie(account)

    # Unknown user and wrong password share one message on purpose.
    return render_template('login.html',
                           username=username,
                           error='Invalid username and/or password')
def deleteAssign(user, assign_id):
    """Confirm (GET) or perform (other methods) deletion of an assignment.

    On GET the ``deleteAssign.html`` confirmation page is rendered.  On any
    other method the assignment's tests and posts are deleted, then the
    assignment itself, in a single commit.  When the assignment does not
    exist the caller is redirected to ``main``.

    :param user: current user, passed through to the template
    :param assign_id: id of the assignment to delete
    :return: a rendered template or a redirect; never ``None``
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if not assign:
        # Nothing to confirm or delete; make sure a response is returned
        # for every request method.
        return redirect(url_for('main'))

    if request.method == 'GET':
        return render_template('deleteAssign.html',
                               user=user,
                               assign=assign)

    tests = session.query(Test).filter(
        Test.assignment_id == assign_id).all()
    posts = session.query(Post).filter(
        Post.assignment_id == assign_id).all()
    # Dependents are deleted before the assignment they reference.
    for t in tests:
        session.delete(t)
    for p in posts:
        session.delete(p)
    session.delete(assign)
    session.commit()
    return redirect(url_for('main'))
def roster(user):
    """List all users (GET) or delete one by username (other methods).

    GET renders ``roster.html`` with non-admin users and admin users, each
    ordered by last name.  Other methods delete the user whose username is
    in the submitted form, if one exists, and redirect back to the roster.

    :param user: current user, passed through to the template
    """
    if request.method != 'GET':
        username = request.form['username']
        target = session.query(User).filter(
            User.username == username).first()
        if target:
            session.delete(target)
            session.commit()
        return redirect(url_for('roster'))

    students = session.query(User).filter(
        User.admin == False).order_by(User.l_name).all()  # noqa
    admins = session.query(User).filter(
        User.admin).order_by(User.l_name).all()
    return render_template('roster.html',
                           user=user,
                           users=students,
                           admin=admins)
def login():
    """Render the login page (GET) or authenticate JSON credentials.

    For non-GET requests ``username`` and ``password`` are read from the
    JSON body.  On success the user is logged in through ``login_user`` and
    the response carries ``success=True``, ``isAdmin`` and a copy of the
    user object's instance ``__dict__`` (minus ``_sa_instance_state``).
    On failure the response is ``success=False``.
    """
    if request.method == 'GET':
        return render_template('login.html')

    # A missing or non-JSON body yields None; treat it as empty credentials
    # instead of failing with AttributeError.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    username = payload.get('username', '')
    password = payload.get('password', '')

    session = Session()
    try:
        registered_user = (session.query(User)
                           .filter(User.username == username)
                           .first())
        if (registered_user is None or not
                check_password_hash(registered_user.password, password)):
            return jsonify(success=False)

        login_user(registered_user)
        flash('Logged in successfully')
    finally:
        # Always release the scoped session, even when a lookup raises.
        Session.remove()

    user = current_user.__dict__.copy()
    user.pop('_sa_instance_state', None)
    # NOTE(review): every loaded attribute is echoed to the client, which
    # includes the ``password`` value read above -- confirm this is intended.
    return jsonify(success=True, isAdmin=current_user.is_admin(), **user)
def get_shakemaps():
    """Return all ShakeMaps as a JSON array string.

    Rows are ordered by ``recieve_timestamp`` descending.  Each element is a
    copy of the row object's ``__dict__`` without the ``_sa_instance_state``
    entry, serialised with ``AlchemyEncoder``.
    """
    session = Session()
    ordering = ShakeMap.recieve_timestamp.desc()
    rows = session.query(ShakeMap).order_by(ordering).all()

    payload = []
    for row in rows:
        fields = dict(row.__dict__)
        fields.pop('_sa_instance_state', None)
        payload.append(fields)

    encoded = json.dumps(payload, cls=AlchemyEncoder)
    Session.remove()
    return encoded
def get_shakemap(shakemap_id):
    """Return every stored version of one ShakeMap as a JSON array string.

    Rows matching ``shakemap_id`` are ordered by ``shakemap_version``
    descending.  Each element is a copy of the row object's ``__dict__``
    without ``_sa_instance_state``, serialised with ``AlchemyEncoder``.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    """
    session = Session()
    query = session.query(ShakeMap)
    query = query.filter(ShakeMap.shakemap_id == shakemap_id)
    versions = query.order_by(ShakeMap.shakemap_version.desc()).all()

    payload = []
    for version in versions:
        fields = dict(version.__dict__)
        fields.pop('_sa_instance_state', None)
        payload.append(fields)

    encoded = json.dumps(payload, cls=AlchemyEncoder)
    Session.remove()
    return encoded
def shakemap_overlay(shakemap_id):
    """Serve the ``ii_overlay.png`` image of a ShakeMap's highest version.

    The file is looked up under
    ``<EARTHQUAKES>/<shakemap_id>/<shakemap_id>-<version>/``.  When no
    ShakeMap with that id exists, the static ``sc_logo.png`` is served.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    :return: a Flask file response
    """
    session = Session()
    try:
        shakemap = (session.query(ShakeMap)
                    .filter(ShakeMap.shakemap_id == shakemap_id)
                    .order_by(desc(ShakeMap.shakemap_version))
                    .first())
        if shakemap is None:
            # send_static_file() already builds the response; feeding that
            # response to send_file() as if it were a path cannot work.
            return app.send_static_file('sc_logo.png')

        img = os.path.join(app.config['EARTHQUAKES'],
                           shakemap_id,
                           shakemap_id + '-' + str(shakemap.shakemap_version),
                           'ii_overlay.png')
    finally:
        Session.remove()

    # The overlay is a PNG file, so advertise it as one.
    return send_file(img, mimetype='image/png')
def get_notification(event_id):
    """Return the notifications tied to an event as a JSON array string.

    Includes the event's own notifications followed by those of each of its
    shakemaps.  Every element is a copy of the notification's ``__dict__``
    (without ``_sa_instance_state``) plus a ``group_name`` entry.  An unknown
    ``event_id`` yields ``[]``.

    :param event_id: value matched against ``Event.event_id``
    """
    session = Session()
    try:
        event = (session.query(Event)
                 .filter(Event.event_id == event_id)
                 .first())
        dicts = []
        if event is not None:
            # Work on a plain list: ``+=`` on event.notifications would
            # append the shakemap notifications to the event's own
            # relationship collection.
            nots = list(event.notifications)
            for sm in event.shakemaps:
                nots.extend(sm.notifications)

            for obj in nots:
                dict_ = obj.__dict__.copy()
                dict_.pop('_sa_instance_state', None)
                group = obj.group
                dict_['group_name'] = group.name if group is not None else None
                dicts.append(dict_)

        return json.dumps(dicts, cls=AlchemyEncoder)
    finally:
        Session.remove()
def notification_html(notification_type, name):
    """Build notification HTML through ``NotificationBuilder``.

    The optional ``config`` query-string argument is parsed as JSON
    (defaulting to ``null``).  For ``'new_event'`` the last two rows of the
    unordered ``Event`` query are passed to ``build_new_event_html``; for
    any other type the last row of the unordered ``ShakeMap`` query is
    passed to ``build_insp_html``.

    NOTE(review): indexing ``[-1]`` raises IndexError when there are no
    ShakeMap rows.

    :param notification_type: ``'new_event'`` or anything else
    :param name: forwarded to the builder as ``name``
    :return: whatever the builder method returns
    """
    config = json.loads(request.args.get('config', 'null'))
    session = Session()
    builder = NotificationBuilder()

    if notification_type != 'new_event':
        # get the most recent shakemap
        latest_sm = session.query(ShakeMap).all()[-1]
        html = builder.build_insp_html(latest_sm, name=name, web=True,
                                       config=config)
    else:
        # get the two most recent events
        recent_events = session.query(Event).all()[-2:]
        html = builder.build_new_event_html(events=recent_events, name=name,
                                            web=True, config=config)

    Session.remove()
    return html
def get_group_info(group_id):
    """Return a group's notification specs as a JSON object string.

    The object holds the values returned by the group's own accessors
    (``inspection``, ``new_event``, ``heartbeat``, ``scenario``), its
    ``facilities`` from ``get_facility_info``, its ``users`` and its
    ``template``.  An unknown ``group_id`` yields ``{}``.

    :param group_id: value matched against ``Group.shakecast_id``
    """
    session = Session()
    try:
        group = (session.query(Group)
                 .filter(Group.shakecast_id == group_id)
                 .first())

        # Bound up front so an unknown id serialises to "{}" instead of
        # raising UnboundLocalError.
        group_specs = {}
        if group is not None:
            group_specs = {
                'inspection': group.get_alert_levels(),
                'new_event': group.get_min_mag(),
                'heartbeat': group.has_spec('heartbeat'),
                'scenario': group.get_scenario_alert_levels(),
                'facilities': get_facility_info(group_name=group.name),
                'users': group.users,
                'template': group.template,
            }

        return json.dumps(group_specs, cls=AlchemyEncoder)
    finally:
        Session.remove()
def modify_user_info(args):
    """Update the logged-in user's attributes from ``args``.

    The user is looked up by ``args["email"]`` and must match the ``email``
    stored in the session.  Every truthy value except ``portrait`` is
    written to the attribute of the same name; a fixed set of keys is
    converted to ``float`` or ``int`` first.

    :param args: mapping of attribute name to submitted value
    :return: dict with ``result`` (``"success"``/``"error"``) and ``message``
    """
    float_items = ("height", "weight", "bust", "Waist", "BMI")
    int_items = ("vip", "step_number")

    email = args.get("email")
    user = db.session.query(User).filter_by(email=email).first()
    if not (user and "email" in session and session["email"] == user.email):
        return {"result": "error", "message": "please login"}

    # Convert everything before touching the model, so a malformed number
    # cannot leave the user object half-updated.
    updates = []
    for key in args:
        value = args[key]
        if not value or key == "portrait":
            continue
        try:
            if key in float_items:
                value = float(value)
            elif key in int_items:
                value = int(value)
        except (TypeError, ValueError):
            return {"result": "error",
                    "message": "invalid value for %s" % key}
        updates.append((key, value))

    # NOTE(review): any remaining key in ``args`` is written to the model;
    # consider restricting this to an explicit list of editable fields.
    for key, value in updates:
        setattr(user, key, value)
    db.session.commit()
    return {"result": "success", "message": "modify user info success"}
def index():
    """Render the index page with two distinct, randomly chosen posts.

    Post ids are read through a session bound to the development SQLite
    file; the two posts themselves are fetched through ``Post.query``.
    Responds with 404 when fewer than two posts are available.

    :return: the rendered ``index.html`` template
    """
    from random import sample

    from flask import abort

    # locate the db location; currently hardcoded to development database
    engine = create_engine(
        'sqlite:///' + os.path.join(basedir, 'data-dev.sqlite'))
    Session = sessionmaker()
    Session.configure(bind=engine)
    session = Session()
    try:
        # Use the ids that really exist: drawing from 1..count breaks as
        # soon as a row has been deleted and the ids have gaps.
        post_ids = [row[0] for row in session.query(Post.id).all()]
    finally:
        # The engine is created per call, so release it per call as well.
        session.close()
        engine.dispose()

    if len(post_ids) < 2:
        # The old retry loop never terminated with a single post and
        # randint(1, 0) raised with none.
        abort(404)

    first_id, second_id = sample(post_ids, 2)  # two different ids
    pic1 = Post.query.get(first_id)
    pic2 = Post.query.get(second_id)
    if pic1 is None or pic2 is None:
        abort(404)

    pic1_filename = url_for('static', filename=pic1.filename)
    pic2_filename = url_for('static', filename=pic2.filename)
    return render_template('index.html',
                           pic1=pic1,
                           pic2=pic2,
                           pic1_filename=pic1_filename,
                           pic2_filename=pic2_filename)
def check_for_user():
    """Return the user named by a valid ``user_id`` cookie, else ``None``.

    The cookie is split on ``|``; the first part is the username and the
    second must equal the SHA-512 hex digest of the username concatenated
    with ``hash_salt``.

    :return: the matching ``User`` or ``None``
    """
    cookie_value = request.cookies.get('user_id')
    if not cookie_value:
        return None

    params = cookie_value.split('|')
    if len(params) < 2:
        # Cookies are client-controlled; a value without a signature part
        # is simply invalid rather than an error.
        return None

    username, signature = params[0], params[1]
    if hashlib.sha512(username + hash_salt).hexdigest() != signature:
        return None

    user = session.query(User).filter(User.username == username).first()
    if user:
        return user
    return None
def main():
    """Render the main page with all assignments, newest first.

    The current user comes from ``check_for_user()`` and may be ``None``;
    when present, the user's first name is printed.
    """
    current = check_for_user()
    if current:
        print(current.f_name)

    newest_first = desc(Assignment.created)
    assignments = session.query(Assignment).order_by(newest_first).all()
    return render_template('main.html',
                           user=current,
                           assign=assignments)
def assignResultsReview(user, post_id):
    """Show one post (GET) or delete it (other methods).

    Only the post's author or an admin may access it; anyone else gets 403.
    An unknown ``post_id`` gets 404.  After deletion the caller is
    redirected to the ``assignView`` page of the post's assignment.

    :param user: current user; ``user.id`` and ``user.admin`` are read
    :param post_id: id of the post to show or delete
    """
    post = session.query(Post).filter(Post.id == post_id).first()
    if post is None:
        return abort(404)
    if post.user_id != user.id and not user.admin:
        return abort(403)

    # Remember the id now: it is still needed after the post is deleted
    # and does not depend on the assignment row being found.
    assignment_id = post.assignment_id

    if request.method == 'GET':
        assign = session.query(Assignment).filter(
            Assignment.id == assignment_id).first()
        return render_template('assignReviewResults.html',
                               assign=assign,
                               post=post,
                               user=user)

    session.delete(post)
    session.commit()
    return redirect(url_for('assignView', assign_id=assignment_id))
def postFeedback(user, post_id):
    """Store the submitted feedback text on a post.

    Reads ``data`` from the request form, saves it as ``post.notes`` and
    returns the text.  When the post does not exist, nothing is saved and
    ``None`` is returned.

    NOTE(review): if this is registered directly as a Flask view, ``None``
    is not a valid response -- confirm how the caller handles it.

    :param user: current user (unused here)
    :param post_id: id of the post receiving the feedback
    """
    post = session.query(Post).filter(Post.id == post_id).first()
    notes = request.form['data']
    print('notes', notes)

    if not post:
        print('no post')
        return None

    post.notes = notes
    session.commit()
    return notes
def allResults(user):
    """Render ``allResults.html`` with every assignment, newest first.

    :param user: current user, passed through to the template
    """
    newest_first = desc(Assignment.created)
    assignments = session.query(Assignment).order_by(newest_first).all()
    return render_template('allResults.html', user=user, assign=assignments)
def editAssign(user, assign_id):
    """Show the edit form for an assignment (GET) or save the changes.

    On other methods ``title`` and ``desc`` must both be non-empty; if not,
    the form is re-rendered with an error message.  Otherwise name,
    description, ``int_type`` and ``include_tf`` are updated and the caller
    is redirected to ``assignView``.  An unknown ``assign_id`` gets 404.

    :param user: current user, passed through to the template
    :param assign_id: id of the assignment being edited
    """
    from flask import abort

    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if assign is None:
        # Without this, both branches fail on attribute access of None.
        abort(404)

    if request.method == 'GET':
        params = {'title': assign.name, 'desc': assign.desc}
        return render_template('admin.html',
                               user=user,
                               params=params)

    title = request.form['title']
    descrip = request.form['desc']
    assign_type = request.form['assign_type']
    include_tf = request.form.get('include_testfiles')

    if not (title and descrip):
        params = {
            'title': title,
            'desc': descrip,
            'error': 'Please fill in both fields before continuing.',
        }
        return render_template('admin.html',
                               user=user,
                               params=params)

    assign.name = title
    assign.desc = descrip
    assign.int_type = assign_type
    # The form field is only present when the box is ticked.
    assign.include_tf = bool(include_tf)
    session.commit()
    return redirect(url_for('assignView', assign_id=assign_id))
def testView(user, test_id):
    """Render ``testView.html`` for one test and its assignment.

    An unknown ``test_id`` gets 404; request methods other than GET get 405.

    :param user: current user, passed through to the template
    :param test_id: id of the test to display
    """
    from flask import abort

    test = session.query(Test).filter(Test.id == test_id).first()
    if test is None:
        # ``test.assignment_id`` below would otherwise fail on None.
        abort(404)
    if request.method != 'GET':
        # Only viewing is implemented; answer explicitly instead of
        # falling off the end of the function.
        abort(405)

    assign = session.query(Assignment).filter(
        Assignment.id == test.assignment_id).first()
    return render_template('testView.html',
                           test=test,
                           user=user,
                           assign=assign)
def resetPassword(user):
    """Show the password-reset form (GET) or reset a user's password.

    On other methods the form's ``username`` and ``password`` are required.
    The named user gets a fresh salt from ``make_salt()`` and a password set
    to the SHA-512 hex digest of the new password concatenated with it.
    Every outcome re-renders ``resetPass.html`` with a status message.

    :param user: current user, always passed to the template as ``user``
    """
    if request.method == 'GET':
        return render_template('resetPass.html',
                               user=user)

    username = request.form['username']
    password = request.form['password']
    if not username or not password:
        return render_template('resetPass.html',
                               status_message='Both fields are required.',
                               user=user)

    # Keep the account being reset in its own name: rebinding ``user`` here
    # handed the template None (not found) or the target account (success)
    # in place of the current user.
    target = session.query(User).filter(User.username == username).first()
    if not target:
        status_message = 'User could not be found. ' \
                         'Please verify their username and try again.'
        return render_template('resetPass.html',
                               status_message=status_message,
                               user=user)

    salt = make_salt()
    target.salt = salt
    target.password = hashlib.sha512(password + salt).hexdigest()
    session.commit()
    return render_template('resetPass.html',
                           status_message='Users password has been changed.',
                           user=user)
def deleteUser(user, user_id):
    """Delete a user and the posts they authored, then show the roster.

    Posts are deleted and committed first; the user row, if it exists, is
    deleted in a second commit.

    :param user: current user (not used for the lookup)
    :param user_id: id of the user to delete
    :return: redirect to ``roster``
    """
    target = session.query(User).filter(User.id == user_id).first()
    authored = session.query(Post).filter(Post.user_id == user_id).all()

    if authored:
        for post in authored:
            session.delete(post)
        session.commit()

    if target:
        session.delete(target)
        session.commit()

    return redirect(url_for('roster'))
def all(user):
    """Render ``all.html`` with every user, assignment, post and test.

    :param user: current user, passed through to the template
    """
    every_user = session.query(User).all()
    every_assignment = session.query(Assignment).all()
    every_post = session.query(Post).all()
    every_test = session.query(Test).all()
    return render_template('all.html',
                           users=every_user,
                           posts=every_post,
                           assign=every_assignment,
                           tests=every_test,
                           user=user)
def load_user(user_id):
    """Look up a user by ``shakecast_id``.

    Presumably registered as the Flask-Login user loader (verify where it
    is decorated); such a loader is expected to return ``None`` for an
    invalid id rather than raise.

    :param user_id: id as received from the caller, convertible to ``int``
    :return: the matching ``User`` or ``None``
    """
    try:
        shakecast_id = int(user_id)
    except (TypeError, ValueError):
        # A malformed id means "no such user", not a server error.
        return None

    session = Session()
    try:
        # Expunging the user here might help with Windows threading.
        return (session.query(User)
                .filter(User.shakecast_id == shakecast_id)
                .first())
    finally:
        Session.remove()
def get_shaking_events(facility_id):
    """Return the events found in a facility's shaking history as JSON.

    For each ``shaking_history`` entry that has a shakemap with an event, a
    copy of the event's ``__dict__`` is emitted (without
    ``_sa_instance_state``), with ``shakemaps`` set to the number of
    shakemaps on that event.  An unknown facility yields an empty list.

    :param facility_id: value matched against ``Facility.shakecast_id``
    :return: ``jsonify(success=True, data=[...])``
    """
    session = Session()
    try:
        fac = (session.query(Facility)
               .filter(Facility.shakecast_id == facility_id)
               .first())

        eq_dicts = []
        # An unknown facility simply has no history to report.
        history = fac.shaking_history if fac is not None else []
        for fs in history:
            if fs.shakemap is None:
                continue
            eq = fs.shakemap.event
            if eq is None:
                continue
            eq_dict = eq.__dict__.copy()
            eq_dict['shakemaps'] = len(eq.shakemaps)
            eq_dict.pop('_sa_instance_state', None)
            eq_dicts.append(eq_dict)
    finally:
        Session.remove()

    return jsonify(success=True, data=eq_dicts)
def get_shaking_data(facility_id, eq_id):
    """Return one facility's shaking record for a ShakeMap as JSON.

    ``data`` is a copy of the first matching ``FacilityShaking`` row's
    ``__dict__`` (without ``_sa_instance_state``), or ``None`` when nothing
    matches.

    :param facility_id: value matched against ``Facility.shakecast_id``
    :param eq_id: value matched against ``ShakeMap.shakemap_id``
    :return: ``jsonify(success=True, data=...)``
    """
    session = Session()
    try:
        shaking = (session.query(FacilityShaking)
                   .filter(FacilityShaking
                           .shakemap
                           .has(ShakeMap.shakemap_id == eq_id))
                   # Restrict to the requested facility; without this the
                   # first row of *any* facility in the ShakeMap came back.
                   # Assumes FacilityShaking.facility is the relationship to
                   # Facility -- TODO confirm against the ORM definitions.
                   .filter(FacilityShaking
                           .facility
                           .has(Facility.shakecast_id == facility_id))
                   .first())

        shaking_dict = None
        if shaking:
            shaking_dict = shaking.__dict__.copy()
            shaking_dict.pop('_sa_instance_state', None)
    finally:
        Session.remove()

    return jsonify(success=True, data=shaking_dict)
def get_affected_facilities(shakemap_id):
    """Summarise facility shaking for a ShakeMap's highest version as JSON.

    The result object has ``facilities`` (one dict per ``facility_shaking``
    entry: a copy of the facility's ``__dict__`` with the shaking record's
    ``__dict__`` under ``shaking``), ``alert`` (count of facilities per
    alert level) and an empty ``types`` object.  With no matching ShakeMap
    the list is empty and all counts are zero.

    :param shakemap_id: value matched against ``ShakeMap.shakemap_id``
    :return: JSON string produced with ``AlchemyEncoder``
    """
    session = Session()
    versions = (session.query(ShakeMap)
                .filter(ShakeMap.shakemap_id == shakemap_id)
                .order_by(ShakeMap.shakemap_version.desc())
                .all())

    alert = {'gray': 0,
             'green': 0,
             'yellow': 0,
             'orange': 0,
             'red': 0}
    fac_dicts = []

    if versions:
        latest = versions[0]
        for shaking in latest.facility_shaking:
            # The facility is read before the shaking record is copied,
            # matching the original statement order.
            fac_dict = shaking.facility.__dict__.copy()
            s_dict = shaking.__dict__.copy()
            fac_dict.pop('_sa_instance_state', None)
            s_dict.pop('_sa_instance_state', None)
            fac_dict['shaking'] = s_dict
            fac_dicts.append(fac_dict)

            # record number of facs at each alert level
            alert[s_dict['alert_level']] += 1

    shaking_data = {'alert': alert, 'facilities': fac_dicts, 'types': {}}
    shaking_json = json.dumps(shaking_data, cls=AlchemyEncoder)
    Session.remove()
    return shaking_json
def event_image(event_id):
    """Serve ``image.png`` for an event, or the static logo if unknown.

    The image path is ``<EARTHQUAKES>/<event_id>/image.png``.

    :param event_id: value matched against ``Event.event_id``
    :return: a Flask file response
    """
    session = Session()
    try:
        event = (session.query(Event)
                 .filter(Event.event_id == event_id)
                 .first())
    finally:
        Session.remove()

    if event is None:
        # send_static_file() already builds the response; feeding that
        # response to send_file() as if it were a path cannot work.
        return app.send_static_file('sc_logo.png')

    img = os.path.join(app.config['EARTHQUAKES'],
                       event_id,
                       'image.png')
    # The file is a PNG, so advertise it as one.
    return send_file(img, mimetype='image/png')
def get_user_groups(user_id):
    """Return the groups of a user as a JSON array string.

    Each element is a copy of a group's ``__dict__`` without
    ``_sa_instance_state``.  An unknown user, or one without groups, yields
    ``[]``.

    :param user_id: value matched against ``User.shakecast_id``
    :return: JSON string produced with ``AlchemyEncoder``
    """
    session = Session()
    account = (session.query(User)
               .filter(User.shakecast_id == user_id)
               .first())

    payload = []
    if account is not None and account.groups:
        for group in account.groups:
            fields = dict(group.__dict__)
            fields.pop('_sa_instance_state', None)
            payload.append(fields)

    encoded = json.dumps(payload, cls=AlchemyEncoder)
    Session.remove()
    return encoded
def get_project(ws_id, pj_id: int, session=None) -> Project:
    """
    Returns a project and raises 404, when project not found.

    :param ws_id: Workspace id
    :param pj_id: Project id
    :param session: db session; a new one is obtained from ``db_session()``
        on each call when omitted
    :return: Project model
    :raises NotFound: when no project with that id exists in the workspace
    """
    if session is None:
        # A ``db_session()`` default would be evaluated once, when the
        # function is defined, and then shared by every later call.
        session = db_session()

    project = session.query(Project).join(Workspace) \
        .filter(Workspace.id == ws_id) \
        .filter(Project.id == pj_id).first()
    if not project:
        raise NotFound("Could not find project with id {}".format(pj_id))
    return project