def load_user_from_request(request):
    """
    Authenticate the user described by the request's authorization data.

    The ``username`` entry of ``request.authorization`` is matched against
    ``User.email`` and the ``password`` entry is checked with
    ``verify_password``.

    Args:
        request (Request): The flask request object used in endpoint handlers

    Returns:
        models.User: The logged user, or None when the request carries no
            authorization data, no user has that email, or the password
            does not verify.
    """
    credentials = request.authorization
    if not credentials:
        return None

    try:
        candidate = User.get(User.email == credentials['username'])
    except User.DoesNotExist:
        return None

    password_ok = candidate.verify_password(credentials['password'])
    return candidate if password_ok else None
# Example source code for the Python get() class method (original heading: "python类get()的实例源码")
def post(self):
    """
    Log a user in from a JSON body containing ``email`` and ``password``.

    Aborts with BAD_REQUEST when either key is missing or no user has the
    given email, and with UNAUTHORIZED when the password does not verify.
    On success the user is passed to ``login_user`` and an empty OK
    response is returned.
    """
    payload = request.get_json(force=True)
    for required_key in ('email', 'password'):
        if required_key not in payload:
            abort(client.BAD_REQUEST)

    email, password = payload['email'], payload['password']

    try:
        user = User.get(User.email == email)
    except User.DoesNotExist:
        abort(client.BAD_REQUEST)

    password_ok = user.verify_password(password)
    if not password_ok:
        abort(client.UNAUTHORIZED)

    login_user(user)
    return generate_response({}, client.OK)
def delete(self, order_uuid):
    """
    Delete a specific order.

    Returns ``(None, NOT_FOUND)`` when no order has the given uuid,
    an error message with UNAUTHORIZED when the requester neither owns the
    order nor is an admin, and ``(None, NO_CONTENT)`` after deletion.
    """
    try:
        order = Order.get(uuid=str(order_uuid))
    except Order.DoesNotExist:
        return None, NOT_FOUND

    # The current user comes from the flask.g global object registered
    # inside the auth.py::verify() function, called by the
    # @auth.login_required decorator. Comparing it with the order's owner
    # prevents users from deleting other users' orders; admins are exempt.
    requester = auth.current_user
    if requester != order.user and requester.admin is False:
        denial = {'message': "You can't delete another user's order"}
        return (denial, UNAUTHORIZED)

    order.delete_instance(recursive=True)
    return None, NO_CONTENT
def parse_signed_cookie(cookie_str):
    """
    Validate a signed cookie and return the user it identifies.

    The cookie must consist of exactly three ``-``-separated fields:
    user id, expiry timestamp and an MD5 hex digest. The digest is
    recomputed from the id, the stored user password, the expiry and
    ``_COOKIE_KEY`` and compared with the one in the cookie.

    Args:
        cookie_str: Raw cookie value.

    Returns:
        The matching user, or None when the cookie is malformed, expired,
        refers to no user, carries a wrong digest, or any error occurs
        while checking it.
    """
    try:
        fields = cookie_str.split('-')
        if len(fields) != 3:
            return None
        user_id, expires, signature = fields
        if int(expires) < time.time():
            return None
        user = User.get(user_id)
        if user is None:
            return None
        # NOTE(review): hashlib.md5() needs bytes on Python 3; this call
        # only works with a Python 2 str - confirm the target interpreter.
        expected = hashlib.md5(
            '%s-%s-%s-%s' % (user_id, user.password, expires, _COOKIE_KEY)
        ).hexdigest()
        if signature != expected:
            return None
        return user
    except Exception:
        # Deliberately best-effort: any failure means "not authenticated".
        # Unlike a bare ``except:``, this lets SystemExit and
        # KeyboardInterrupt propagate.
        return None
def api_update_blog(blog_id):
    """
    Update the name, summary and content of an existing blog.

    Calls ``check_admin()`` first. The three request inputs are stripped
    and must each be non-empty, otherwise ``APIValueError`` is raised for
    the first empty field (checked in the order name, summary, content).
    Raises ``APIResourceNotFoundError`` when ``Blog.get`` returns None.

    Returns:
        The updated blog object.
    """
    check_admin()
    form = ctx.request.input(name='', summary='', content='')
    cleaned = (
        ('name', form.name.strip(), 'name cannot be empty.'),
        ('summary', form.summary.strip(), 'summary cannot be empty.'),
        ('content', form.content.strip(), 'content cannot be empty.'),
    )
    for field, value, message in cleaned:
        if not value:
            raise APIValueError(field, message)

    blog = Blog.get(blog_id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')

    for field, value, _ in cleaned:
        setattr(blog, field, value)
    blog.update()
    return blog
def login():
    """
    Render the login form and process login submissions.

    On POST the submitted ``username`` and ``password`` are passed to
    ``User.validate``, whose result is indexed as ``[0]`` (success flag)
    and ``[1]`` (user object). On success the user's username and id are
    stored in ``session['user']`` and the client is redirected to the
    submitted ``next`` target, or to the ``home`` endpoint when ``next``
    is empty or absent. In every other case the login template is
    rendered with the ``next`` request value.

    The session is written for every successful login. Previously it was
    only written when ``next`` was non-empty, so a plain login redirected
    home without an authenticated session.
    """
    next_ = request.values.get('next')
    if request.method == 'POST':
        validation = User.validate(
            username=request.form['username'],
            password=request.form['password'],
        )
        if validation[0]:
            account = validation[1]
            session['user'] = {
                'username': account.username,
                'id': account.id
            }
            # A missing ``next`` field is treated like an empty one.
            target = request.form.get('next', '')
            if target != '':
                # NOTE(review): ``target`` is client-supplied and is not
                # checked against this site, so this is an open redirect.
                return redirect(target)
            return redirect(url_for('home'))
    return render_template('login.html', next=next_)
def test_move(self):
    """Adding moves updates the stored state, timing and end-of-game flags."""
    # a fresh game: white to move, no moves recorded, not ended
    game = Game.create(white='123', black='456', state='Ke1,ke8')
    creation_gap = (game.date_created - game.date_state).total_seconds()
    self.assertTrue(abs(creation_gap) < 1)
    self.assertEqual(game.next_color, WHITE)
    self.assertEqual(Game.select().count(), 1)
    self.assertEqual(Move.select().count(), 0)
    self.assertFalse(game.ended)

    # let one second pass so the move time is measurable
    time.sleep(1)

    # first move
    game.add_move('K', 'e1-e2', 'Ke2,ke8')
    self.assertEqual(Move.select().count(), 1)

    # re-read the game from the database
    game = Game.get(pk=game.pk)
    self.assertEqual(game.next_color, BLACK)
    self.assertEqual(game.state, 'Ke2,ke8')
    state_gap = (game.date_state - game.date_created).total_seconds()
    self.assertTrue(state_gap > 1)
    first_move = game.moves.get()
    self.assertAlmostEqual(first_move.time_move, 1, places=1)
    self.assertFalse(game.ended)

    # second move, flagged as ending the game
    game.add_move('k', 'e8-e7', 'Ke2,ke7', True)
    self.assertTrue(game.ended)
    self.assertEqual(game.winner, BLACK)
def test_game_over_1(self):
    """game_over() with default arguments ends the game and the result survives a reload."""
    # a fresh game has no end data
    game = Game.create(white='123', black='456', state='Ke1,ke8')
    self.assertFalse(game.ended)
    for unset_value in (game.end_reason, game.date_end):
        self.assertIsNone(unset_value)

    # finish the game (saved)
    game.game_over(END_CHECKMATE)
    self.assertTrue(game.ended)
    self.assertEqual(game.end_reason, 1)
    self.assertIsNotNone(game.date_end)
    self.assertIsNone(game.winner)

    # the end data is still there after re-reading the game
    reloaded = Game.get(pk=game.pk)
    self.assertTrue(reloaded.ended)
    self.assertEqual(reloaded.end_reason, 1)
    self.assertIsNotNone(reloaded.date_end)
def test_game_over_2(self):
    """game_over() called without saving changes the instance but not the reloaded game."""
    # a fresh game has no end data
    game = Game.create(white='123', black='456', state='Ke1,ke8')
    self.assertFalse(game.ended)
    for unset_value in (game.end_reason, game.date_end):
        self.assertIsNone(unset_value)

    # finish the game without saving
    finished_at = datetime(2015, 1, 27, 12, 0, 0)
    game.game_over(END_CHECKMATE, finished_at, False, WHITE)
    self.assertTrue(game.ended)
    self.assertEqual(game.end_reason, 1)
    self.assertEqual(game.date_end, datetime(2015, 1, 27, 12, 0, 0))
    self.assertEqual(game.winner, WHITE)

    # the reloaded game is unchanged
    reloaded = Game.get(pk=game.pk)
    self.assertFalse(reloaded.ended)
    self.assertIsNone(reloaded.end_reason)
    self.assertIsNone(reloaded.date_end)
    self.assertIsNone(reloaded.winner)
def accept(game_id):
    """
    Accept a waiting game from the pool as the second player.

    Sends an error when the pool entry does not exist, when the lookup
    fails for any other reason, or when the requesting user created the
    entry. Otherwise the entry is marked as started with a new token for
    player two, a game is created with the pool creator as white and the
    requesting user as black, the creator's ``wait_`` cache entry is
    deleted, and the game info for black is sent back.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        return send_error('Game not found')
    except Exception:
        return send_error('Wrong format')

    if pool.user1 and pool.user1 == request.user:
        return send_error('You cannot start game with yourself')

    # NOTE(review): the whole pairing sequence is kept inside the
    # transaction - confirm the intended extent of the atomic block.
    with config.DB.atomic():
        pool.player2 = generate_token(True)
        pool.user2 = request.user
        pool.is_started = True
        pool.save()
        game = Game.new_game(
            pool.player1,
            pool.player2,
            pool.type_game,
            pool.time_limit,
            white_user=pool.user1,
            black_user=pool.user2,
        )
        delete_cache('wait_{}'.format(pool.player1))
        payload = {'game': pool.player2}
        payload.update(game.get_info(consts.BLACK))
    return send_data(payload)
def invited(token):
    """
    Start a game from an invitation token.

    The cache entry ``invite_<token>`` must unpack into the inviter's
    token, the game type and the game limit; if reading or unpacking it
    fails, an error is sent. The inviter's user is looked up through the
    ``user_<enemy token>`` cache entry when one exists. A game is then
    created with the inviter as white and the requesting user as black,
    the inviter's ``wait_`` cache entry is deleted, and the game info for
    black is sent back.
    """
    try:
        enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
    except Exception:
        # Covers a failed cache read as well as a value that cannot be
        # unpacked into three items. Unlike a bare ``except:``, this lets
        # SystemExit and KeyboardInterrupt propagate.
        return send_error('game not found')

    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as white player
            pass

    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))
    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return send_data(result)
def get(self):
    """
    List up to ten open games from the pool, newest first.

    Only pool entries that are not started, not lost and have a
    ``player1`` value are considered; entries created by the requesting
    user are skipped.

    Returns:
        dict: ``{'games': [...]}`` where each item has the keys ``id``,
        ``date_created`` (ISO format), ``user`` (creator's username or
        None), ``type`` and ``limit``.
    """
    max_games = 10
    open_pools = GamePool.select().where(
        # peewee builds SQL from the overloaded ``==`` operator, so these
        # comparisons must not be rewritten as ``is False`` / ``not``.
        GamePool.is_started == False,  # noqa: E712
        GamePool.is_lost == False,  # noqa: E712
        # ``GamePool.player1 is not None`` is a plain Python identity test
        # that is always True; is_null(False) produces the IS NOT NULL
        # condition.
        GamePool.player1.is_null(False),
    ).order_by(GamePool.date_created.desc())

    result = []
    for pool in open_pools:
        if pool.user1 and pool.user1 == request.user:
            continue
        result.append({
            'id': pool.pk,
            'date_created': pool.date_created.isoformat(),
            'user': pool.user1.username if pool.user1 else None,
            'type': consts.TYPES[pool.type_game]['name'],
            'limit': pool.time_limit,
        })
        if len(result) >= max_games:
            break
    return {'games': result}
def post(self, game_id):
    """
    Accept a waiting game from the pool as the second player.

    Raises ``errors.APINotFound`` when the pool entry does not exist and
    ``errors.APIException`` when the lookup fails for any other reason or
    the requesting user created the entry. Otherwise the entry is marked
    as started, a game is created with the pool creator as white and the
    requesting user as black, and the creator's ``wait_`` cache entry is
    deleted.

    Returns:
        dict: ``game`` (player two's token) merged with the game info for
        black.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        raise errors.APINotFound('game')
    except Exception:
        raise errors.APIException('wrong format')

    if pool.user1 and pool.user1 == request.user:
        raise errors.APIException('you cannot start game with yourself')

    pool.player2 = generate_token(True)
    pool.user2 = request.user
    pool.is_started = True
    pool.save()

    game = Game.new_game(
        pool.player1,
        pool.player2,
        pool.type_game,
        pool.time_limit,
        white_user=pool.user1,
        black_user=pool.user2,
    )
    delete_cache('wait_{}'.format(pool.player1))

    payload = {'game': pool.player2}
    payload.update(game.get_info(consts.BLACK))
    return payload
def test_updating_account_profile_settings(self):
    """
    Account profile settings can update full name, about, and website.
    """
    # all three profile fields start out empty
    self.assertEqual('', self.user.full_name)
    self.assertEqual('', self.user.about)
    self.assertEqual('', self.user.website)

    new_profile = {
        'full_name': 'Some User',
        'about': 'About text.',
        'website': 'https://mltshp.com/',
    }
    self.post_url('/account/settings/profile', arguments=new_profile)

    user_reloaded = User.get('id = %s', self.user.id)
    expectations = (
        ('Some User', user_reloaded.full_name),
        ('About text.', user_reloaded.about),
        ('https://mltshp.com/', user_reloaded.website),
    )
    for expected, actual in expectations:
        self.assertEqual(expected, actual)
def test_updating_user_profile_photo(self):
    """
    Emulate the variables that nginx passes in via the upload module
    and see if the file gets uploaded.
    """
    photo_path = os.path.abspath("static/images/test-avatar.png")
    upload_fields = dict(
        photo_path=photo_path,
        photo_content_type="image/png",
        photo_name=os.path.basename(photo_path),
        photo_size=os.path.getsize(photo_path),
    )
    self.post_url('/account/settings/profile', arguments=upload_fields)

    user_reloaded = User.get('id = %s', self.user.id)
    self.assertTrue(user_reloaded.profile_image)
    image_url = user_reloaded.profile_image_url()
    self.assertTrue(image_url.find('/account') > -1)
def test_pagination_returns_correct_counts(self):
    """
    This tests creating 111 shared files for a user and then tests that
    the first twelve pages of their user page all respond with 200.
    """
    user = User.get('name="admin"')
    user_shake = user.shake()

    source_file = Sourcefile(width=10, height=10, file_key='mumbles', thumb_key='bumbles')
    source_file.save()

    for _ in range(111):
        shared = Sharedfile(
            source_id=source_file.id,
            user_id=user.id,
            name="shgaredfile.png",
            title='shared file',
            share_key='asdf',
            content_type='image/png',
            deleted=0,
        )
        shared.save()
        shared.add_to_shake(user_shake)

    for page_number in range(1, 13):
        response = self.fetch('/user/admin/%s' % page_number)
        self.assertEqual(response.code, 200)
def test_reset_password_finish(self):
    """
    Posting the actual reset-password will reset it
    """
    self.user.create_reset_password_token()

    # request the reset page first, then submit the form to the same URL
    self.http_client.fetch(
        self.get_url("/account/reset-password/%s" % (self.user.reset_password_token)),
        self.stop,
    )
    self.wait()

    form_body = "password=%s&password_again=%s&_xsrf=%s" % (
        "qwertyqwerty", "qwertyqwerty", self.xsrf)
    request = HTTPRequest(
        url=self.get_url("/account/reset-password/%s" % (self.user.reset_password_token)),
        method='POST',
        headers={"Cookie": "_xsrf=%s" % (self.xsrf)},
        body=form_body,
    )
    self.http_client.fetch(request, self.stop)
    self.wait()

    self.user = User.get("id=%s", 1)
    self.assertEqual(self.user.reset_password_token, "")
    expected_digest = User.generate_password_digest('qwertyqwerty')
    self.assertEqual(self.user.hashed_password, expected_digest)
def test_reset_password_throws_error_if_passwords_dont_match(self):
    """
    Send two different passwords.
    """
    self.user.create_reset_password_token()
    reset_token = self.user.reset_password_token

    form_body = "password=%s&password_again=%s&_xsrf=%s" % (
        "qwertyqwerty", "poiupoiu", self.xsrf)
    request = HTTPRequest(
        url=self.get_url("/account/reset-password/%s" % (reset_token)),
        method='POST',
        headers={"Cookie": "_xsrf=%s" % (self.xsrf)},
        body=form_body,
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()

    error_position = response.body.find(
        "Those passwords didn't match, or are invalid. Please try again.")
    self.assertTrue(error_position > -1)

    # neither the password nor the reset token may have changed
    self.user = User.get("id = %s", 1)
    unchanged_digest = User.generate_password_digest('asdfasdf')
    self.assertEqual(self.user.hashed_password, unchanged_digest)
    self.assertEqual(reset_token, self.user.reset_password_token)
def test_clears_all_saves(self):
    """Clearing 'save' notifications reports zero new saves and marks the receiver's notifications deleted."""
    self.upload_file(self.test_file1_path, self.test_file1_sha1,
                     self.test_file1_content_type, 1, self.receiver_sid, self.xsrf)
    sharedfile = Sharedfile.get('id=1')

    # two save notifications from the same sender
    for _ in range(2):
        Notification.new_save(self.sender, sharedfile)

    request = HTTPRequest(
        self.get_url('/account/clear-notification'),
        'POST',
        {'Cookie': '_xsrf=%s;sid=%s' % (self.xsrf, self.receiver_sid)},
        '_xsrf=%s&type=save' % (self.xsrf),
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()

    decoded = json_decode(response.body)
    self.assertEqual(decoded['response'], "0 new saves")

    for notification in Notification.where('receiver_id=%s' % (self.receiver.id)):
        self.assertTrue(notification.deleted)
def test_clears_all_favorites(self):
    """Clearing 'favorite' notifications reports zero new likes and marks the receiver's notifications deleted."""
    self.upload_file(self.test_file1_path, self.test_file1_sha1,
                     self.test_file1_content_type, 1, self.receiver_sid, self.xsrf)
    sharedfile = Sharedfile.get('id=1')

    # two favorite notifications from the same sender
    for _ in range(2):
        Notification.new_favorite(self.sender, sharedfile)

    request = HTTPRequest(
        self.get_url('/account/clear-notification'),
        'POST',
        {'Cookie': '_xsrf=%s;sid=%s' % (self.xsrf, self.receiver_sid)},
        '_xsrf=%s&type=favorite' % (self.xsrf),
    )
    self.http_client.fetch(request, self.stop)
    response = self.wait()

    decoded = json_decode(response.body)
    self.assertEqual(decoded['response'], "0 new likes")

    for notification in Notification.where('receiver_id=%s' % (self.receiver.id)):
        self.assertTrue(notification.deleted)
def post(self):
    """
    Save a video, given by its URL, to one of the current user's shakes.

    Without a ``url`` argument the save-video form is rendered again.
    Otherwise the URL is converted with ``Sourcefile.make_oembed_url``;
    if that yields nothing an error page is rendered. The destination is
    the shake named by ``shake_id`` or, when absent, the current user's
    shake of type ``user``. An error page is rendered when the shake is
    missing, the user may not update it, or the user's email is not
    confirmed; otherwise the URL is handed to ``handle_oembed_url``.
    """
    url = self.get_argument('url', None)
    if not url:
        # Return here: without it execution continued to url.strip() and
        # failed on None after the form had already been rendered.
        return self.render("tools/save-video.html", url=url, title=None, description=None)

    url = Sourcefile.make_oembed_url(url.strip())
    if not url:
        return self.render("tools/save-video-error.html", message="We could not load the embed code. The video server may be down. Please contact support.")

    current_user = self.get_current_user_object()
    shake_id = self.get_argument('shake_id', None)
    if not shake_id:
        self.destination_shake = Shake.get('user_id=%s and type=%s', current_user.id, 'user')
    else:
        self.destination_shake = Shake.get('id=%s', shake_id)

    # The shake checks are applied to both lookups so that a missing
    # shake never reaches handle_oembed_url.
    if not self.destination_shake:
        return self.render("tools/save-video-error.html", message="We couldn't save the video to specified shake. Please contact support.")
    if not self.destination_shake.can_update(current_user.id):
        return self.render("tools/save-video-error.html", message="We couldn't save the video to specified shake. Please contact support.")
    if current_user.email_confirmed != 1:
        return self.render("tools/save-video-error.html", message="You must confirm your email address before you can post.")

    self.handle_oembed_url(url)
def get(self, share_key):
    """
    Return the rendered quick-comments HTML for a shared file.

    Raises a 404 when no shared file has the given share key. The
    response is written with no-cache headers and contains ``result``,
    the comment ``count`` and the rendered ``html``.
    """
    sharedfile = Sharedfile.get_by_share_key(share_key)
    if not sharedfile:
        raise tornado.web.HTTPError(404)

    # any truthy "expanded" argument is normalised to True
    expanded = self.get_argument("expanded", False)
    if expanded:
        expanded = True

    # Prevent IE from caching AJAX requests
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
        ("Expires", 0),
    )
    for header_name, header_value in no_cache_headers:
        self.set_header(header_name, header_value)

    user = self.get_current_user_object()
    can_comment = user and user.email_confirmed == 1 and not options.readonly
    comments = sharedfile.comments()
    html_response = self.render_string(
        "image/quick-comments.html",
        sharedfile=sharedfile,
        comments=comments,
        current_user=user,
        can_comment=can_comment,
        expanded=expanded,
    )
    return self.write({'result': 'ok', 'count': len(comments), 'html': html_response})
def post(self, share_key, comment_id):
    """
    Mark the current user's like on a comment as deleted.

    Raises a 404 when either the shared file or the comment is missing.
    With a truthy ``json`` argument the response is a JSON document with
    no-cache headers; otherwise the client is redirected to the shared
    file's page.
    """
    shared_file = models.Sharedfile.get_by_share_key(share_key)
    user = self.get_current_user_object()
    comment = Comment.get("id=%s", comment_id)
    if not (shared_file and comment):
        raise tornado.web.HTTPError(404)

    like = models.CommentLike.get(
        "comment_id = %s and user_id = %s", comment.id, user.id)
    if like:
        like.deleted = 1
        like.save()

    wants_json = self.get_argument("json", False)
    if not wants_json:
        return self.redirect("/p/%s?salty" % (share_key,))

    self.set_header("Cache-Control", "no-store, no-cache, must-revalidate")
    self.set_header("Pragma", "no-cache")
    self.set_header("Expires", 0)
    count = models.CommentLike.where_count("comment_id = %s", comment.id)
    # NOTE(review): the payload reports 'like': True although this handler
    # marks the like as deleted - confirm this is what clients expect.
    return self.write(json_encode({'response': 'ok', 'count': count, 'like': True}))
def get(self, shake_name):
    """
    Return a shake's title and description as JSON.

    Each value is given twice: XHTML-escaped and raw. Missing values are
    reported as empty strings. Raises a 404 when no non-deleted shake has
    the given name.
    """
    shake = Shake.get("name=%s and deleted=0", shake_name)
    if not shake:
        raise tornado.web.HTTPError(404)

    title_raw = shake.title or ''
    description_raw = shake.description or ''
    value = {
        'title': escape.xhtml_escape(title_raw) if title_raw else '',
        'title_raw': title_raw,
        'description': escape.xhtml_escape(description_raw) if description_raw else '',
        'description_raw': description_raw,
    }

    # prevents IE from caching ajax requests.
    self.set_header("Cache-Control", "no-store, no-cache, must-revalidate")
    self.set_header("Pragma", "no-cache")
    self.set_header("Expires", 0)
    return self.write(escape.json_encode(value))
def post(self, shake_name):
    """
    Update the title and/or description of a group shake.

    The shake is looked up by name among the current user's non-deleted
    shakes of type ``group``; when none matches, an error document is
    written. Only non-empty arguments overwrite the stored values. After
    saving, the client is redirected to the shake's quick-details URL.
    """
    current_user = self.get_current_user_object()
    shake_to_update = Shake.get(
        'name=%s and user_id=%s and type=%s and deleted=0',
        shake_name, current_user.id, 'group')
    new_title = self.get_argument('title', None)
    new_description = self.get_argument('description', None)

    if not shake_to_update:
        return self.write({'error': 'No permission to update shake.'})

    changes = (('title', new_title), ('description', new_description))
    for attribute, new_value in changes:
        if new_value:
            setattr(shake_to_update, attribute, new_value)
    shake_to_update.save()

    return self.redirect('/shake/' + shake_to_update.name + '/quick-details')
def post(self, shake_id):
    """
    Subscribe the current user to a shake.

    With a truthy ``json`` argument the outcome is written as JSON:
    an ``error`` entry when the shake is missing or ``user.subscribe``
    returns a falsy value, otherwise ``subscription_status: True``.
    Without it the client is redirected to the shake's path, or to ``/``
    when the shake does not exist.
    """
    is_json = self.get_argument('json', None)
    user = self.get_current_user_object()
    shake = Shake.get('id=%s and deleted=0', shake_id)

    if not shake:
        if is_json:
            return self.write(json_encode({'error': 'Shake not found.'}))
        # There is no shake whose path could be used; the previous
        # shake.path() call here failed on the missing shake.
        return self.redirect('/')

    subscribed = user.subscribe(shake)
    if not is_json:
        return self.redirect(shake.path())
    if not subscribed:
        return self.write(json_encode({'error': 'error'}))
    return self.write(json_encode({'subscription_status': True}))
def post(self, shake_id):
    """
    Unsubscribe the current user from a shake.

    With a truthy ``json`` argument the outcome is written as JSON:
    an ``error`` entry when the shake is missing or ``user.unsubscribe``
    returns a falsy value, otherwise ``subscription_status: False``.
    Without it the client is redirected to the shake's path, or to ``/``
    when the shake does not exist.
    """
    is_json = self.get_argument('json', None)
    user = self.get_current_user_object()
    shake = Shake.get('id=%s and deleted=0', shake_id)

    if not shake:
        if is_json:
            return self.write({'error': 'Shake not found.'})
        # There is no shake whose path could be used; the previous
        # shake.path() call here failed on the missing shake.
        return self.redirect('/')

    unsubscribed = user.unsubscribe(shake)
    if not is_json:
        return self.redirect(shake.path())
    if not unsubscribed:
        return self.write({'error': 'error'})
    return self.write(json_encode({'subscription_status': False}))
def post(self, shake_name):
    """
    Invite a user, given by the ``name`` argument, to a group shake.

    Raises a 404 when no shake of type ``group`` has the given name. When
    no non-deleted user has the requested name, an error is reported;
    otherwise an invitation notification is created. The result is
    written as a JSON-style dict when the ``json`` argument is truthy,
    and is a redirect to the shake's URL otherwise.
    """
    shake = Shake.get('name=%s and type=%s', shake_name, "group")
    if not shake:
        raise tornado.web.HTTPError(404)

    sender = self.get_current_user_object()
    receiver = User.get('name=%s and deleted=0', self.get_argument('name', None))
    is_json = self.get_argument('json', None)

    if receiver:
        Notification.new_invitation(sender, receiver, shake.id)
        outcome = {'invitation_status': True}
    else:
        outcome = {'error': 'error'}

    if is_json:
        return self.write(outcome)
    return self.redirect('/%s' % (shake_name))
def post(self, shake_name=None):
    """
    Request an invitation to a shake on behalf of the current user.

    Raises a 404 when no non-deleted shake has the given name. When the
    user may not request an invitation, an error status is written (or
    the client is redirected to ``/``); otherwise the request is recorded
    and an ok status is written (or the client is redirected to the
    shake's URL). JSON output is used when the ``json`` argument is
    truthy.
    """
    shake = Shake.get('name=%s and deleted=0', shake_name)
    requester = self.get_current_user_object()
    if not shake:
        raise tornado.web.HTTPError(404)

    if not requester.can_request_invitation_to_shake(shake.id):
        if self.get_argument('json', None):
            return self.write({'status': 'error', 'message': 'not allowed'})
        return self.redirect('/')

    requester.request_invitation_to_shake(shake.id)
    if self.get_argument('json', None):
        return self.write({'status': 'ok'})
    return self.redirect('/%s' % (shake.name))
def get(self, shake_name=None, page=None):
    """
    Render one page of a shake's followers.

    ``page`` defaults to 1 when empty and is converted with ``int``.
    Raises a 404 when no shake has the given name. The template receives
    the subscribers for the page, the shake, the total subscriber count,
    the page number and a ``%d`` URL pattern for the other pages.
    """
    shake_object = Shake.get("name=%s", shake_name)
    page = int(page if page else 1)
    if not shake_object:
        raise tornado.web.HTTPError(404)

    followers = shake_object.subscribers(page=page)
    follower_count = shake_object.subscriber_count()
    # keep a literal %d in the pattern for the template to fill in
    url_format = ('/shake/%s/followers/' % shake_object.name) + '%d'

    template_values = dict(
        followers=followers,
        shake_info=shake_object,
        url_format=url_format,
        follower_count=follower_count,
        page=page,
    )
    return self.render("shakes/followers.html", **template_values)