python类query()的实例源码

__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def assignResults(user, assign_id, sort_id):
    """Render the submissions belonging to one assignment.

    Admin users get every post for the assignment: ``sort_id == 0`` orders
    them by the author's last and first name, any other value orders them
    newest first.  Non-admin users only get their own posts, newest first.

    :param user: the requesting user (reads ``admin`` and ``id``)
    :param assign_id: id of the assignment whose posts are listed
    :param sort_id: ordering selector for the admin listing
    :return: the rendered ``assignResults.html`` template
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if user.admin:
        if sort_id == 0:
            posts = session.query(Post).join(Post.user).filter(
                Post.assignment_id == assign_id).order_by(
                    User.l_name, User.f_name)
        else:
            # Previously only ``sort_id == 1`` was handled here, so any
            # other value left ``posts`` unbound and the view crashed.
            # Newest-first is used as the fallback ordering.
            posts = session.query(Post).filter(
                Post.assignment_id == assign_id).order_by(Post.created.desc())
    else:
        posts = session.query(Post).filter(and_(
            Post.assignment_id == assign_id,
            Post.user_id == user.id)).order_by(desc(Post.created)).all()

    return render_template('assignResults.html',
                           user=user,
                           posts=posts,
                           assign=assign,
                           sort_id=sort_id)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On GET a user already recognised by ``check_for_user`` is redirected to
    the main page.  On POST the submitted password is hashed with the stored
    salt and compared with the stored hash; a match hands over to
    ``setCookie``, anything else re-renders the form with an error.
    """
    if request.method == 'GET':
        if check_for_user():
            return redirect(url_for('main'))
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    account = session.query(User).filter(User.username == username).first()
    if account:
        digest = hashlib.sha512(password + account.salt).hexdigest()
        if account.password == digest:
            return setCookie(account)

    # Unknown user and wrong password share one message.
    return render_template('login.html',
                           username=username,
                           error='Invalid username and/or password')
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def deleteAssign(user, assign_id):
    """Confirm (GET) or perform (any other method) deletion of an assignment.

    Deleting removes the assignment's tests and posts together with the
    assignment itself in a single commit.

    :param user: the requesting user, passed through to the template
    :param assign_id: id of the assignment to delete
    :return: the confirmation page on GET, otherwise a redirect to ``main``
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if not assign:
        # A GET for an unknown assignment used to fall through and return
        # None, which is not a valid response; redirect like the POST path.
        return redirect(url_for('main'))

    if request.method == 'GET':
        return render_template('deleteAssign.html',
                               user=user,
                               assign=assign)

    tests = session.query(Test).filter(
        Test.assignment_id == assign_id).all()
    posts = session.query(Post).filter(
        Post.assignment_id == assign_id).all()
    for t in tests:
        session.delete(t)
    for p in posts:
        session.delete(p)
    session.delete(assign)
    session.commit()
    return redirect(url_for('main'))
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def roster(user):
    """List all users (GET) or delete the user named in the form (POST).

    The GET listing separates non-admin users from admins, each ordered by
    last name.  Any other method deletes the account whose username was
    submitted, if it exists, and redirects back to the roster.
    """
    if request.method != 'GET':
        username = request.form['username']
        target = session.query(User).filter(
            User.username == username).first()
        if target:
            session.delete(target)
            session.commit()
        return redirect(url_for('roster'))

    by_last_name = User.l_name
    students = session.query(User).filter(
        User.admin == False).order_by(by_last_name).all() # noqa
    admins = session.query(User).filter(
        User.admin).order_by(by_last_name).all()
    return render_template('roster.html',
                           user=user,
                           users=students,
                           admin=admins)
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def login():
    """Serve the login page (GET) or authenticate a JSON login request.

    The JSON body supplies ``username`` and ``password``.  A failed login
    answers ``{"success": false}``; a successful one logs the user in and
    returns the current user's attributes plus an ``isAdmin`` flag.
    """
    if request.method == 'GET':
        return render_template('login.html')

    session = Session()
    payload = request.json
    username = payload.get('username', '')
    password = payload.get('password', '')

    registered_user = (session.query(User)
                              .filter(and_(User.username==username))
                              .first())

    credentials_ok = (registered_user is not None and
                      check_password_hash(registered_user.password, password))
    if not credentials_ok:
        Session.remove()
        return jsonify(success=False)

    login_user(registered_user)
    flash('Logged in successfully')
    Session.remove()

    # Drop SQLAlchemy's bookkeeping entry before serialising the attributes.
    user = dict(current_user.__dict__)
    user.pop('_sa_instance_state', None)
    return jsonify(success=True, isAdmin=current_user.is_admin(), **user)
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_shakemaps():
    """Return every ShakeMap as a JSON array, newest received first."""
    session = Session()
    query = (session.query(ShakeMap)
                    .order_by(ShakeMap.recieve_timestamp.desc()))

    rows = []
    for shakemap in query.all():
        row = dict(shakemap.__dict__)
        # SQLAlchemy's bookkeeping entry is removed before encoding.
        row.pop('_sa_instance_state', None)
        rows.append(row)

    payload = json.dumps(rows, cls=AlchemyEncoder)

    Session.remove()
    return payload
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_shakemap(shakemap_id):
    """Return all versions of one ShakeMap as JSON, highest version first."""
    session = Session()
    query = (session.query(ShakeMap)
                    .filter(ShakeMap.shakemap_id == shakemap_id)
                    .order_by(ShakeMap.shakemap_version.desc()))

    rows = []
    for shakemap in query.all():
        row = dict(shakemap.__dict__)
        # SQLAlchemy's bookkeeping entry is removed before encoding.
        row.pop('_sa_instance_state', None)
        rows.append(row)

    payload = json.dumps(rows, cls=AlchemyEncoder)

    Session.remove()
    return payload
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def shakemap_overlay(shakemap_id):
    """Serve the intensity overlay image of a ShakeMap's highest version.

    The image path is
    ``<EARTHQUAKES>/<shakemap_id>/<shakemap_id>-<version>/ii_overlay.png``.
    When no ShakeMap with that id exists, the static ``sc_logo.png`` is
    served instead.

    :param shakemap_id: ShakeMap identifier (also used as a directory name)
    :return: a Flask response carrying the image
    """
    session = Session()
    try:
        shakemap = (session.query(ShakeMap)
                        .filter(ShakeMap.shakemap_id == shakemap_id)
                        .order_by(desc(ShakeMap.shakemap_version))
                        .limit(1)).first()
        if shakemap is None:
            # send_static_file() already returns a complete response; the
            # old code passed that response object on to send_file(),
            # which expects a path or file object.
            return app.send_static_file('sc_logo.png')

        img = os.path.join(app.config['EARTHQUAKES'],
                           shakemap_id,
                           shakemap_id + '-' + str(shakemap.shakemap_version),
                           'ii_overlay.png')
    finally:
        # Release the scoped session even when the lookup fails.
        Session.remove()

    # The overlay is a PNG file, so it is declared as one.
    return send_file(img, mimetype='image/png')
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_notification(event_id):
    """Return the notifications of an event and of its ShakeMaps as JSON.

    Each entry holds the notification's attributes plus ``group_name``, the
    name of the notification's group.  An unknown event yields ``[]``.

    :param event_id: identifier of the event to look up
    :return: JSON-encoded list of notification dicts
    """
    session = Session()
    try:
        event = session.query(Event).filter(Event.event_id == event_id).first()

        dicts = []
        if event is not None:
            # Work on a copy: ``+=`` on event.notifications itself would
            # extend the ORM relationship collection in place and so attach
            # the ShakeMap notifications to the event.
            nots = list(event.notifications)
            for sm in event.shakemaps:
                nots.extend(sm.notifications)

            for obj in nots:
                dict_ = obj.__dict__.copy()
                dict_.pop('_sa_instance_state', None)
                dict_['group_name'] = obj.group.name
                dicts.append(dict_)

        return json.dumps(dicts, cls=AlchemyEncoder)
    finally:
        Session.remove()
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def notification_html(notification_type, name):
    """Build preview HTML for a notification template.

    ``config`` is read from the ``config`` query argument as JSON (``null``
    when absent).  For ``new_event`` the last two rows of the Event query
    are rendered; for any other type the last row of the ShakeMap query is
    rendered.  Neither query specifies an ordering, so "last" is presumably
    "most recent" -- TODO confirm.
    """
    config = json.loads(request.args.get('config', 'null'))

    session = Session()
    builder = NotificationBuilder()

    if notification_type != 'new_event':
        latest_shakemap = session.query(ShakeMap).all()[-1]
        html = builder.build_insp_html(latest_shakemap,
                                       name=name,
                                       web=True,
                                       config=config)
    else:
        recent_events = session.query(Event).all()[-2:]
        html = builder.build_new_event_html(events=recent_events,
                                            name=name,
                                            web=True,
                                            config=config)
    Session.remove()
    return html
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_group_info(group_id):
    """Return a group's notification specs, facilities and users as JSON.

    :param group_id: ``shakecast_id`` of the group to describe
    :return: JSON object with the keys ``inspection``, ``new_event``,
        ``heartbeat``, ``scenario``, ``facilities``, ``users`` and
        ``template``; an empty JSON object when the group does not exist
    """
    session = Session()
    try:
        group = (session.query(Group)
                    .filter(Group.shakecast_id == group_id)
                    .first())

        # Start empty so an unknown group serialises to ``{}``; previously
        # the name was unbound in that case and the view raised NameError.
        group_specs = {}
        if group is not None:
            group_specs = {'inspection': group.get_alert_levels(),
                            'new_event': group.get_min_mag(),
                            'heartbeat': group.has_spec('heartbeat'),
                            'scenario': group.get_scenario_alert_levels(),
                            'facilities': get_facility_info(group_name=group.name),
                            'users': group.users,
                            'template': group.template}

        return json.dumps(group_specs, cls=AlchemyEncoder)
    finally:
        Session.remove()
auth_controller.py 文件源码 项目:go_basketball 作者: ZhaoPengkun 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def modify_user_info(args):
    """Update the logged-in user's profile from the submitted fields.

    The user is looked up by ``args["email"]`` and must match the email in
    the session.  Every non-empty field except ``portrait`` is written onto
    the user; the listed numeric fields are converted first.

    :param args: mapping of field name to submitted value
    :return: dict with ``result`` and ``message`` keys
    """
    email = args.get("email")

    user = db.session.query(User).filter_by(email=email).first()
    float_items = ["height", "weight", "bust", "Waist", "BMI"]
    int_items = ["vip", "step_number"]

    if not (user and ("email" in session) and session["email"] == user.email):
        return {"result": "error", "message": "please login"}

    for key in args:
        value = args[key]
        # Empty values and the portrait field are left untouched.
        if not value or key == "portrait":
            continue
        if key in float_items:
            value = float(value)
        elif key in int_items:
            value = int(value)
        setattr(user, key, value)
    db.session.commit()
    return {"result": "success", "message": "modify user info success"}
views.py 文件源码 项目:two_pic 作者: onlyonebowman 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def index():
    """Render the index page with two different, randomly picked posts.

    The number of posts is counted through a locally built session, then
    two posts are fetched by a random primary key in ``1..post_count`` and
    their static-file URLs are passed to ``index.html``.

    NOTE(review): the random-id lookup assumes post ids are exactly
    ``1..post_count``; if ids have gaps, ``pic1`` can be ``None`` and the
    ``pic1.filename`` access below would fail -- confirm against the data.
    """

    # Grab the choice variable passed from POST
    # for x in request.args['choice']:
    #   print('in args: {}'.format(x))

    # hook up database functions
    Session = sessionmaker()

    # locate the db location; currently hardcoded to development database
    # NOTE(review): a new engine is created on every call of this view.
    engine = create_engine('sqlite:///' + os.path.join(basedir, 'data-dev.sqlite'))
    Session.configure(bind=engine)
    session = Session()

    post_count = session.query(func.count(Post.id)).scalar()  # count the rows in the Post table
    # With zero posts randint(1, 0) raises ValueError; with exactly one post
    # the loop below presumably never finds a second, different post.
    pic1 = Post.query.get(randint(1, post_count))  # fails if there is 1 or less entries in the database
    pic2 = None
    while pic2 == pic1 or pic2 is None:  # Don't pick the same file; also retries when the lookup found nothing
        pic2 = Post.query.get(randint(1, post_count))

    pic1_filename = url_for('static', filename=pic1.filename)
    pic2_filename = url_for('static', filename=pic2.filename)

    return render_template('index.html', pic1=pic1, pic2=pic2, pic1_filename=pic1_filename, pic2_filename=pic2_filename)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_for_user():
    """Return the user identified by the ``user_id`` cookie, if it is valid.

    The cookie is expected to look like ``<username>|<hash>``, where the
    hash is the SHA-512 hex digest of the username concatenated with
    ``hash_salt``.

    :return: the matching ``User``, or ``None`` when the cookie is absent,
        malformed, fails the hash check, or names an unknown user
    """
    cookie_value = request.cookies.get('user_id')
    if not cookie_value:
        return None

    params = cookie_value.split('|')
    if len(params) < 2:
        # The cookie is client-controlled; one without the separator used
        # to raise IndexError on params[1] instead of being rejected.
        return None

    if hashlib.sha512(params[0] + hash_salt).hexdigest() != params[1]:
        return None

    return session.query(User).filter(
        User.username == params[0]).first()
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Render the main page listing all assignments, newest first."""
    current = check_for_user()
    if current:
        print(current.f_name)
    newest_first = Assignment.created.desc()
    assignments = session.query(Assignment).order_by(newest_first).all()
    return render_template('main.html',
                           user=current,
                           assign=assignments)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def assignResultsReview(user, post_id):
    """Show one submission's results (GET) or delete the submission.

    Only the post's owner or an admin may access it.

    :param user: the requesting user (reads ``id`` and ``admin``)
    :param post_id: id of the post to review or delete
    :return: the review page on GET, otherwise a redirect to the
        assignment view after deleting the post
    """
    post = session.query(Post).filter(post_id == Post.id).first()
    if post is None:
        # An unknown post id used to crash on ``post.assignment_id``.
        return abort(404)
    if post.user_id != user.id and not user.admin:
        return abort(403)

    # Remember the id before the post is deleted; it is also used for the
    # redirect so a missing assignment row cannot break it.
    assignment_id = post.assignment_id
    assign = session.query(Assignment).filter(
        Assignment.id == assignment_id).first()

    if request.method == 'GET':
        return render_template('assignReviewResults.html',
                               assign=assign,
                               post=post,
                               user=user)

    session.delete(post)
    session.commit()
    return redirect(url_for('assignView', assign_id=assignment_id))
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def postFeedback(user, post_id):
    """Store the submitted feedback text on a post.

    :param user: the requesting user (not used by this function)
    :param post_id: id of the post that receives the feedback
    :return: the saved notes, or an empty 404 response when the post does
        not exist
    """
    post = session.query(Post).filter(Post.id == post_id).first()
    # form_id = 'feedback_box_%s' % post_id
    notes = request.form['data']
    print('notes', notes)
    if not post:
        print('no post')
        # Returning None is not a valid Flask response; answer with an
        # explicit "not found" instead.
        return '', 404

    post.notes = notes
    session.commit()
    return notes
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def allResults(user):
    """Render the results overview listing all assignments, newest first."""
    newest_first = Assignment.created.desc()
    assignments = session.query(Assignment).order_by(newest_first).all()
    return render_template('allResults.html',
                           user=user,
                           assign=assignments)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def editAssign(user, assign_id):
    """Show the edit form for an assignment (GET) or save the edits.

    Saving requires both ``title`` and ``desc``; otherwise the form is shown
    again with an error message and the submitted values.

    :param user: the requesting user, passed through to the template
    :param assign_id: id of the assignment to edit
    :return: the ``admin.html`` form, a redirect to the assignment view
        after saving, or a redirect to ``main`` for an unknown assignment
    """
    assign = session.query(Assignment).filter(
        Assignment.id == assign_id).first()
    if assign is None:
        # An unknown id used to crash on attribute access below.
        return redirect(url_for('main'))

    params = {}
    if request.method == 'GET':
        params['title'] = assign.name
        params['desc'] = assign.desc
        return render_template('admin.html',
                               user=user,
                               params=params)

    title = request.form['title']
    descrip = request.form['desc']
    assign_type = request.form['assign_type']
    include_tf = request.form.get('include_testfiles')
    if title and descrip:
        assign.name = title
        assign.desc = descrip
        assign.int_type = assign_type
        # The form field is only present/non-empty when the box is ticked.
        assign.include_tf = bool(include_tf)
        session.commit()
        return redirect(url_for('assignView', assign_id=assign_id))

    params['title'] = title
    params['desc'] = descrip
    params['error'] = 'Please fill in both fields before continuing.'
    return render_template('admin.html',
                           user=user,
                           params=params)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testView(user, test_id):
    """Render a single test together with the assignment it belongs to.

    Only GET requests produce a page; any other method returns ``None``.
    """
    test = session.query(Test).filter(Test.id == test_id).first()
    if request.method != 'GET':
        return None

    owning_assignment = session.query(Assignment).filter(
        Assignment.id == test.assignment_id).first()
    return render_template('testView.html',
                           test=test,
                           user=user,
                           assign=owning_assignment)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resetPassword(user):
    """Show the password-reset form (GET) or reset a user's password.

    On submit the account named by ``username`` gets a fresh salt and the
    SHA-512 hex digest of ``password + salt`` as its new password.

    :param user: the requesting user, passed through to the template
    :return: the rendered ``resetPass.html`` page, with a status message
        after a submit
    """
    if request.method == 'GET':
        return render_template('resetPass.html',
                               user=user)

    username = request.form['username']
    password = request.form['password']
    if not username or not password:
        status_message = 'Both fields are required.'
        return render_template('resetPass.html',
                               status_message=status_message,
                               user=user)

    # Keep the looked-up account in its own name.  It used to overwrite
    # ``user``, so the template received None (account not found) or the
    # account being reset instead of the requesting user.
    target = session.query(User).filter(User.username == username).first()
    if not target:
        status_message = 'User could not be found. ' \
            'Please verify their username and try again.'
        return render_template('resetPass.html',
                               status_message=status_message,
                               user=user)

    salt = make_salt()
    target.salt = salt
    target.password = hashlib.sha512(password + salt).hexdigest()
    session.commit()
    status_message = 'Users password has been changed.'
    return render_template('resetPass.html',
                           status_message=status_message,
                           user=user)
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def deleteUser(user, user_id):
    """Delete a user together with all of that user's posts.

    :param user: the requesting user (not used by this function)
    :param user_id: id of the account to delete
    :return: a redirect to the roster page
    """
    # Separate name so the requesting ``user`` argument is not shadowed.
    target = session.query(User).filter(User.id == user_id).first()
    posts = session.query(Post).filter(Post.user_id == user_id).all()
    for p in posts:
        session.delete(p)
    if target:
        session.delete(target)
    # One commit for everything: committing after each post could leave the
    # posts half deleted if a later step failed.
    session.commit()
    return redirect(url_for('roster'))
__init__.py 文件源码 项目:Java-Grader 作者: acronymcreations 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def all(user):
    """Render a page showing every user, assignment, post and test."""
    context = {
        'users': session.query(User).all(),
        'assign': session.query(Assignment).all(),
        'posts': session.query(Post).all(),
        'tests': session.query(Test).all(),
        'user': user,
    }
    return render_template('all.html', **context)
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def load_user(user_id):
    """Load a user by ``shakecast_id``.

    :param user_id: the stored user id, convertible to ``int``
    :return: the matching ``User``, or ``None`` when the id is not a valid
        integer or no such user exists
    """
    try:
        shakecast_id = int(user_id)
    except (TypeError, ValueError):
        # A malformed id means "no such user"; raising here would turn a
        # bad stored id into a server error (Flask-Login asks loaders to
        # return None for invalid ids).
        return None

    session = Session()
    user = session.query(User).filter(User.shakecast_id==shakecast_id).first()

    # Expunge here might help with Windows threading
    #session.expunge(user)
    Session.remove()
    return user
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_shaking_events(facility_id):
    """Return the events whose ShakeMaps have shaken a facility.

    Each entry holds the event's attributes, with ``shakemaps`` replaced by
    the number of ShakeMaps of that event.

    :param facility_id: ``shakecast_id`` of the facility
    :return: JSON response ``{"success": true, "data": [...]}``; ``data``
        is empty when the facility does not exist
    """
    session = Session()
    try:
        fac = (session.query(Facility)
                      .filter(Facility.shakecast_id == facility_id)
                      .first())

        eq_dicts = []
        if fac is not None:
            # An unknown facility used to crash on ``fac.shaking_history``;
            # it is now reported as "no events", in line with how
            # get_shaking_data answers a lookup that finds nothing.
            eqs = [fs.shakemap.event for fs in fac.shaking_history
                   if fs.shakemap is not None]
            for eq in eqs:
                if eq is not None:
                    eq_dict = eq.__dict__.copy()
                    eq_dict['shakemaps'] = len(eq.shakemaps)
                    eq_dict.pop('_sa_instance_state', None)
                    eq_dicts.append(eq_dict)

        return jsonify(success=True, data=eq_dicts)
    finally:
        Session.remove()
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_shaking_data(facility_id, eq_id):
    """Return one facility's shaking record for a given ShakeMap.

    :param facility_id: ``shakecast_id`` of the facility
    :param eq_id: ``shakemap_id`` of the ShakeMap
    :return: JSON response ``{"success": true, "data": {...}}``; ``data``
        is ``null`` when no matching record exists
    """
    session = Session()
    try:
        # ``facility_id`` used to be ignored, so the first record of *any*
        # facility for this ShakeMap was returned.
        # NOTE(review): assumes FacilityShaking.facility is the many-to-one
        # relationship to Facility -- confirm against the model.
        shaking = (session.query(FacilityShaking)
                        .filter(FacilityShaking
                                    .shakemap
                                    .has(ShakeMap.shakemap_id == eq_id))
                        .filter(FacilityShaking
                                    .facility
                                    .has(Facility.shakecast_id == facility_id))
                        .first())

        shaking_dict = None
        if shaking:
            shaking_dict = shaking.__dict__.copy()
            shaking_dict.pop('_sa_instance_state', None)

        return jsonify(success=True, data=shaking_dict)
    finally:
        Session.remove()
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_affected_facilities(shakemap_id):
    """Return the facilities shaken by the highest version of a ShakeMap.

    The JSON result has ``facilities`` (each facility's attributes plus a
    nested ``shaking`` dict), ``alert`` (facility count per alert level)
    and an empty ``types`` dict.
    """
    session = Session()
    versions = (session.query(ShakeMap)
                       .filter(ShakeMap.shakemap_id == shakemap_id)
                       .order_by(ShakeMap.shakemap_version.desc())
                       .all())

    fac_dicts = []
    alert = dict.fromkeys(['gray', 'green', 'yellow', 'orange', 'red'], 0)
    if versions:
        # Versions are sorted descending, so the first one is the highest.
        newest = versions[0]
        for record in newest.facility_shaking:
            # The facility is read before the record's own attributes are
            # copied, exactly as before.
            facility_info = record.facility.__dict__.copy()
            shaking_info = record.__dict__.copy()
            facility_info.pop('_sa_instance_state', None)
            shaking_info.pop('_sa_instance_state', None)
            facility_info['shaking'] = shaking_info
            fac_dicts.append(facility_info)

            # record number of facs at each alert level
            alert[shaking_info['alert_level']] += 1

    shaking_data = {'alert': alert, 'facilities': fac_dicts, 'types': {}}

    shaking_json = json.dumps(shaking_data, cls=AlchemyEncoder)

    Session.remove()
    return shaking_json
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def event_image(event_id):
    """Serve the image stored for an event.

    The image path is ``<EARTHQUAKES>/<event_id>/image.png``.  When no
    event with that id exists, the static ``sc_logo.png`` is served instead.

    :param event_id: event identifier (also used as a directory name)
    :return: a Flask response carrying the image
    """
    session = Session()
    try:
        event = (session.query(Event)
                        .filter(Event.event_id == event_id)
                        .limit(1)).first()
        if event is None:
            # send_static_file() already returns a complete response; the
            # old code passed that response object on to send_file(),
            # which expects a path or file object.
            return app.send_static_file('sc_logo.png')

        img = os.path.join(app.config['EARTHQUAKES'],
                           event_id,
                           'image.png')
    finally:
        # Release the scoped session even when the lookup fails.
        Session.remove()

    # The event image is a PNG file, so it is declared as one.
    return send_file(img, mimetype='image/png')
web_server.py 文件源码 项目:shakecast 作者: usgs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_user_groups(user_id):
    """Return the groups a user belongs to as a JSON array.

    An unknown user, or a user without groups, yields ``[]``.
    """
    session = Session()
    user = session.query(User).filter(User.shakecast_id == user_id).first()

    memberships = user.groups if user is not None else None
    groups = []
    for group in memberships or []:
        group_dict = dict(group.__dict__)
        # SQLAlchemy's bookkeeping entry is removed before encoding.
        group_dict.pop('_sa_instance_state', None)
        groups.append(group_dict)

    groups_json = json.dumps(groups, cls=AlchemyEncoder)

    Session.remove()
    return groups_json
gitimpl.py 文件源码 项目:upb-son-editor-backend 作者: CN-UPB 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def get_project(ws_id, pj_id: int, session=None) -> Project:
    """
    Returns a project and raises 404, when project not found.

    :param ws_id: Workspace id
    :param pj_id: Project id
    :param session: db session; a session is obtained from ``db_session()``
        at call time when omitted
    :return: Project model
    :raises NotFound: when the workspace contains no project with that id
    """
    if session is None:
        # A ``session=db_session()`` default is evaluated once, when the
        # function is defined, so every call without an explicit session
        # shared whatever that single call returned.
        session = db_session()
    project = session.query(Project).join(Workspace) \
        .filter(Workspace.id == ws_id) \
        .filter(Project.id == pj_id).first()
    if not project:
        raise NotFound("Could not find project with id {}".format(pj_id))
    return project


问题


面经


文章

微信
公众号

扫码关注公众号