def authorized():
    """Report who is logged in.

    Returns ``send_data`` with the current user's ``username`` when
    ``request.user`` is truthy, otherwise ``send_error('not authorized')``.
    """
    current = request.user
    if not current:
        return send_error('not authorized')
    return send_data({'username': current.username})
# python类user()的实例源码 (example source code for the Python class user())
def new():
    """List open game pools (GET) or create a new pool (POST).

    GET: returns, via ``send_data``, up to ten of the most recently created
    pools that are not started, not lost and have a non-NULL ``player1``,
    skipping pools whose ``user1`` is the requesting user.

    POST: delegates to the validated ``_post`` helper, which creates a
    ``GamePool`` row, caches the waiting game under ``wait_<token>`` and
    returns the new game token.

    Any other HTTP method falls through and returns ``None``.
    """

    @validated(GameNewValidator)
    def _post(data):
        # `data` is presumably supplied by the `validated` decorator, since
        # the helper is invoked without arguments below -- TODO confirm.
        game_type = data['type']
        game_limit = data['limit']
        token = generate_token(True)
        GamePool.create(
            player1=token,
            user1=request.user,
            type_game=game_type,
            time_limit=game_limit,
        )
        set_cache('wait_{}'.format(token), (game_type, game_limit))
        return send_data({'game': token})

    if request.method == 'GET':
        result = []
        pools = GamePool.select().where(
            GamePool.is_started == False,  # noqa: E712 - builds a SQL expression
            GamePool.is_lost == False,  # noqa: E712 - builds a SQL expression
            # `GamePool.player1 is not None` would be evaluated by Python to a
            # constant True and never filter anything; `!= None` builds the
            # intended NULL check inside the query.
            GamePool.player1 != None,  # noqa: E711 - builds a SQL expression
        ).order_by(GamePool.date_created.desc())
        for pool in pools:
            # Do not offer users their own pools.
            if pool.user1 and pool.user1 == request.user:
                continue
            result.append({
                'id': pool.pk,
                'date_created': pool.date_created.isoformat(),
                'user': pool.user1.username if pool.user1 else None,
                'type': consts.TYPES[pool.type_game]['name'],
                'limit': pool.time_limit,
            })
            # Cap the listing at ten entries.
            if len(result) > 9:
                break
        return send_data({'games': result})
    elif request.method == 'POST':
        return _post()
def invite(data):
    """Create a game token plus an invite token and cache both.

    Reads ``type`` and ``limit`` from ``data``. When the type is not
    ``consts.TYPE_NOLIMIT`` and the limit is falsy, an error response is
    returned. Otherwise three cache entries may be written:

    * ``invite_<invite token>`` -> (game token, type, limit)
    * ``user_<game token>``     -> current user's pk (only when logged in)
    * ``wait_<game token>``     -> (type, limit, invite token)

    Returns ``send_data`` with the ``game`` and ``invite`` tokens.
    """
    kind = data['type']
    limit = data['limit']
    # NOTE(review): the message wording looks inverted relative to the
    # condition (it fires for games that are NOT "no limit") -- confirm
    # before changing, clients may match on the text.
    if kind != consts.TYPE_NOLIMIT and not limit:
        return send_error('game limit must be set for no limit game')

    game_token = generate_token(True)
    invite_token = generate_token(True)

    set_cache('invite_{}'.format(invite_token), (game_token, kind, limit))
    if request.user:
        # The third argument is presumably a timeout in seconds -- TODO confirm.
        set_cache('user_{}'.format(game_token), request.user.pk, 3600)
    set_cache('wait_{}'.format(game_token), (kind, limit, invite_token))

    tokens = {'game': game_token, 'invite': invite_token}
    return send_data(tokens)
def games():
    """Return the requesting user's active and ended games via ``send_data``.

    For each game the user takes part in, the game's ``white`` attribute is
    reported when the user is ``player_white`` and its ``black`` attribute
    otherwise. Active games have no ``date_end``; ended games are capped at
    ten rows (no explicit ordering is requested). Anonymous requests get two
    empty lists.
    """
    from models import Game

    def _own_side(game):
        # Value for the side the requesting user plays in `game`.
        return game.white if game.player_white == request.user else game.black

    actives = []
    ended = []
    if request.user:
        running = Game.select().where(
            Game.date_end == None,
            (Game.player_white == request.user) | (Game.player_black == request.user),
        )
        actives.extend(_own_side(game) for game in running)

        finished = Game.select().where(
            Game.date_end != None,
            (Game.player_white == request.user) | (Game.player_black == request.user),
        ).limit(10)
        ended.extend(_own_side(game) for game in finished)

    return send_data({'games': {'actives': actives, 'ended': ended}})
def post(self):
    """Store a chat message from the current user and push it over WS.

    The message text comes from ``self.data['text']``. The serialized
    message is sent with ``send_ws`` under ``WS_CHAT_MESSAGE`` and the same
    payload is returned to the caller.
    """
    created = ChatMessage.create(user=request.user, text=self.data['text'])
    serialized = MessageSerializer(created).calc()
    payload = {'message': serialized}
    send_ws(payload, WS_CHAT_MESSAGE)
    return payload
def post(self):
    """Register a new user and, when an email was given, mail a verification link.

    Reads ``username``, ``password`` and ``email`` from ``self.data`` and
    passes them to ``User.add``. If ``email`` is truthy, a verification
    token is obtained from the new user and the ``registration`` mail
    template is sent to that address.

    Returns the string ``'registration successful'`` in both cases.
    """
    form = self.data
    username, password, email = form['username'], form['password'], form['email']
    account = User.add(username, password, email)

    if not email:
        return 'registration successful'

    verification = account.get_verification()
    send_mail_template(
        'registration',
        [email],
        data={
            'username': username,
            'url': urljoin(config.SITE_URL, config.VERIFY_URL),
            'token': verification,
        },
    )
    return 'registration successful'
def get(self, token):
    """Verify the user identified by ``token``.

    Raises:
        APINotFound: when ``User.get_by_token`` returns a falsy value.

    On success the user is verified, the token is removed from the cache
    and ``'verification completed'`` is returned.
    """
    account = User.get_by_token(token)
    if not account:
        raise APINotFound('token')
    account.verify()
    delete_cache(token)
    return 'verification completed'
def get(self):
    """Return the current user's username.

    Raises:
        APIUnauthorized: when ``request.user`` is falsy.
    """
    if not request.user:
        raise APIUnauthorized
    return {'username': request.user.username}
def post(self):
    """Send a password-reset mail to the user with the submitted email.

    Looks the user up by ``self.data['email']``, obtains a reset token from
    the user and sends the ``reset`` mail template to the user's address.

    Raises:
        APINotFound: when no user has that email.

    Returns the string ``'send recover email'``.
    """
    try:
        account = User.get(User.email == self.data['email'])
    except User.DoesNotExist:
        raise APINotFound('hey email')

    reset_token = account.get_reset()
    mail_context = {
        'username': account.username,
        'url': urljoin(config.SITE_URL, config.RECOVER_URL),
        'token': reset_token,
    }
    send_mail_template('reset', [account.email], data=mail_context)
    return 'send recover email'
def get(self, token):
    """Check that ``token`` resolves to a user.

    Raises:
        APINotFound: when ``User.get_by_token`` returns a falsy value.

    Returns the string ``'token is found'`` otherwise.
    """
    if User.get_by_token(token):
        return 'token is found'
    raise APINotFound('token')
def post(self, token):
    """Set a new password for the user identified by ``token``.

    The password is taken from ``self.data['password']``. After saving the
    user, the token is removed from the cache.

    Raises:
        APINotFound: when ``User.get_by_token`` returns a falsy value.

    Returns the string ``'password changed'``.
    """
    account = User.get_by_token(token)
    if account:
        account.set_password(self.data['password'])
        account.save()
        delete_cache(token)
        return 'password changed'
    raise APINotFound('token')
def post(self):
    """Create a new game pool owned by the current user.

    Reads ``type`` and ``limit`` from ``self.data``, generates a token,
    stores a ``GamePool`` row with that token as ``player1`` and caches
    ``(type, limit)`` under ``wait_<token>``.

    Returns ``{'game': <token>}``.
    """
    payload = self.data
    kind, limit = payload['type'], payload['limit']
    game_token = generate_token(True)
    GamePool.create(
        player1=game_token,
        user1=request.user,
        type_game=kind,
        time_limit=limit,
    )
    set_cache('wait_{}'.format(game_token), (kind, limit))
    return {'game': game_token}
def get(self):
    """Return the requesting user's active and ended games as a dict.

    For each game the user takes part in, the game's ``white`` attribute is
    reported when the user is ``player_white`` and its ``black`` attribute
    otherwise. Active games have no ``date_end``; ended games are capped at
    ten rows (no explicit ordering is requested). Anonymous requests get two
    empty lists.
    """
    from models import Game

    def _own_side(game):
        # Value for the side the requesting user plays in `game`.
        return game.white if game.player_white == request.user else game.black

    actives = []
    ended = []
    if request.user:
        running = Game.select().where(
            Game.date_end == None,
            (Game.player_white == request.user) | (Game.player_black == request.user),
        )
        actives.extend(_own_side(game) for game in running)

        finished = Game.select().where(
            Game.date_end != None,
            (Game.player_white == request.user) | (Game.player_black == request.user),
        ).limit(10)
        ended.extend(_own_side(game) for game in finished)

    return {'games': {'actives': actives, 'ended': ended}}
def get(self, pk):
    """Return the album with id ``pk`` that belongs to the current user.

    Responds with ``HTTP_CODE_NOT_EXIST`` when no such album is found,
    otherwise with ``NORMAL_STATUS`` and the serialized album.
    """
    owner = request.user
    album = Album.query.filter_by(id=pk, user=owner).first()
    if album:
        serialized = AlbumSerializer(album)
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS, data=serialized.data).to_response()
    # NOTE(review): the message text appears to be non-ASCII text lost to an
    # encoding problem -- restore the intended wording if known.
    not_found = '?????'
    return HTTPResponse(
        HTTPResponse.HTTP_CODE_NOT_EXIST, message=not_found).to_response()
def delete(self, pk):
    """Delete the album with id ``pk`` that belongs to the current user.

    Responds with ``HTTP_CODE_NOT_EXIST`` when no such album is found,
    otherwise deletes it and responds with ``NORMAL_STATUS`` and the
    serialized album.
    """
    owner = request.user
    target = Album.query.filter_by(id=pk, user=owner).first()
    if not target:
        # NOTE(review): the message text appears to be non-ASCII text lost to
        # an encoding problem -- restore the intended wording if known.
        not_found = '?????'
        return HTTPResponse(
            HTTPResponse.HTTP_CODE_NOT_EXIST, message=not_found).to_response()

    # The serializer is created before the delete and its `.data` is read
    # afterwards; keep this order.
    snapshot = AlbumSerializer(target)
    target.delete()
    response = HTTPResponse(HTTPResponse.NORMAL_STATUS, data=snapshot.data)
    return response.to_response()
def get(self, pk):
    """Return the image with id ``pk`` that belongs to the current user.

    Responds with ``HTTP_CODE_NOT_EXIST`` when no such image is found,
    otherwise with ``NORMAL_STATUS`` and the serialized image.
    """
    owner = request.user
    picture = Image.query.filter_by(id=pk, user=owner).first()
    if picture:
        serialized = ImageSerializer(picture)
        return HTTPResponse(
            HTTPResponse.NORMAL_STATUS, data=serialized.data).to_response()
    # NOTE(review): the message text appears to be non-ASCII text lost to an
    # encoding problem -- restore the intended wording if known.
    not_found = '?????'
    return HTTPResponse(
        HTTPResponse.HTTP_CODE_NOT_EXIST, message=not_found).to_response()
def preprocess_request(self):
    """Attach the current user and a unified ``data`` payload to the request.

    ``g.user`` receives ``current_user`` as-is, while ``request.user``
    receives the object returned by ``current_user._get_current_object()``.
    ``request.data`` is the query string dict for GET requests; for other
    methods it is the JSON body, falling back to the form dict when the
    JSON body is ``None``.
    """
    g.user = current_user
    request.user = current_user._get_current_object()

    is_get = request.method == 'GET'
    request.data = request.args.to_dict() if is_get else request.json
    if request.data is None:
        # No JSON body was available; use the submitted form fields instead.
        request.data = request.form.to_dict()
def session(cls, **kwargs) -> dict:
    """Describe the current session.

    Returns a dict with ``username`` set to ``request.user`` (the object
    itself, not a name attribute) and ``last_login`` taken from the Flask
    session, defaulting to the current local time in ISO format. ``kwargs``
    is accepted but not used.
    """
    info = {'username': request.user}
    info['last_login'] = flask_session.get('last_login', datetime.now().isoformat())
    return info
def get_allowed_actions(user, actions):
    """Return the registry actions granted for a request.

    No filtering is implemented: the ``actions`` object passed in is
    returned unchanged (the very same object), and ``user`` is not
    consulted. Both the requested and the granted actions are logged at
    debug level.
    """
    logging.debug('Requested actions: {}'.format(actions))
    # The three actions used by the registry are 'push', 'pull', and '*':
    # https://github.com/docker/distribution/blob/master/registry/handlers/app.go#L875
    granted = actions
    logging.debug('Allowed actions: {}'.format(granted))
    return granted
def getpassword():
    """Issue a registry password token for the current user.

    Builds a ``Token`` of type ``'password'`` whose subject is
    ``request.user``, encodes it, logs the issuance and returns a JSON
    response with the fixed username ``'PASSTOKEN'`` and the encoded token
    as the password.
    """
    subject = request.user
    encoded = Token('password', subject=subject).encode_token()
    logging.info('Issued registry password to {}'.format(subject))
    return jsonify(username='PASSTOKEN', password=encoded)