def create_user_db(auth_id, name, username, email='', verified=False, **props):
    """Return a user entity for the given identity, creating one if needed.

    When ``verified`` is true and exactly one verified user already has this
    e-mail (lower-cased), ``auth_id`` is attached to that user, the user is
    stored and returned. Otherwise a new ``model.User`` is stored with a
    username derived from ``username``.

    Args:
        auth_id: Identifier to attach to the user; a falsy value is not stored.
        name: Name stored on a newly created user.
        username: Seed for the username; only the part before the first ``@``
            is used, transliterated, lower-cased and with runs of non-word
            characters replaced by ``.``.
        email: E-mail address; lower-cased before it is used.
        verified: Stored on a new user and enables the merge-by-e-mail path.
        **props: Extra keyword arguments forwarded to ``model.User``.

    Returns:
        The existing or newly stored user entity.
    """
    email = email.lower() if email else ''
    if verified and email:
        user_dbs, _ = model.User.get_dbs(email=email, verified=True, limit=2)
        if len(user_dbs) == 1:
            user_db = user_dbs[0]
            # Mirror the constructor path below: never store an empty auth id
            # and do not attach the same id twice.
            if auth_id and auth_id not in user_db.auth_ids:
                user_db.auth_ids.append(auth_id)
            user_db.put()
            task.new_user_notification(user_db)
            return user_db

    # ``bytes`` is an alias of ``str`` on Python 2, so behaviour is unchanged
    # there; on Python 3 this avoids calling the non-existent ``str.decode``.
    if isinstance(username, bytes):
        username = username.decode('utf-8')
    username = unidecode.unidecode(username.split('@')[0].lower()).strip()
    username = re.sub(r'[\W_]+', '.', username)

    # Append an increasing numeric suffix until the username is free.
    new_username = username
    n = 1
    while not model.User.is_username_available(new_username):
        new_username = '%s%d' % (username, n)
        n += 1

    user_db = model.User(
        name=name,
        email=email,
        username=new_username,
        auth_ids=[auth_id] if auth_id else [],
        verified=verified,
        token=util.uuid(),
        **props
    )
    user_db.put()
    task.new_user_notification(user_db)
    return user_db
# Example source code for the Python class User()
def get_user_db_from_email(email, password):
    """Look up the active user for ``email`` and check ``password``.

    Returns the user entity when exactly one active user has this e-mail and
    the stored password hash matches, ``False`` when more than one active user
    shares the e-mail (a message is flashed and a conflict notification is
    sent), and ``None`` in every other case.
    """
    matches, _ = model.User.get_dbs(email=email, active=True, limit=2)
    if not matches:
        return None

    if len(matches) > 1:
        conflict_message = (
            'We are sorry but it looks like there is a conflict with\n'
            'your account. Our support team is already informed and we will get\n'
            'back to you as soon as possible.'
        )
        flask.flash(conflict_message, category='danger')
        task.email_conflict_notification(email)
        return False

    candidate = matches[0]
    if candidate.password_hash == util.password_hash(candidate, password):
        return candidate
    return None
def user_verify(token):
    """Mark the current user's e-mail as verified when ``token`` matches.

    On a mismatch an error is flashed. On a match the user is flagged as
    verified, given a fresh token from ``util.uuid()`` and stored. Both
    outcomes redirect to the ``profile`` endpoint.
    """
    current = auth.current_user_db()
    if current.token != token:
        flask.flash('That link is either invalid or expired.', category='danger')
    else:
        current.verified = True
        # Replace the token so the same link cannot be matched again.
        current.token = util.uuid()
        current.put()
        flask.flash('Hooray! Your email is now verified.', category='success')
    return flask.redirect(flask.url_for('profile'))
###############################################################################
# User Forgot
###############################################################################
def user_forgot(token=None):
    """Handle the "forgot password" form.

    Aborts with status 418 when e-mail authentication is disabled. On a valid
    submission the active users with the entered e-mail are looked up (at most
    two): exactly one triggers a reset-password notification and a redirect to
    ``welcome``; none or two add an error to the e-mail field. Otherwise the
    form template is rendered. ``token`` is accepted but not used.
    """
    if not config.CONFIG_DB.has_email_authentication:
        flask.abort(418)

    forgot_form = UserForgotForm(obj=auth.current_user_db())
    form = auth.form_with_recaptcha(forgot_form)

    if form.validate_on_submit():
        cache.bump_auth_attempt()
        email = form.email.data
        matches, _ = util.get_dbs(
            model.User.query(), email=email, active=True, limit=2
        )
        found = len(matches)
        if found == 1:
            task.reset_password_notification(matches[0])
            return flask.redirect(flask.url_for('welcome'))
        if found == 0:
            form.email.errors.append('This email was not found')
        elif found == 2:
            # Two active users share the e-mail: report it and tell the user.
            task.email_conflict_notification(email)
            form.email.errors.append(
                'We are sorry but it looks like there is a conflict with your\n'
                'account. Our support team is already informed and we will get back to\n'
                'you as soon as possible.'
            )

    if form.errors:
        cache.bump_auth_attempt()

    return flask.render_template(
        'user/user_forgot.html',
        title='Forgot Password?',
        html_class='user-forgot',
        form=form,
    )
###############################################################################
# User Reset
###############################################################################
def user_reset(token=None):
    """Let the owner of ``token`` choose a new password.

    An unknown token flashes an error and redirects to ``welcome``. A logged-in
    visitor is logged out and redirected back to the same path. On a valid
    form submission the new password hash is stored, the token is replaced,
    the user is marked verified, and the result of ``auth.signin_user_db`` is
    returned. Otherwise the reset form is rendered.
    """
    account = model.User.get_by('token', token)
    if not account:
        flask.flash('That link is either invalid or expired.', category='danger')
        return flask.redirect(flask.url_for('welcome'))

    if auth.is_logged_in():
        login.logout_user()
        return flask.redirect(flask.request.path)

    form = UserResetForm()
    if not form.validate_on_submit():
        return flask.render_template(
            'user/user_reset.html',
            title='Reset Password',
            html_class='user-reset',
            form=form,
            user_db=account,
        )

    account.password_hash = util.password_hash(account, form.new_password.data)
    # Replace the token so the link that was just used no longer matches.
    account.token = util.uuid()
    account.verified = True
    account.put()
    flask.flash('Your password was changed succesfully.', category='success')
    return auth.signin_user_db(account)
###############################################################################
# User Activate
###############################################################################
def user_activate(token):
    """Activate the account that owns ``token``.

    A logged-in visitor is logged out and redirected back to the same path.
    An unknown token flashes an error and redirects to ``welcome``. On a valid
    form submission the form data is copied onto the user, the password hash
    is set, the token is replaced, the user is marked verified and stored, and
    the result of ``auth.signin_user_db`` is returned. Otherwise the
    activation form is rendered.
    """
    if auth.is_logged_in():
        login.logout_user()
        return flask.redirect(flask.request.path)

    account = model.User.get_by('token', token)
    if not account:
        flask.flash('That link is either invalid or expired.', category='danger')
        return flask.redirect(flask.url_for('welcome'))

    form = UserActivateForm(obj=account)
    if not form.validate_on_submit():
        return flask.render_template(
            'user/user_activate.html',
            title='Activate Account',
            html_class='user-activate',
            user_db=account,
            form=form,
        )

    form.populate_obj(account)
    account.password_hash = util.password_hash(account, form.password.data)
    # Replace the token so the activation link no longer matches.
    account.token = util.uuid()
    account.verified = True
    account.put()
    return auth.signin_user_db(account)
###############################################################################
# User Merge
###############################################################################
def signup():
    """Render the sign-up page and, when enabled, process e-mail sign-ups.

    With e-mail authentication enabled, a valid submission whose e-mail is not
    yet taken creates a user, sends the activation notification and redirects
    to ``welcome``. In every other case the auth template is rendered,
    together with the keyword arguments returned by ``urls_for_oauth``.
    """
    next_url = util.get_next_url()
    form = None

    if config.CONFIG_DB.has_email_authentication:
        form = form_with_recaptcha(SignUpForm())
        save_request_params()
        if form.validate_on_submit():
            submitted_email = form.email.data
            if model.User.get_by('email', submitted_email):
                form.email.errors.append('This email is already taken.')

            if not form.errors:
                # The e-mail doubles as the seed for the username.
                user_db = create_user_db(
                    None,
                    util.create_name_from_email(form.email.data),
                    form.email.data,
                    form.email.data,
                )
                user_db.put()
                task.activate_user_notification(user_db)
                cache.bump_auth_attempt()
                return flask.redirect(flask.url_for('welcome'))

    if form and form.errors:
        cache.bump_auth_attempt()

    if config.CONFIG_DB.has_email_authentication:
        title = 'Sign up'
    else:
        title = 'Sign in'

    return flask.render_template(
        'auth/auth.html',
        title=title,
        html_class='auth',
        next_url=next_url,
        form=form,
        **urls_for_oauth(next_url)
    )
###############################################################################
# Sign out stuff
###############################################################################
def form_with_recaptcha(form):
    """Return ``form``, with its ``recaptcha`` field removed unless required.

    The field is kept only when the auth-attempt counter has reached
    ``config.RECAPTCHA_LIMIT`` and ``config.CONFIG_DB.has_recaptcha`` is
    truthy.
    """
    limit_reached = cache.get_auth_attempt() >= config.RECAPTCHA_LIMIT
    if not limit_reached or not config.CONFIG_DB.has_recaptcha:
        del form.recaptcha
    return form
###############################################################################
# User related stuff
###############################################################################
def create_user_db(auth_id, name, username, email='', verified=False, **props):
    """Return a user entity for the given identity, creating one if needed.

    When ``verified`` is true and exactly one verified user already has this
    e-mail (lower-cased), ``auth_id`` is attached to that user, the user is
    stored and returned. Otherwise a new ``model.User`` is stored with a
    username derived from ``username``.

    Args:
        auth_id: Identifier to attach to the user; a falsy value is not stored.
        name: Name stored on a newly created user.
        username: Seed for the username; only the part before the first ``@``
            is used, transliterated, lower-cased and with runs of non-word
            characters replaced by ``.``.
        email: E-mail address; lower-cased before it is used.
        verified: Stored on a new user and enables the merge-by-e-mail path.
        **props: Extra keyword arguments forwarded to ``model.User``.

    Returns:
        The existing or newly stored user entity.
    """
    email = email.lower() if email else ''
    if verified and email:
        user_dbs, _ = model.User.get_dbs(email=email, verified=True, limit=2)
        if len(user_dbs) == 1:
            user_db = user_dbs[0]
            # Mirror the constructor path below: never store an empty auth id
            # and do not attach the same id twice.
            if auth_id and auth_id not in user_db.auth_ids:
                user_db.auth_ids.append(auth_id)
            user_db.put()
            task.new_user_notification(user_db)
            return user_db

    # ``bytes`` is an alias of ``str`` on Python 2, so behaviour is unchanged
    # there; on Python 3 this avoids calling the non-existent ``str.decode``.
    if isinstance(username, bytes):
        username = username.decode('utf-8')
    username = unidecode.unidecode(username.split('@')[0].lower()).strip()
    username = re.sub(r'[\W_]+', '.', username)

    # Append an increasing numeric suffix until the username is free.
    new_username = username
    n = 1
    while not model.User.is_username_available(new_username):
        new_username = '%s%d' % (username, n)
        n += 1

    user_db = model.User(
        name=name,
        email=email,
        username=new_username,
        auth_ids=[auth_id] if auth_id else [],
        verified=verified,
        token=util.uuid(),
        **props
    )
    user_db.put()
    task.new_user_notification(user_db)
    return user_db
def get_user_db_from_email(email, password):
    """Look up the active user for ``email`` and check ``password``.

    Returns the user entity when exactly one active user has this e-mail and
    the stored password hash matches, ``False`` when more than one active user
    shares the e-mail (a message is flashed and a conflict notification is
    sent), and ``None`` in every other case.
    """
    matches, _ = model.User.get_dbs(email=email, active=True, limit=2)
    if not matches:
        return None

    if len(matches) > 1:
        conflict_message = (
            'We are sorry but it looks like there is a conflict with\n'
            'your account. Our support team has been informed and we will get\n'
            'back to you as soon as possible.'
        )
        flask.flash(conflict_message, category='danger')
        task.email_conflict_notification(email)
        return False

    candidate = matches[0]
    if candidate.password_hash == util.password_hash(candidate, password):
        return candidate
    return None
def user_verify(token):
    """Mark the current user's e-mail as verified when ``token`` matches.

    On a mismatch an error is flashed. On a match the user is flagged as
    verified, given a fresh token from ``util.uuid()`` and stored. Both
    outcomes redirect to the ``profile`` endpoint.
    """
    current = auth.current_user_db()
    if current.token != token:
        flask.flash('That link is either invalid or expired.', category='danger')
    else:
        current.verified = True
        # Replace the token so the same link cannot be matched again.
        current.token = util.uuid()
        current.put()
        flask.flash('Hooray! Your email is now verified.', category='success')
    return flask.redirect(flask.url_for('profile'))
###############################################################################
# User Forgot
###############################################################################
def user_forgot(token=None):
    """Handle the "forgot password" form.

    Aborts with status 418 when e-mail authentication is disabled. On a valid
    submission the active users with the entered e-mail are looked up (at most
    two): exactly one triggers a reset-password notification and a redirect to
    ``welcome``; none or two add an error to the e-mail field. Otherwise the
    form template is rendered. ``token`` is accepted but not used.
    """
    if not config.CONFIG_DB.has_email_authentication:
        flask.abort(418)

    forgot_form = UserForgotForm(obj=auth.current_user_db())
    form = auth.form_with_recaptcha(forgot_form)

    if form.validate_on_submit():
        cache.bump_auth_attempt()
        email = form.email.data
        matches, _ = util.get_dbs(
            model.User.query(), email=email, active=True, limit=2
        )
        found = len(matches)
        if found == 1:
            task.reset_password_notification(matches[0])
            return flask.redirect(flask.url_for('welcome'))
        if found == 0:
            form.email.errors.append('This email was not found')
        elif found == 2:
            # Two active users share the e-mail: report it and tell the user.
            task.email_conflict_notification(email)
            form.email.errors.append(
                'We are sorry but it looks like there is a conflict with your\n'
                'account. Our support team is already informed and we will get back to\n'
                'you as soon as possible.'
            )

    if form.errors:
        cache.bump_auth_attempt()

    return flask.render_template(
        'user/user_forgot.html',
        title='Forgot Password?',
        html_class='user-forgot',
        form=form,
    )
###############################################################################
# User Reset
###############################################################################
def user_reset(token=None):
    """Let the owner of ``token`` choose a new password.

    An unknown token flashes an error and redirects to ``welcome``. A logged-in
    visitor is logged out and redirected back to the same path. On a valid
    form submission the new password hash is stored, the token is replaced,
    the user is marked verified, and the result of ``auth.signin_user_db`` is
    returned. Otherwise the reset form is rendered.
    """
    account = model.User.get_by('token', token)
    if not account:
        flask.flash('That link is either invalid or expired.', category='danger')
        return flask.redirect(flask.url_for('welcome'))

    if auth.is_logged_in():
        flask_login.logout_user()
        return flask.redirect(flask.request.path)

    form = UserResetForm()
    if not form.validate_on_submit():
        return flask.render_template(
            'user/user_reset.html',
            title='Reset Password',
            html_class='user-reset',
            form=form,
            user_db=account,
        )

    account.password_hash = util.password_hash(account, form.new_password.data)
    # Replace the token so the link that was just used no longer matches.
    account.token = util.uuid()
    account.verified = True
    account.put()
    flask.flash('Your password was changed succesfully.', category='success')
    return auth.signin_user_db(account)
###############################################################################
# User Activate
###############################################################################
def user_activate(token):
    """Activate the account that owns ``token``.

    A logged-in visitor is logged out and redirected back to the same path.
    An unknown token flashes an error and redirects to ``welcome``. On a valid
    form submission the form data is copied onto the user, the password hash
    is set, the token is replaced, the user is marked verified and stored, and
    the result of ``auth.signin_user_db`` is returned. Otherwise the
    activation form is rendered.
    """
    if auth.is_logged_in():
        flask_login.logout_user()
        return flask.redirect(flask.request.path)

    account = model.User.get_by('token', token)
    if not account:
        flask.flash('That link is either invalid or expired.', category='danger')
        return flask.redirect(flask.url_for('welcome'))

    form = UserActivateForm(obj=account)
    if not form.validate_on_submit():
        return flask.render_template(
            'user/user_activate.html',
            title='Activate Account',
            html_class='user-activate',
            user_db=account,
            form=form,
        )

    form.populate_obj(account)
    account.password_hash = util.password_hash(account, form.password.data)
    # Replace the token so the activation link no longer matches.
    account.token = util.uuid()
    account.verified = True
    account.put()
    return auth.signin_user_db(account)
###############################################################################
# User Merge
###############################################################################
def signup_processed():
    """Create an account from the submitted sign-up form.

    If a user with the submitted e-mail already exists, a notice is flashed
    and the visitor is redirected to ``/login``. Otherwise a new ``User`` is
    stored, the ``Phone`` row with the same id is updated, the user id is put
    in the session and the welcome page is rendered.
    """
    form = request.form
    user_id = form["user_id"]
    phone_number = form["phone_number"]
    first_name = form["first_name"]
    last_name = form["last_name"]
    email = form["email"]
    password = form["password"]

    # Only create an account when the e-mail is not in the database yet.
    if User.query.filter_by(email=email).first():
        flash("Oops, your email already exists! Please log in.")
        return redirect("/login")

    # NOTE(review): the password is passed to User exactly as submitted;
    # confirm whether hashing happens elsewhere.
    account = User(
        user_id=user_id,
        first_name=first_name,
        last_name=last_name,
        email=email,
        password=password,
    )
    # Used in tests to update the phone number.
    Phone.query.filter_by(id=user_id).update({"phone": phone_number})
    db.session.add(account)
    db.session.commit()

    session['id'] = account.user_id
    greeted = get_specific_user(email)
    flash("Welcome to Fork&Spoon, %s. You have successfully logged in." % greeted.first_name)
    return render_template("welcomepage.html")
def profile(id):
    """Render the profile page for the user stored in the session.

    The ``id`` argument is accepted but not used; the user is looked up via
    ``session['id']``.
    """
    matching_users = User.query.filter_by(user_id=session['id'])
    return render_template("profile.html", user=matching_users.first())
def edit_profile():
    """Save the submitted description on the session user's profile.

    Returns a plain confirmation string after committing the update.
    """
    changes = {"description": request.form.get("description")}
    User.query.filter_by(user_id=session['id']).update(changes)
    db.session.commit()
    return "You have successfully updated your profile."
def event_confirmed():
    """Create an event from the submitted form and render the confirmation.

    The start and end datetimes are parsed from the submitted date and times.
    An ``Event`` row is stored for the session user, followed by an
    ``Attendee`` row marking that user as the owner.
    """
    form = request.form
    date = form['date']
    start_time = form['start_time']
    end_time = form['end_time']

    # The date and each time are combined into "MM/DD/YYYY HH:MM".
    stamp_format = "%m/%d/%Y %H:%M"
    start_datetime = datetime.strptime(" ".join((date, start_time)), stamp_format)
    end_datetime = datetime.strptime(" ".join((date, end_time)), stamp_format)

    # Category id and business id are needed to instantiate the event.
    business = get_specific_business(form['business_url'])
    category_id = form['category_id']
    business_id = business.id

    # The current user becomes the owner of the event.
    owner = User.query.filter_by(user_id=session['id']).first()
    event = Event(
        start_time=start_datetime,
        end_time=end_datetime,
        category_id=category_id,
        business_id=business_id,
        user_id=owner.user_id,
    )
    db.session.add(event)
    db.session.commit()

    event = get_specific_event(business_id)

    # One attendee row marks the creator as owner/attending. A matched event
    # later has two attendee rows for its event id, an unmatched one only one.
    attendee = Attendee(user_id=owner.user_id, event_id=event.id, is_owner=True)
    db.session.add(attendee)
    db.session.commit()

    return render_template("confirmation.html", event=event)
def available_events():
    """Render the unmatched, not-yet-ended events created by other users."""
    now_pacific = datetime.now(tz=timezone('US/Pacific'))
    viewer = User.query.filter_by(user_id=session['id']).first()

    criteria = (
        # `== False` is kept as written: presumably it builds a query
        # expression rather than a Python truth test (confirm Event is an
        # ORM model).
        Event.is_matched == False,
        Event.user_id != viewer.user_id,
        # Past times compare as smaller, so this keeps events still to end.
        Event.end_time > now_pacific,
    )
    open_events = Event.query.filter(*criteria).all()
    return render_template("find_events.html", events=open_events)