def post(self):
    """Log an existing user in and open a session for them.

    Reads the required ``email`` and ``password`` request arguments, hashes
    the password with SHA-256 and looks for a user whose e-mail and stored
    password hash both match.

    Returns:
        A redirect to ``/`` after storing a new random token and the
        username in the session, or a redirect to ``/`` carrying a
        ``message`` query parameter when no matching account is found.
    """
    request_parser = reqparse.RequestParser()
    for field in ('email', 'password'):
        request_parser.add_argument(field, required=True)
    args = request_parser.parse_args()

    digest = hashlib.sha256(args.password.encode('utf-8')).hexdigest()
    credentials_match = (User.email == args.email) & (User.password_hash == digest)
    user = db.session.query(User).filter(credentials_match).first()
    if user is None:
        return redirect('/?message=%s' % 'Could not find account.')

    # Allocate a session token and remember which e-mail it belongs to.
    token = os.urandom(256)
    tokens[token] = args.email
    session['token'] = token
    session['username'] = user.username
    return redirect('/')
# Example source code for the Python class username()
def login():
    """Authenticate a user from a JSON body and issue a token cookie.

    The request body must be a JSON object containing ``username`` and
    ``password``. When the hash of the supplied password (computed with the
    user's stored salt) equals the stored password, a fresh UUID token is
    saved on the user row and sent back in the ``token`` cookie.

    Returns:
        ``{'success': True}`` with the cookie set on success,
        ``({'error': 'Bad request'}, 400)`` when the body is not a JSON
        object or a field is missing, and ``({'error': 'Bad login'}, 401)``
        when the credentials do not match.
    """
    req = request.get_json()
    # get_json() may return None or a non-object JSON value (list, string,
    # number); treat those as a malformed request rather than crashing.
    if not isinstance(req, dict) \
            or 'username' not in req or 'password' not in req:
        return jsonify({'error': 'Bad request'}), 400

    username = req['username']
    password = req['password']
    user = User.query.filter(User.username == username).first()
    if not user or get_hash(password, user.salt) != user.password:
        return jsonify({'error': 'Bad login'}), 401

    token = uuid.uuid4().hex
    user.token = token
    db_session.query(User).filter_by(id=user.id) \
        .update({"token": user.token})
    db_session.commit()

    resp = jsonify({'success': True})
    resp.set_cookie('token', token)
    return resp
def register():
    """Create a new account from a JSON body, guarded by an invite code.

    The request body must be a JSON object containing ``username``,
    ``password`` and ``invite_code``. The invite code is checked with
    ``verify_invite_code``; on success the user is created through
    ``new_user`` and the invite is marked as redeemed.

    Returns:
        ``{'success': True}`` on success, or a 400 response with an
        ``error`` field when the body is malformed or the invite code is
        rejected.
    """
    req = request.get_json()
    # get_json() may return None or a non-object JSON value; reject those
    # up front instead of failing on the membership tests below.
    required_fields = ('username', 'password', 'invite_code')
    if not isinstance(req, dict) \
            or any(field not in req for field in required_fields):
        return jsonify({'error': 'Bad request'}), 400

    username = req['username']
    password = req['password']
    invite_code = req['invite_code']
    if not verify_invite_code(invite_code):
        return jsonify({'error': 'Invalid invite code'}), 400

    new_user(username, password)
    Invite.query.filter(Invite.code == invite_code).first().redeemed = True
    db_session.commit()
    return jsonify({'success': True})
def add_exemption():
    """Mark a user as exempt from restrictions.

    Reads ``username`` from the submitted form, finds the user by a
    case-insensitive username match, sets ``is_exempt`` and clears any
    non-empty ``response``.

    Returns:
        ``(status="OK", 200)`` on success, or the result of ``bad_request``
        when the username is empty or no such user exists.
    """
    username = request.form["username"]
    if not username:
        return bad_request("username not provided")

    # Compare the stored column against the submitted name. Comparing the
    # submitted name with itself is always true and would match the first
    # user in the table regardless of the name.
    user = User.query.filter(
        func.lower(User.username) == func.lower(username)).first()
    if not user:
        return bad_request("User does not exist yet")

    user.is_exempt = True
    # Truthiness also copes with a response of None.
    if user.response:
        user.response = ""
    db.session.add(user)
    db.session.commit()
    return jsonify(status="OK"), 200
async def test_atomic_with_delete(flushdb):
    """Check that deletes made inside ``db.atomic()`` blocks are persisted.

    Creates users ``u0``..``u2``, deletes ``u1`` inside a single atomic
    block and ``u2`` inside two nested atomic blocks, and verifies the
    remaining usernames after each step.

    Declared ``async`` because the body uses ``await``, ``async with`` and
    an asynchronous comprehension, none of which are valid in a plain
    ``def``.
    """
    for i in range(3):
        await User.create(username=f'u{i}')

    async with db.atomic():
        user = await User.get(User.username == 'u1')
        await user.delete_instance()

    usernames = [u.username async for u in User.select()]
    assert sorted(usernames) == ['u0', 'u2']

    # Same operation, this time from inside a nested atomic block.
    async with db.atomic():
        async with db.atomic():
            user = await User.get(User.username == 'u2')
            await user.delete_instance()

    usernames = [u.username async for u in User.select()]
    assert usernames == ['u0']
def create():
    """Show the registration form and create the user on a valid POST.

    On a valid POST the user is saved, a welcome e-mail is sent from a
    background thread (the nested function is wrapped with
    ``copy_current_request_context``), and a success message is flashed.
    The ``create.html`` template is rendered in every case.
    """
    create_form = forms.CreateForm(request.form)
    is_valid_submission = request.method == 'POST' and create_form.validate()

    if is_valid_submission:
        user = User(
            create_form.username.data,
            create_form.email.data,
            create_form.password.data,
        )
        db.session.add(user)
        db.session.commit()

        @copy_current_request_context
        def send_message(email, username):
            send_email(email, username)

        mail_thread = threading.Thread(
            name='mail_sender',
            target=send_message,
            args=(user.email, user.username),
        )
        mail_thread.start()

        flash('Usuario registrado en la base de datos!')

    return render_template('create.html', form=create_form)
def post(self):
    """Register a new user and open a session for them.

    Reads the required ``username``, ``email`` and ``password`` request
    arguments. If an existing user already has that username or e-mail the
    request is redirected with a message; otherwise the user is stored with
    a SHA-256 hash of the password.

    Returns:
        A redirect to ``/`` after storing a new random token and the
        username in the session, or a redirect to ``/`` carrying a
        ``message`` query parameter when the account already exists.
    """
    request_parser = reqparse.RequestParser()
    for field in ('username', 'email', 'password'):
        request_parser.add_argument(field, required=True)
    args = request_parser.parse_args()

    # The username and the e-mail must both be unused.
    already_taken = (User.username == args.username) | (User.email == args.email)
    if db.session.query(User).filter(already_taken).first() is not None:
        return redirect('/?message=%s' % 'Account already exists for this email.')

    digest = hashlib.sha256(args.password.encode('utf-8')).hexdigest()
    account = User(
        username=args.username,
        email=args.email,
        password_hash=digest,
    )
    db.session.add(account)
    db.session.commit()

    # Allocate a session token and remember which e-mail it belongs to.
    token = os.urandom(256)
    tokens[token] = args.email
    session['token'] = token
    session['username'] = args.username
    return redirect('/')
def name_exists(form, field):
    """Form validator: reject a username that is already taken.

    Raises:
        ValidationError: if a user whose username equals ``field.data``
            exists.
    """
    matching_users = User.select().where(User.username == field.data)
    if matching_users.exists():
        raise ValidationError('User with this name already exists.')
def new_user(username, password):
    """Create, persist and return a user with a hashed password.

    The salt is read from ``app.config['SALT']`` and passed both to
    ``get_hash`` and to the ``User`` constructor.
    """
    salt = app.config['SALT']
    account = User(username, get_hash(password, salt), salt)
    db_session.add(account)
    db_session.commit()
    return account
def is_valid(self):
    """Validate the registration form fields before the base-class checks.

    Checks, in order: username length (at least 4), the reserved name
    ``anonymous``, password length (at least 8), username uniqueness and,
    when an e-mail is supplied, its format and uniqueness.

    Returns:
        The result of ``self.error(...)`` for the first failing rule,
        otherwise the result of the parent class's ``is_valid()``.
    """
    form = self.form
    username = form['username']

    if len(username) < 4:
        return self.error('username must be at least 4 characters')
    if username == 'anonymous':
        return self.error('username cannot be anonymous')
    if len(form['password']) < 8:
        return self.error('password must be at least 8 characters')
    if User.select().where(User.username == username).count():
        return self.error('username is already in use')

    # The e-mail is optional; it is only validated when present.
    email = form['email']
    if email:
        if not validate_email(email):
            return self.error('email is not valid')
        if User.select().where(User.email == email).count():
            return self.error('email is already in use')

    return super(RegistrationValidator, self).is_valid()
def user_lookup(username):
    """Show a user's page, or return it as JSON for ``<username>.json``.

    The user is looked up by exact username first and by a
    case-insensitive ``ilike`` match second.

    Args:
        username: Username from the URL, optionally suffixed with ``.json``.

    Returns:
        A JSON response with the user's fields when the ``.json`` suffix
        was given, otherwise the rendered ``user.html`` template. Aborts
        with 404 when no user matches.
    """
    json_suffix = ".json"
    is_json = username.endswith(json_suffix)
    if is_json:
        # Strip only the suffix so that a name containing dots stays intact.
        username = username[:-len(json_suffix)]

    user = User.query.filter_by(username=username).first()
    if not user:
        # check to see if a similar username exists
        user = User.query.filter(User.username.ilike(username)).first()
    # The 404 check must come before any attribute of `user` is read.
    if not user:
        return abort(404)

    show_warning = user.username.lower() != username.lower()

    if is_json:
        return jsonify(username=user.username,
                       response_md=user.full_body_md,
                       response_html=user.full_body_html,
                       submitted=user.submitted,
                       processed=user.processed,
                       last_login=user.last_login)
    return render_template(
        "user.html",
        user=user,
        username=username,
        show_warning=show_warning)
def add_mod():
    """Promote a user to moderator, issue them an API key and notify them.

    Reads ``username`` from the submitted form and finds the user by a
    case-insensitive username match. A 32-character alphanumeric API key is
    generated and saved; if the commit raises ``IntegrityError`` the
    transaction is rolled back and a new key is tried.

    Returns:
        ``(status="OK", 200)`` on success, or the result of ``bad_request``
        when the user is not found.
    """
    # Function-scope import: the key is a credential, so it is drawn from
    # the cryptographically secure generator instead of `random`.
    import secrets

    username = request.form["username"]
    user = User.query.filter(func.lower(User.username)
                             == func.lower(username)).first()
    if not user:
        return bad_request("user not found")

    alphabet = string.ascii_letters + string.digits
    while True:
        # Set both attributes on every attempt: a rollback discards the
        # pending changes made to the instance.
        user.form_mod = True
        user.api_key = "".join(secrets.choice(alphabet) for _ in range(32))
        try:
            db.session.add(user)
            db.session.commit()
            break
        except IntegrityError:  # check for uniqueness
            # Without a rollback the session stays in a failed state and
            # the retry cannot commit.
            db.session.rollback()

    url = url_for('settings', _external=True)
    subj = f"invitation to moderate {g.settings.site_title}"
    body = f"**gadzooks!** u/{g.user.username} has added you as a moderator of {g.settings.site_title}"
    body += f"\n\nclick [here]({url}) to view the site. mod tools will be visible at the top of the page."
    send_message(username, subj, body)
    return jsonify(status="OK"), 200
def remove_mod():
    """Remove a user from the list of moderators.

    Reads ``username`` from the submitted form, finds the user by a
    case-insensitive username match and clears ``form_mod``.

    Returns:
        ``(status="OK", 200)`` on success, or the result of ``bad_request``
        when the user is not found.
    """
    requested_name = request.form["username"]
    same_name = func.lower(User.username) == func.lower(requested_name)
    moderator = User.query.filter(same_name).first()
    if not moderator:
        return bad_request("user not found")

    moderator.form_mod = False
    db.session.add(moderator)
    db.session.commit()
    return jsonify(status="OK"), 200
def name_exists(form, field):
    """Form validator: reject a username that is already taken.

    Raises:
        ValidationError: if a user whose username equals ``field.data``
            exists.
    """
    matching_users = User.select().where(User.username == field.data)
    if matching_users.exists():
        raise ValidationError('User with that name already exists.')
def addUser(username, password, authlevel):
    """Insert one user with the given credentials and auth level.

    Returns:
        Whatever ``db_session.add`` returns.
        NOTE(review): if ``db_session`` is a SQLAlchemy session this is
        always ``None`` - confirm whether callers expect the new user.
    """
    account = User(username=username, password=password, authlevel=authlevel)
    result = db_session.add(account)
    db_session.commit()
    return result
def addUsers(filename):
    """Bulk-insert users read from a CSV file.

    Each row supplies username, password and auth level in its first three
    columns.

    Returns:
        Whatever ``db_session.add_all`` returns.
        NOTE(review): if ``db_session`` is a SQLAlchemy session this is
        always ``None`` - confirm whether callers expect the new users.
    """
    with open(filename) as csvfile:
        accounts = [
            User(username=row[0], password=row[1], authlevel=row[2])
            for row in csv.reader(csvfile)
        ]
    result = db_session.add_all(accounts)
    db_session.commit()
    return result
def deleteUser(username):
    """Delete a user together with their article-queue entries.

    Args:
        username: Exact username of the user to delete.

    Returns:
        The number of ``ArticleQueue`` rows deleted; ``0`` when no user
        with that username exists (nothing is deleted in that case).
    """
    user = db_session.query(User).filter(User.username == username).first()
    if user is None:
        # Nothing to delete; avoids reading `.id` from None.
        return 0

    aqs = db_session.query(ArticleQueue).filter(ArticleQueue.coder_id == user.id).delete()
    db_session.delete(user)
    db_session.commit()
    # The original guarded this message with the return value of delete(),
    # which is None on a SQLAlchemy session, so it was never printed.
    print("User %s deleted, %d article queue items deleted." % (username, aqs))
    return aqs
########
### Generating samples
########
def name_exists(form, field):
    """Form validator: reject a username that is already taken.

    Raises:
        ValidationError: if a user whose username equals ``field.data``
            exists.
    """
    matching_users = User.select().where(User.username == field.data)
    if matching_users.exists():
        raise ValidationError('User with that name already exists.')
def send_email(user_email, username):
    """Send the registration thank-you e-mail to a new user.

    The HTML body is rendered from ``email.html`` with the username, and
    the sender address is read from ``app.config['MAIL_USERNAME']``.
    """
    message = Message(
        'Gracias por tu registro!',
        sender=app.config['MAIL_USERNAME'],
        recipients=[user_email],
    )
    message.html = render_template('email.html', username=username)
    mail.send(message)
def create_session(username = '', user_id = ''):
    """Store the logged-in user's name and id in the session.

    Args:
        username: Value saved under the ``username`` session key.
        user_id: Value saved under the ``user_id`` session key.
    """
    session['username'] = username
    # Was `use_id`, an undefined name that raised NameError on every call.
    session['user_id'] = user_id
def before_request():
    """Redirect requests based on whether a username is in the session.

    Without a session username, the ``comment`` endpoint redirects to
    ``login``. With one, the ``login`` and ``create`` endpoints redirect to
    ``index``. Every other request falls through and returns ``None``.
    """
    logged_in = 'username' in session
    endpoint = request.endpoint

    if not logged_in and endpoint in ['comment']:
        return redirect(url_for('login'))
    if logged_in and endpoint in ['login', 'create']:
        return redirect(url_for('index'))
def index():
    """Render the index page with the title ``Index``.

    The original read ``session['username']`` into a local that was never
    used; that dead branch has been removed.
    """
    return render_template('index.html', title='Index')
def logout():
    """Drop the username from the session, if any, and go to the login page."""
    # pop() with a default is a no-op when the key is absent.
    session.pop('username', None)
    return redirect(url_for('login'))
def reviews(page = 1):
    """Render one page of comments joined with their authors' usernames.

    Args:
        page: 1-based page number; three comments are shown per page.

    Returns:
        The rendered ``reviews.html`` template, given the pagination object
        and the ``date_format`` helper.
    """
    per_page = 3
    comments = Comment.query.join(User).add_columns(
        User.username,
        Comment.text,
        Comment.created_date
    ).paginate(page=page, per_page=per_page, error_out=False)
    # Keyword arguments: newer Flask-SQLAlchemy releases no longer accept
    # these positionally, and older ones accept both forms.
    return render_template('reviews.html', comments = comments, date_format = date_format)
def ajax_login():
    """Return a fixed JSON login response that echoes the posted username.

    Returns:
        A JSON string with ``status`` 200, the submitted ``username`` and a
        hard-coded ``id`` of 1.
    """
    # Function-call form: the bare `print x` statement is a SyntaxError on
    # Python 3.
    print(request.form)
    username = request.form['username']
    response = { 'status': 200, 'username': username, 'id': 1 }
    return json.dumps(response)
def post(self, task_id):
''' Endpoint for submitting code.

Parses the required ``language`` and ``code`` request arguments, looks the
task up through ``manager.get_task``, checks that the session carries a
token known to ``tokens``, runs the submission through ``docker_verify``
and, when every result is truthy, scores it and may store an ``Answer``.

Every outcome is a redirect to ``/`` with a ``message`` query parameter
(and a ``task`` parameter once the task is known).

NOTE(review): the indentation of this listing has been lost, so the exact
nesting of the statements after ``if all(result):`` cannot be determined
from here - confirm against the original file before changing them.
'''
parser = reqparse.RequestParser()
parser.add_argument('language', required=True)
parser.add_argument('code', required=True)
args = parser.parse_args()
task = manager.get_task(task_id)
# NOTE(review): task_id is checked for None only after it has already been
# passed to manager.get_task() above.
if task_id is None or task is None: # no task exists with the stated id
return redirect('/?message=%s' % 'task does not exist.')
test_cases = task.get('test_cases', None)
# get user id
# No token in the session: drop any stored username and redirect.
if 'token' not in session:
try:
session.pop('username')
except KeyError: pass
return redirect('/?message=%s' % 'no auth token supplied.')
# Token present but not in `tokens`: drop the username and the token, then
# redirect.
if session['token'] not in tokens:
try:
session.pop('username')
except KeyError: pass
session.pop('token')
return redirect('/?message=%s' % 'invalid token supplied.')
# The token maps to an e-mail address, which identifies the user row.
user = db.session.query(User).filter(User.email==tokens[session['token']]).first()
# verify response using same input
result = docker_verify(args.code, args.language, test_cases)
# A result of None is reported to the user as an unsupported language.
if result is None:
return redirect('/?task=%d&message=%s' % (task_id, 'language is not supported'))
if all(result):
# Record this submission's length as the new best for this task and
# language when it is shorter than the current best.
if(len(args.code) < best_answer[task_id][args.language]):
best_answer[task_id][args.language] = len(args.code)
# Points are max_points scaled by best_length / this_length.
# NOTE(review): int(None) raises if the task has no 'points' entry, and an
# empty code string would divide by zero - confirm upstream validation.
max_points = int(task.get('points', None))
points = max_points * best_answer[task_id][args.language]/len(args.code)
# look for better answer from same user
# (an Answer by this user for this task with strictly more points)
prev_answer = db.session.query(Answer).filter((Answer.user_id==user.id) & (Answer.task_id==task_id) & (Answer.points>points)).first()
if prev_answer is None:
answer = Answer(
user_id=user.id,
task_id=task_id,
length=len(args.code),
points=points,
language=args.language
)
db.session.add(answer)
# The user's total is recomputed as the sum over all of their answers.
user.points = sum([answer.points for answer in user.answers])
db.session.commit()
return redirect('/?task=%d&message=%s' % (task_id, 'answer judged correct.'))
return redirect('/?task=%d&message=%s' % (task_id, 'answer judged incorrect.'))
def users():
    """Render the moderator user list, optionally filtered and paginated.

    Query parameters:
        page: 1-based page number (default 1).
        limit: users per page (default 25).
        user: when present, show only the user whose username matches
            case-insensitively; no pagination buttons are passed.
        mod: when present, filter on ``form_mod``; any value other than
            ``false`` (case-insensitive) means ``True``.
        exempt: same as ``mod`` but for ``is_exempt``; ignored when ``mod``
            is also given.

    Returns:
        The rendered ``users.html`` template.
    """
    # type=int falls back to the default instead of raising on bad input.
    page = request.args.get('page', 1, type=int)
    count = request.args.get('limit', 25, type=int)

    # There is no "back" button on the first page.
    button_back = False if page == 1 else f"/mod/users?page={page-1}&limit={count}"
    button_next = f"/mod/users?page={page+1}&limit={count}"

    username = request.args.get('user')
    if username is not None:
        user = User.query.filter(
            func.lower(User.username) == func.lower(username)).first()
        return render_template("users.html", users=[user])

    query = User.query
    # At most one boolean filter applies; `mod` takes precedence.
    for arg_name, column in (('mod', 'form_mod'), ('exempt', 'is_exempt')):
        value = request.args.get(arg_name)
        if value is None:
            continue
        raw = str(value).lower()
        query = query.filter_by(**{column: raw != "false"})
        # Carry the active filter over to the pagination links.
        if button_back:
            button_back += f"&{arg_name}={raw}"
        button_next += f"&{arg_name}={raw}"
        break

    # Keyword arguments: newer Flask-SQLAlchemy releases no longer accept
    # these positionally, and older ones accept both forms.
    users = query.paginate(page=page, per_page=count, error_out=False).items
    button_data = [button_back, button_next]
    return render_template("users.html", users=users, button_data=button_data)