def pull_user_data(session):
    """Download the exported user list and store each entry via ``session``.

    Performs a GET on ``config.prod_url`` joined with ``export/users``,
    decodes the response text as JSON, builds one ``User`` per decoded
    entry, adds it to ``session`` and commits.

    :param session: object exposing ``add()`` and ``commit()``.
    """
    print('pulling users')
    export_url = u"{}{}".format(config.prod_url, 'export/users')
    response = requests.get(export_url)
    # Keys copied verbatim from every exported entry onto the User model.
    copied_fields = (
        'id', 'name', 'email', 'admin', 'avatar',
        'active', 'created_at', 'elo', 'wins', 'losses',
    )
    for entry in json.loads(response.text):
        session.add(User(**{field: entry[field] for field in copied_fields}))
    # NOTE(review): the source lost its indentation; a single commit after
    # the loop is assumed here - confirm against the original file.
    session.commit()
    print('done pulling users')
python类User()的实例源码
def adduser():
    """Interactively create a user and commit it through ``db.session``.

    Prompts on the console for a username, an e-mail address, a role id and
    a password (the password is read with ``getpass``, so it is not echoed),
    then builds a ``User`` from the answers, adds it to the session and
    commits.
    """
    from getpass import getpass

    login_name = raw_input("\_username: ")
    address = raw_input("\_email: ")
    chosen_role = raw_input("\_[1:moderator 2:admin 3:user]: ")
    secret = getpass("\_password: ")

    db.session.add(User(
        email=address,
        username=login_name,
        password=secret,
        role_id=chosen_role,
    ))
    db.session.commit()
    print("<user %s add in database>" % login_name)
def validate(self):
    """Validate the login form, upgrading an old password hash when found.

    The user matching the submitted e-mail is looked up and stored on
    ``self.user``. When that user's stored password starts with
    ``pbkdf2:sha1`` and the submitted password verifies against it, the
    password is re-encrypted, the user is marked active, the role named
    "admin" is appended, the session is committed and the form is accepted
    immediately. In every other case the parent class' ``validate()``
    decides.

    :return: ``True`` when the form is accepted, ``False`` otherwise.
    """
    # Check for an old password hash and update the password if needed.
    by_email = models.User.email == self.email.data
    self.user = db.session.query(models.User).filter(by_email).first()

    if (self.user
            and self.user.password.startswith("pbkdf2:sha1")
            and check_password_hash(self.user.password, self.password.data)):
        self.user.password = encrypt_password(self.password.data)
        self.user.active = 1
        # NOTE(review): every migrated account receives the "admin" role
        # here - confirm this is intended.
        admin_role = (
            db.session.query(models.Role)
            .filter(models.Role.name == "admin")
            .first()
        )
        self.user.roles.append(admin_role)
        db.session.commit()
        return True

    # Fall back to the regular flask-security checks.
    if super(Login, self).validate():
        return True
    return False
def password_reset_request():
    """Handle a request for a password-reset e-mail.

    Users who are not anonymous are redirected to ``.index``. On a valid
    form submission a reset token is generated and e-mailed when a user
    with the given address exists; the confirmation message is flashed and
    the visitor is redirected to ``auth.login`` either way. Otherwise the
    request form is rendered.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('.index'))

    form = PasswordResetRequestForm()
    if not form.validate_on_submit():
        return render_template('auth/reset_password.html', form=form)

    account = User.query.filter_by(email=form.email.data).first()
    if account:
        reset_token = account.generate_reset_token()
        send_email(
            account.email,
            'Reset Your Password',
            'auth/email/reset_password',
            user=account,
            token=reset_token,
            next=request.args.get('next'),
        )
    # NOTE(review): indentation was lost in the source; the flash/redirect
    # pair is assumed to run whether or not the address matched a user.
    flash('An email with instructions to reset your password has been '
          'sent to you.')
    return redirect(url_for('auth.login'))
def password_reset(token):
    """Reset a user's password with the supplied token.

    Users who are not anonymous are redirected to ``.index``. On a valid
    form submission the user is looked up by e-mail; an unknown user or a
    failed ``reset_password`` call redirects to ``.index``, while a
    successful reset flashes a confirmation and redirects to
    ``auth.login``. Otherwise the form is rendered.

    :param token: reset token passed on to ``User.reset_password``.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('.index'))

    form = PasswordResetForm()
    if not form.validate_on_submit():
        return render_template('auth/reset_password.html', form=form)

    account = User.query.filter_by(email=form.email.data).first()
    if account is None:
        return redirect(url_for('.index'))
    if not account.reset_password(token, form.password.data):
        return redirect(url_for('.index'))

    flash('Your password has been updated.')
    return redirect(url_for('auth.login'))
def adduser():
    """Interactively create a user and commit it through ``db.session``.

    Prompts on the console for a username, an e-mail address, a role id and
    a password (the password is read with ``getpass``, so it is not echoed),
    then builds a ``User`` from the answers, adds it to the session and
    commits.
    """
    from getpass import getpass

    login_name = raw_input("\_username: ")
    address = raw_input("\_email: ")
    chosen_role = raw_input("\_[1:moderator 2:admin 3:user]: ")
    secret = getpass("\_password: ")

    db.session.add(User(
        email=address,
        username=login_name,
        password=secret,
        role_id=chosen_role,
    ))
    db.session.commit()
    print("<user %s add in database>" % login_name)
def adduser():
    """Interactively create a user and commit it through ``db.session``.

    Prompts on the console for a username, an e-mail address, a role id and
    a password (the password is read with ``getpass``, so it is not echoed),
    then builds a ``User`` from the answers, adds it to the session and
    commits.
    """
    from getpass import getpass

    login_name = raw_input("\_username: ")
    address = raw_input("\_email: ")
    chosen_role = raw_input("\_[1:moderator 2:admin 3:user]: ")
    secret = getpass("\_password: ")

    db.session.add(User(
        email=address,
        username=login_name,
        password=secret,
        role_id=chosen_role,
    ))
    db.session.commit()
    print("<user %s add in database>" % login_name)
def login():
    """Log a user in from the submitted login form.

    On a valid submission the user is looked up by the ``user`` column. A
    matching password stores the username in the session and redirects to
    ``index``; a wrong password, an unknown user or an error during the
    lookup flashes a message and re-renders the login page. Without a valid
    submission the login page is rendered.
    """
    form = logi()
    if form.validate_on_submit():
        username = form.username.data
        password = form.password.data
        try:
            me = User.query.filter_by(user=username).first()
            if me is None:
                # Unknown user: handled explicitly instead of relying on an
                # AttributeError being swallowed further down.
                flash('??????')
                return render_template('login.html', form=form)
            # NOTE(review): this compares the stored value with the raw
            # submitted password, i.e. passwords appear to be stored
            # unhashed - confirm and migrate to hashed storage.
            if me.password == password:
                session['username'] = username
                return redirect(url_for('index'))
            flash('??????????')
            return render_template('login.html', form=form)
        except Exception:
            # Narrowed from a bare ``except:`` so SystemExit and
            # KeyboardInterrupt are no longer swallowed.
            flash('??????')
            return render_template('login.html', form=form)
    return render_template('login.html', form=form)
def reg():
    """Register a new user from the submitted registration form.

    On a valid submission the two password fields must match, and neither
    the username nor the e-mail may already exist; each failed check
    flashes a message and re-renders the form. Otherwise a ``User`` is
    created and committed and the visitor is redirected to ``login``.
    Without a valid submission the registration form is rendered.
    """
    form = regs()
    if not form.validate_on_submit():
        return render_template('reg.html', form=form)

    username = form.username.data
    password = form.password.data
    confirm_password = form.queren_pass.data
    email = form.email.data
    # The former debug print of (username, password) was removed: it wrote
    # the plaintext password to stdout.

    if password != confirm_password:
        flash('???????????')
        return render_template('reg.html', form=form)

    if User.query.filter_by(user=username).first():
        flash('???????')
        return render_template('reg.html', form=form)

    if User.query.filter_by(email=email).first():
        flash('??????')
        return render_template('reg.html', form=form)

    # NOTE(review): the password is handed to the model as submitted;
    # whether it is hashed depends on the User model - confirm.
    new_user = User(user=username, email=email, password=password)
    db.session.add(new_user)
    db.session.commit()
    return redirect('login')
def adduser():
    """Interactively create a user and commit it through ``db.session``.

    Prompts on the console for a name, an e-mail address, a role id and a
    password (the password is read with ``getpass``, so it is not echoed),
    then builds a ``User`` from the answers, adds it to the session and
    commits.
    """
    from getpass import getpass

    display_name = raw_input("\_username: ")
    address = raw_input("\_email: ")
    chosen_role = raw_input("\_[1:moderator 2:admin 3:user]: ")
    secret = getpass("\_password: ")

    db.session.add(User(
        email=address,
        name=display_name,
        password=secret,
        role_id=chosen_role,
    ))
    db.session.commit()
    print("<user %s add in database>" % display_name)
def user_login(request):
    """Authenticate a user from a POSTed login form.

    POST: the ``username`` and ``pass`` fields are passed to
    ``authenticate``. An active user is logged in and redirected to
    ``/app/performer_profile/`` when ``is_member(user)`` equals ``True``,
    otherwise to ``/app/``. An inactive user or failed authentication
    returns a plain ``HttpResponse`` with an explanatory message.

    Any other method: the login page is rendered.

    :param request: the incoming request object.
    """
    if request.method != 'POST':
        return render(request, 'Gigstop/login.html', {})

    username = request.POST.get('username')
    password = request.POST.get('pass')
    user = authenticate(username=username, password=password)

    if not user:
        # Only the username is logged; the submitted password must never be
        # written to stdout or logs.
        print("Invalid login details supplied for user {0}".format(username))
        return HttpResponse('Invalid login details supplied')

    if not user.is_active:
        return HttpResponse('Your Gigstop account is disabled')

    login(request, user)
    # The comparison with True is kept as written; is_member is defined
    # elsewhere and its return type is not visible here.
    if is_member(user) == True:
        return HttpResponseRedirect('/app/performer_profile/')
    return HttpResponseRedirect('/app/')
#checks to see if a User is a band or not
def buy_tickets(request, event_id):
    """Show the ticket purchase form for an event and process purchases.

    On a valid POST a ticket is saved for the requesting user at the
    event's price, the event's ``no_tickets`` counter is reduced by the
    purchased quantity and the user is redirected to ``/app/thanks/``.
    An invalid POST prints the form errors and re-renders the bound form;
    any other method renders an empty form.

    :param request: the incoming request object.
    :param event_id: primary key of the ``Event`` being purchased.
    """
    # Local import so the block does not depend on a file-level import.
    from django.db.models import F

    event = Event.objects.get(id=event_id)
    if request.method == 'POST':
        buy_form = PurchaseTicketForm(data=request.POST)
        if buy_form.is_valid():
            ticket = buy_form.save(commit=False)
            ticket.event = event
            ticket.price = event.price
            ticket.user = User.objects.get(username=request.user.username)
            ticket.save()
            # Decrement in the database itself. Computing the new value
            # from the previously loaded ``event.no_tickets`` loses updates
            # when two purchases are processed concurrently.
            Event.objects.filter(id=event_id).update(
                no_tickets=F('no_tickets') - ticket.quantity)
            return HttpResponseRedirect('/app/thanks/')
        else:
            print(buy_form.errors)
    else:
        buy_form = PurchaseTicketForm()
    return render(request, 'Gigstop/buy_tickets.html',
                  {'event': event, 'buy_form': buy_form})
def authorized(resp):
    """Finish the Google sign-in and bind the session to a local user.

    A ``None`` response yields an "Access denied" message built from the
    ``error`` and ``error_description`` query arguments. Otherwise the
    tokens are stored in the session, the profile is fetched, the first
    listed e-mail (if any) is stored as ``user_email``, and the matching
    ``models.User`` row is looked up - or created with
    ``verification_level=100`` - so that its id can be stored as
    ``session_user_id``. Finally the visitor is redirected to ``/play``.

    :param resp: the OAuth response mapping, or ``None`` when denied.
    """
    if resp is None:
        denial = (request.args['error'], request.args['error_description'])
        return 'Access denied: reason=%s error=%s' % denial

    session['google_token'] = (resp['access_token'], resp['id_token'])
    profile = google.get('https://www.googleapis.com/plus/v1/people/me').data
    if len(profile['emails']) > 0:
        session['user_email'] = profile['emails'][0]['value']
    # NOTE(review): indentation was lost in the source; the statements
    # below are assumed to run regardless of the e-mail check above, so an
    # empty e-mail list relies on a previously stored ``user_email``.
    sql_session.rollback()

    def _user_for_session_email():
        # Look up the user row for the e-mail currently held in the session.
        return sql_session.query(models.User).filter_by(
            email=session['user_email']).first()

    existing = _user_for_session_email()
    if existing:
        session['session_user_id'] = existing.id
    else:
        created = models.User(
            verification_level=100,
            email=session['user_email'])
        sql_session.add(created)
        sql_session.commit()
        # Re-query after the commit, as the original does, to read the id.
        created = _user_for_session_email()
        session['session_user_id'] = created.id
    return redirect('/play')
def patch(self, org_id, location_id, user_id):
    """Apply a partial update to a location manager.

    The only accepted argument is the boolean ``activateReminder``; when it
    is supplied for a user who is not yet active, an activation reminder is
    sent.

    :param org_id: id of the organization (404 if missing).
    :param location_id: id of the location the user must manage.
    :param user_id: id of the user (404 if missing).
    :return: ``(body, status)`` tuple - 204 on success or when nothing was
        supplied, 404 when the user is not a manager of the location,
        400 when the user is already active.
    """
    organization = Organization.query.get_or_404(org_id)
    manager = User.query.get_or_404(user_id)

    if not manager.is_location_manager(location_id):
        return {"message": "user does not exist or is not a manager"}, 404

    arg_parser = reqparse.RequestParser()
    arg_parser.add_argument("activateReminder", type=inputs.boolean)
    parsed = arg_parser.parse_args(strict=True)

    # Keep only the arguments that were actually supplied.
    supplied = {key: value for key, value in parsed.items()
                if value is not None}
    if not len(supplied):
        return {}, 204

    if "activateReminder" in supplied:
        if manager.active:
            return {"message": "This user is already active"}, 400
        manager.send_activation_reminder(manager, organization.name)

    return {}, 204
def registry():
    """Create a user account from a POSTed registration form.

    POST: builds a ``models.User`` with a fresh uuid, the submitted
    username and e-mail, a hash of the submitted password and the current
    UTC timestamp, then commits it. Returns a JSON ``result`` of
    ``"sucessfull"`` on success and ``"failed"`` on any error. The session
    is closed in both cases.

    Any other method: redirects to ``login``.
    """
    if request.method == 'POST':
        user_id = Standardlib.create_uuid()
        username = request.form.get('username', None)
        password = request.form.get('password', None)
        email = request.form.get('email', None)
        try:
            newuser = models.User(
                uuid=user_id,
                username=username,
                password=generate_password_hash(password),
                email=email,
                timestamp=datetime.utcnow())
            db.session.add(newuser)
            db.session.commit()
            # The spelling of this result string is kept: clients may match
            # on it.
            return jsonify(result="sucessfull")
        except Exception:
            # Narrowed from a bare ``except:``; the failed transaction is
            # rolled back, matching what change_passwd does.
            db.session.rollback()
            return jsonify(result="failed")
        finally:
            db.session.close()
    return redirect('login')
def change_passwd():
    """Change the logged-in user's password.

    Requires both ``login_in`` and ``username`` to be set in the session;
    otherwise redirects to ``/login``. Reads ``oldpassword`` and
    ``newpassword`` from the request, verifies the old password against the
    stored hash and, on a match, stores a hash of the new one.

    :return: a JSON ``result`` of ``"change sucessfull"`` or
        ``"change failed"``, or a redirect to ``/login``.
    """
    if not session.get('login_in', None) or not session.get('username', None):
        return redirect('/login')

    oldpassword = request.values['oldpassword']
    newpassword = request.values['newpassword']
    try:
        user = models.User.query.filter_by(
            username=session['username']).first()
        if check_password_hash(user.password, oldpassword):
            user.password = generate_password_hash(newpassword)
            db.session.add(user)
            db.session.commit()
            return jsonify(result="change sucessfull")
        else:
            return jsonify(result="change failed")
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit and
        # KeyboardInterrupt propagate. A missing user row (``user`` is
        # None) still ends up here through the AttributeError it raises.
        db.session.rollback()
        return jsonify(result="change failed")
    finally:
        db.session.close()
def password_reset_request():
    """Handle a request for a password-reset e-mail.

    Users who are not anonymous are redirected to ``main.index``. On a
    valid form submission the submitted address is lower-cased and
    stripped, and a reset token is generated and e-mailed when a user with
    that address exists; the confirmation message is flashed and the
    visitor is redirected to ``auth.login`` either way. Otherwise the
    request form is rendered.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('main.index'))

    form = PasswordResetRequestForm()
    if not form.validate_on_submit():
        return render_template('auth/reset_password.html', form=form)

    normalized_email = form.email.data.lower().strip()
    account = User.query.filter_by(email=normalized_email).first()
    if account:
        reset_token = account.generate_reset_token()
        send_email(
            account.email,
            'Reset Your Password',
            'auth/email/reset_password',
            user=account,
            token=reset_token,
            next=request.args.get('next'),
        )
    # NOTE(review): indentation was lost in the source; the flash/redirect
    # pair is assumed to run whether or not the address matched a user.
    flash('An email with instructions to reset your password has been '
          'sent to you.')
    return redirect(url_for('auth.login'))
def load_user(user_id):
    """Return the ``User`` with primary key ``user_id`` via ``db.session``.

    :param user_id: primary key handed to ``Query.get``.
    """
    return db.session.query(User).get(user_id)
def dev_login(user_id):
    """Log in as ``user_id`` when ``ENVIRONMENT`` is ``'dev'``.

    In every environment the caller is redirected to ``index`` afterwards.

    :param user_id: primary key of the user to log in as.
    """
    dev_mode = ENVIRONMENT == 'dev'
    if dev_mode:
        target = db.session.query(User).get(user_id)
        login_user(target)
    return redirect(url_for('index'))
async def api_get_items_v2(table, *, page='1', size='10'):
    """Return one page of rows from the model selected by ``table``.

    Declared ``async`` because the body awaits the model's query methods;
    ``await`` inside a plain ``def`` does not compile.

    :param table: one of ``'users'``, ``'blogs'`` or ``'comments'``; any
        other value raises ``KeyError``.
    :param page: requested page, passed through ``set_valid_value``.
    :param size: requested page size, passed through ``set_valid_value``
        with 10 as its second argument.
    :return: dict with the ``Page`` object under ``page`` and the rows,
        converted with ``to_json(encrypted=True)``, under ``items``.
    """
    models = {'users': User, 'blogs': Blog, 'comments': Comment}
    model = models[table]
    num = await model.countRows()
    page = Page(num, set_valid_value(page), set_valid_value(size, 10))
    if num == 0:
        return dict(page=page, items=[])
    # NOTE(review): the row count passed to ``limit`` is
    # ``page.limit + num % page.limit``, kept exactly as written - confirm
    # the intent of the extra ``num % page.limit`` rows.
    items = await model.findAll(
        orderBy='created_at desc',
        limit=(page.offset, page.limit + num % page.limit))
    return dict(page=page,
                items=[item.to_json(encrypted=True) for item in items])
# ??????
async def api_delete_item_v2(table, id, request):
    """Delete the row ``id`` from the model selected by ``table``.

    Declared ``async`` because the body awaits the model's methods;
    ``await`` inside a plain ``def`` does not compile. ``check_user`` is
    called with ``request.__user__`` before anything is looked up.

    :param table: one of ``'user'``, ``'blog'`` or ``'comment'``; any other
        value raises ``KeyError``.
    :param id: primary key of the row to remove.
    :param request: request object carrying ``__user__``.
    :return: ``dict(id=id)``, whether or not a row was found.
    """
    models = {'user': User, 'blog': Blog, 'comment': Comment}
    check_user(request.__user__)
    item = await models[table].find(id)
    if item:
        await item.remove()
    else:
        # ``logging.warn`` is a deprecated alias of ``logging.warning``.
        logging.warning('id: %s not exist in %s' % (id, table))
    return dict(id=id)
# ???????????
def make_shell_context():
    """Return the mapping of names exposed as the shell context."""
    return {
        'app': app,
        'db': db,
        'Article': Article,
        'User': User,
        'Site': Site,
        'Packet': Packet,
    }
def login():
    """Log a user in from the submitted login form.

    On a valid submission the user is looked up by a case-insensitive match
    on the e-mail and the password is verified. Success logs the user in,
    flashes a message and redirects to the ``next`` query argument or
    ``main.index``; failure flashes an error. Whenever no redirect happens
    the login page is rendered.
    """
    login_form = LoginForm()
    if not login_form.validate_on_submit():
        return render_template("login.html", form=login_form, title=u"??")

    candidate = User.query.filter(
        User.email.ilike(login_form.email.data)).first()
    if candidate is None or not candidate.verify_password(
            login_form.password.data):
        flash(u'??????????!', 'danger')
        return render_template("login.html", form=login_form, title=u"??")

    login_user(candidate, login_form.remember_me.data)
    flash(u'????! ??? %s!' % candidate.name, 'success')
    # NOTE(review): ``next`` comes from the query string and is used as the
    # redirect target without validation - confirm this is acceptable.
    return redirect(request.args.get('next') or url_for('main.index'))
def register():
    """Register a new user from the submitted registration form.

    On a valid submission a ``User`` is created from the e-mail, name and
    password fields and committed, a message is flashed, the new user is
    logged in and the visitor is redirected to the ``next`` query argument
    or ``main.index``. Otherwise the registration page is rendered.
    """
    form = RegistrationForm()
    if not form.validate_on_submit():
        return render_template('register.html', form=form, title=u"?????")

    newcomer = User(
        email=form.email.data,
        name=form.name.data,
        password=form.password.data,
    )
    db.session.add(newcomer)
    db.session.commit()
    flash(u'????! ??? %s!' % form.name.data, 'success')
    login_user(newcomer)
    # NOTE(review): ``next`` comes from the query string and is used as the
    # redirect target without validation - confirm this is acceptable.
    return redirect(request.args.get('next') or url_for('main.index'))
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_password_setter(self):
    """A user built with a password has a non-``None`` ``password_hash``."""
    user = User(password='cat')
    self.assertIsNotNone(user.password_hash)
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_password_verification(self):
    """``verify_password`` accepts the set password and rejects another."""
    user = User(password='cat')
    accepted = user.verify_password('cat')
    self.assertTrue(accepted)
    rejected = user.verify_password('dog')
    self.assertFalse(rejected)
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_password_salts_are_random(self):
    """Two users with the same password get different password hashes."""
    first, second = User(password='cat'), User(password='cat')
    self.assertNotEqual(first.password_hash, second.password_hash)
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_valid_confirmation_token(self):
    """A user confirms successfully with a token it generated itself."""
    user = User(password='cat')
    db.session.add(user)
    db.session.commit()
    own_token = user.generate_confirmation_token()
    confirmed = user.confirm(own_token)
    self.assertTrue(confirmed)
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_invalid_confirmation_token(self):
    """A token generated by one user does not confirm a different user."""
    issuer = User(password='cat')
    other = User(password='dog')
    for account in (issuer, other):
        db.session.add(account)
    db.session.commit()
    foreign_token = issuer.generate_confirmation_token()
    self.assertFalse(other.confirm(foreign_token))
test_user_model.py 文件源码
项目:circleci-demo-python-flask
作者: CircleCI-Public
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_expired_confirmation_token(self):
    """A token generated with argument 1 is rejected after a 2 s wait."""
    user = User(password='cat')
    db.session.add(user)
    db.session.commit()
    short_lived = user.generate_confirmation_token(1)
    # Sleep past the value passed above before trying to confirm.
    time.sleep(2)
    confirmed = user.confirm(short_lived)
    self.assertFalse(confirmed)