def usergetter(self, f):
    """Install ``f`` as the callback that resolves a user from credentials.

    Only the **password credential** grant needs this hook. A typical
    registration looks like::

        @oauth.usergetter
        def get_user(username, password, client, request,
                     *args, **kwargs):
            # client: the client issuing the current request
            if not client.has_password_credential_permission:
                return None
            user = User.get_user_by_username(username)
            if not user.validate_password(password):
                return None
            # `request` is an OAuthlib Request object, handy if the
            # lookup needs more context
            return user

    The callback is stored on ``self._usergetter`` and handed back
    unchanged, so the decorated name keeps pointing at the original
    function.
    """
    self._usergetter = f
    return f
# Example source code using Python's user() (original heading: python类user()的实例源码)
def tokengetter(self, f):
    """Install ``f`` as the callback that looks up a bearer token.

    The callback receives either an ``access_token`` or a
    ``refresh_token`` and returns a token object exposing at least:

        - access_token: A string token
        - refresh_token: A string token
        - client_id: ID of the client
        - scopes: A list of scopes
        - expires: A `datetime.datetime` object
        - user: The user object

    Both lookups must be accepted as keyword parameters, e.g.::

        @oauth.tokengetter
        def bearer_token(access_token=None, refresh_token=None):
            if access_token:
                return get_token(access_token=access_token)
            if refresh_token:
                return get_token(refresh_token=refresh_token)
            return None

    The callback is stored on ``self._tokengetter`` and returned as-is.
    """
    self._tokengetter = f
    return f
def tokensetter(self, f):
    """Install ``f`` as the callback that persists a bearer token.

    The callback takes at least a ``token`` and a ``request``::

        @oauth.tokensetter
        def set_token(token, request, *args, **kwargs):
            save_token(token, request.client, request.user)

    ``token`` is a dict shaped like::

        {
            u'access_token': u'6JwgO77PApxsFCU8Quz0pnL9s23016',
            u'token_type': u'Bearer',
            u'expires_in': 3600,
            u'scope': u'email address'
        }

    ``request`` is an object carrying a user object and a client object.

    The callback is stored on ``self._tokensetter`` and returned as-is.
    """
    self._tokensetter = f
    return f
def validate_code(self, client_id, code, client, request, *args, **kwargs):
    """Check that an authorization grant code exists and is not expired.

    When ``client`` is falsy it is resolved through
    ``self._clientgetter(client_id)``; the grant is then fetched with
    ``self._grantgetter``. On success the grant's ``user`` and ``scopes``
    and the ``state`` keyword argument (``None`` when absent) are copied
    onto ``request``.

    Returns ``True`` for a usable grant, ``False`` when the grant is
    missing or expired.
    """
    if not client:
        client = self._clientgetter(client_id)
    log.debug(
        'Validate code for client %r and code %r', client.client_id, code
    )

    grant = self._grantgetter(client_id=client.client_id, code=code)
    if not grant:
        log.debug('Grant not found.')
        return False

    # A grant without an `expires` attribute is never considered expired.
    if hasattr(grant, 'expires'):
        if datetime.datetime.utcnow() > grant.expires:
            log.debug('Grant is expired.')
            return False

    request.state = kwargs.get('state')
    request.user = grant.user
    request.scopes = grant.scopes
    return True
def validate_refresh_token(self, refresh_token, client, request,
                           *args, **kwargs):
    """Check that the refresh token exists and was issued to ``client``.

    The token is fetched with ``self._tokengetter(refresh_token=...)``.
    When it is found and its ``client_id`` matches ``client.client_id``,
    the token's ``client_id`` and ``user`` are copied onto ``request``
    and ``True`` is returned; otherwise ``request`` is left untouched
    and the result is ``False``.

    This method is used by the authorization code grant indirectly by
    issuing refresh tokens, resource owner password credentials grant
    (also indirectly) and the refresh token grant.
    """
    token = self._tokengetter(refresh_token=refresh_token)
    if not token or token.client_id != client.client_id:
        return False

    # Later processing relies on the request carrying user and client_id.
    request.client_id = token.client_id
    request.user = token.user
    return True
def validate_user(self, username, password, client, request,
                  *args, **kwargs):
    """Check a username/password pair through the registered user getter.

    When a user getter is registered and returns a truthy user, that
    user is attached to ``request.user`` and ``True`` is returned. The
    result is ``False`` when the getter returns nothing or when no
    getter is registered (password credential grant disabled).
    """
    log.debug('Validating username %r and its password', username)
    if self._usergetter is None:
        log.debug('Password credential authorization is disabled.')
        return False

    user = self._usergetter(
        username, password, client, request, *args, **kwargs
    )
    if not user:
        return False

    request.user = user
    return True
def authenticated(f):
    """Decorate a view so ``request.user`` / ``request.auth`` are filled in.

    The ``auth`` token is taken from the JSON body, then from the request
    values, then from the cookies (first truthy value wins). If the token
    maps to a cached user id and that user exists, ``request.user`` is
    set to the user and ``request.auth`` to the token; otherwise both
    stay ``None``. The wrapped view is always called.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        token = (request.json or {}).get('auth')
        if not token:
            token = request.values.get('auth')
        if not token:
            token = request.cookies.get('auth')

        # Start from an anonymous request; only a resolvable token
        # upgrades it below.
        request.user = None
        request.auth = None

        user_id = get_cache(token) if token is not None else None
        if user_id:
            try:
                user = User.get(pk=user_id)
            except User.DoesNotExist:
                # Cached id refers to a user that no longer exists:
                # keep the request anonymous.
                pass
            else:
                request.user = user
                request.auth = token
        return f(*args, **kwargs)
    return decorator
def with_game(f):
    """Decorate a view so it receives a loaded ``Game`` instead of a token.

    The wrapper loads the game for ``token``. A game that has not started
    yields a data response with its type, limit and (if present) invite
    token; an unknown game yields an error response. If the seat the game
    was loaded for is bound to a user other than ``request.user``, a
    'wrong user' error is returned. Otherwise ``f(game, *args, **kwargs)``
    is called.
    """
    @wraps(f)
    def decorator(token, *args, **kwargs):
        from game import Game
        try:
            game = Game.load_game(token)
        except errors.GameNotStartedError as exc:
            payload = {
                'type': consts.TYPES[exc.type]['name'],
                'limit': exc.limit,
            }
            if exc.token:
                payload['invite'] = exc.token
            return send_data(payload)
        except errors.GameNotFoundError as exc:
            return send_error(exc.message)

        # Pick the player bound to the side this token belongs to.
        if game._loaded_by == consts.WHITE:
            seat_owner = game.model.player_white
        else:
            seat_owner = game.model.player_black
        # A seat without a bound user may be used by anyone.
        if seat_owner is not None and seat_owner != request.user:
            return send_error('wrong user')
        return f(game, *args, **kwargs)
    return decorator
def recover(token):
    """Handle a password-recovery link identified by ``token``.

    GET answers with a success response when the token maps to a user.
    POST sets the new password from the validated payload, saves the
    user, removes the token from the cache and reports
    'password changed'. An unknown token, or any other HTTP method,
    yields the 'token not found' error.
    """
    # `data` is presumably supplied by the `validated` decorator, since
    # `_post` is only ever called with `user` - TODO confirm.
    @validated(RecoverValidator)
    def _post(user, data):
        user.set_password(data['password'])
        user.save()
        delete_cache(token)
        return send_message('password changed')

    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')
    if request.method == 'GET':
        return send_success()
    if request.method == 'POST':
        return _post(user)
    return send_error('token not found')
def invited(token):
    """Accept a game invitation and start the game as the black player.

    The cached invite for ``token`` holds the inviter's token, the game
    type and the time limit. The inviter's user (if cached and still
    existing) becomes the white player, ``request.user`` the black one.
    The ``wait_<inviter token>`` cache entry is removed afterwards.

    Returns a data response with the new player token under ``'game'``
    merged with the game info for black, or a 'game not found' error
    when the invite cannot be read from the cache.
    """
    try:
        enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
    except Exception:
        # Any failure to read or unpack the invite is reported as a
        # missing game. `Exception` (rather than a bare `except`) keeps
        # KeyboardInterrupt/SystemExit propagating.
        return send_error('game not found')

    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as white player
            pass

    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))

    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return send_data(result)
def load_game(self, token):
    """Load the game for ``token`` and store it on ``self.game``.

    Returns a dict with the game's type, limit and (if present) invite
    token when the game has not started yet; in that case ``self.game``
    is not set. Returns ``None`` after a successful load.

    Raises:
        errors.APIException: the game does not exist, or the seat the
            game was loaded for is bound to a user other than
            ``request.user``.
    """
    try:
        game = Game.load_game(token)
    except errors.GameNotStartedError as e:
        pending = {
            'type': consts.TYPES[e.type]['name'],
            'limit': e.limit,
        }
        if e.token:
            pending['invite'] = e.token
        return pending
    except errors.GameNotFoundError as e:
        raise errors.APIException(e.message)

    # Pick the player bound to the side this token belongs to.
    if game._loaded_by == consts.WHITE:
        seat_owner = game.model.player_white
    else:
        seat_owner = game.model.player_black
    # A seat without a bound user may be used by anyone.
    if seat_owner is not None and seat_owner != request.user:
        raise errors.APIException('wrong user')
    self.game = game
def get(self):
    """List up to ten open games the current user could join.

    Selects pools that are neither started nor lost and have a first
    player token, newest first, and skips pools created by
    ``request.user``.

    Returns:
        dict: ``{'games': [...]}`` where each entry holds the pool id,
        ISO-formatted creation date, creator username (or ``None``),
        game type name and time limit.
    """
    max_games = 10
    open_pools = GamePool.select().where(
        # `== False` is intentional: the comparison builds the query
        # expression, it is not a Python truth test.
        GamePool.is_started == False,  # noqa: E712
        GamePool.is_lost == False,  # noqa: E712
        # `GamePool.player1 is not None` would be evaluated by Python
        # (always True) and never reach the database; is_null(False)
        # produces the intended IS NOT NULL condition.
        GamePool.player1.is_null(False),
    ).order_by(GamePool.date_created.desc())

    games = []
    for pool in open_pools:
        if pool.user1 and pool.user1 == request.user:
            # Do not offer users their own games.
            continue
        games.append({
            'id': pool.pk,
            'date_created': pool.date_created.isoformat(),
            'user': pool.user1.username if pool.user1 else None,
            'type': consts.TYPES[pool.type_game]['name'],
            'limit': pool.time_limit,
        })
        if len(games) >= max_games:
            break
    return {'games': games}
def post(self, game_id):
    """Join the pooled game ``game_id`` as the black player.

    Generates a token for the second player, marks the pool as started,
    creates the game (pool creator plays white, ``request.user`` black)
    and removes the ``wait_<player1 token>`` cache entry.

    Returns:
        dict: the new player token under ``'game'`` merged with the game
        info for black.

    Raises:
        errors.APINotFound: no pool with this id exists.
        errors.APIException: the lookup failed for another reason
            ('wrong format'), or the pool belongs to ``request.user``.
    """
    try:
        pool = GamePool.get(GamePool.pk == game_id)
    except GamePool.DoesNotExist:
        raise errors.APINotFound('game')
    except Exception:
        raise errors.APIException('wrong format')

    creator = pool.user1
    if creator and creator == request.user:
        raise errors.APIException('you cannot start game with yourself')

    pool.player2 = generate_token(True)
    pool.user2 = request.user
    pool.is_started = True
    pool.save()

    game = Game.new_game(
        pool.player1, pool.player2, pool.type_game, pool.time_limit,
        white_user=pool.user1, black_user=pool.user2
    )
    delete_cache('wait_{}'.format(pool.player1))

    response = {'game': pool.player2}
    response.update(game.get_info(consts.BLACK))
    return response
def get(self, token):
    """Accept a game invitation and start the game as the black player.

    The cached invite for ``token`` holds the inviter's token, the game
    type and the time limit. The inviter's user (if cached and still
    existing) becomes the white player, ``request.user`` the black one.
    The ``wait_<inviter token>`` cache entry is removed afterwards.

    Returns:
        dict: the new player token under ``'game'`` merged with the game
        info for black.

    Raises:
        errors.APINotFound: the invite cannot be read from the cache.
    """
    try:
        enemy_token, game_type, game_limit = get_cache('invite_{}'.format(token))
    except Exception:
        # Any failure to read or unpack the invite is reported as a
        # missing game. `Exception` (rather than a bare `except`) keeps
        # KeyboardInterrupt/SystemExit propagating.
        raise errors.APINotFound('game')

    enemy_user = None
    user_id = get_cache('user_{}'.format(enemy_token))
    if user_id:
        try:
            enemy_user = User.get(pk=user_id)
        except User.DoesNotExist:
            # TODO: if user not found game will be created with None as white player
            pass

    user_token = generate_token(True)
    game = Game.new_game(
        enemy_token, user_token, game_type, game_limit,
        white_user=enemy_user, black_user=request.user
    )
    delete_cache('wait_{}'.format(enemy_token))

    result = {'game': user_token}
    result.update(game.get_info(consts.BLACK))
    return result
def get(self):
    """Return one page of albums for the current user.

    Ordering and filtering are derived from the request data by
    ``gen_order_by`` / ``gen_filter_dict`` using the ``name`` and
    ``description`` keys, with ``request.user`` passed to the filter
    builder. The page number and size come from ``self.page_info``.

    Returns the response built by ``HTTPResponse(...).to_response()``
    carrying the serialized albums and the page info.
    """
    query_dict = request.data
    user = request.user
    page, number = self.page_info

    keys = ['name', 'description']
    order_by = gen_order_by(query_dict, keys)
    filter_dict = gen_filter_dict(query_dict, keys, user=user)

    filtered = Album.query.filter_by(**filter_dict)
    albums = filtered.order_by(*order_by).paginate(page, number)

    # Second positional argument is presumably a "many" flag - TODO confirm.
    serializer = AlbumSerializer(albums.items, True)
    pageinfo = PageInfo(albums)
    response = HTTPResponse(
        HTTPResponse.NORMAL_STATUS,
        data=serializer.data,
        pageinfo=pageinfo,
    )
    return response.to_response()
def post(self):
    """Create an album owned by the current user.

    ``name`` is required and ``description`` is optional; both are
    popped from the request data. A missing ``name`` produces a
    parameter-error response; otherwise the saved album is returned
    serialized.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    if name is None:
        error = HTTPResponse(
            HTTPResponse.HTTP_CODE_PARA_ERROR, message='????????')
        return error.to_response()

    album = Album(name=name, user=user)
    if description is not None:
        album.description = description
    album.save()

    payload = AlbumSerializer(album).data
    return HTTPResponse(
        HTTPResponse.NORMAL_STATUS, data=payload).to_response()
def put(self, pk):
    """Update the name and/or description of one of the user's albums.

    Only albums with id ``pk`` owned by ``request.user`` are considered;
    when none matches, a not-exist response is returned. Fields absent
    from the request data are left unchanged. The updated album is
    saved and returned serialized.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    album = Album.query.filter_by(id=pk, user=user).first()
    if not album:
        msg = '?????'
        return HTTPResponse(
            HTTPResponse.HTTP_CODE_NOT_EXIST, message=msg).to_response()

    if name is not None:
        album.name = name
    if description is not None:
        album.description = description
    album.save()

    # The album must survive an update: it is serialized and returned,
    # not deleted, after saving.
    serializer = AlbumSerializer(album)
    return HTTPResponse(
        HTTPResponse.NORMAL_STATUS, data=serializer.data).to_response()
def put(self, pk):
    """Update the name and/or description of one of the user's images.

    Only images with id ``pk`` owned by ``request.user`` are considered;
    when none matches, a not-exist response is returned. Changing the
    name also rebuilds ``image.url`` from ``image.path`` and the new
    name. Fields absent from the request data are left unchanged. The
    saved image is returned serialized.
    """
    post_data = request.data
    user = request.user
    name = post_data.pop('name', None)
    description = post_data.pop('description', None)

    image = Image.query.filter_by(id=pk, user=user).first()
    if not image:
        not_found = HTTPResponse(
            HTTPResponse.HTTP_CODE_NOT_EXIST, message='?????')
        return not_found.to_response()

    if name is not None:
        # NOTE(review): `name` comes straight from the request payload
        # and is joined into the stored path unchecked - confirm it is
        # validated elsewhere.
        image.name = name
        image.url = os.path.join(image.path, name)
    if description is not None:
        image.description = description
    image.save()

    payload = ImageSerializer(image).data
    return HTTPResponse(
        HTTPResponse.NORMAL_STATUS, data=payload).to_response()
def delete(self, pk):
    """Delete one of the user's images together with its files.

    Only images with id ``pk`` owned by ``request.user`` are considered;
    when none matches, a not-exist response is returned. The image file
    and its thumbnail (same relative path with 'photo' replaced by
    'thumb') are removed from the upload root if they exist, then the
    record is deleted. The response carries the serialized image.
    """
    user = request.user
    image = Image.query.filter_by(id=pk, user=user).first()
    if not image:
        not_found = HTTPResponse(
            HTTPResponse.HTTP_CODE_NOT_EXIST, message='?????')
        return not_found.to_response()

    # Build the serializer before the record is deleted.
    serializer = ImageSerializer(image)

    upload_root = current_app.config['UPLOAD_FOLDER_ROOT']
    # Original file first, then its thumbnail.
    for relative_path in (image.url, image.url.replace('photo', 'thumb')):
        target = os.path.join(upload_root, relative_path)
        if os.path.exists(target):
            os.remove(target)

    image.delete()
    return HTTPResponse(
        HTTPResponse.NORMAL_STATUS, data=serializer.data).to_response()
def process_request():
    """
    Prepare per-request attributes.

    - ``request.api_url``: ``API_PREFIX + '/'`` joined onto a base URL.
      The base is the scheme and host of the ``referer`` header when
      present, else ``APP_URL`` when set, else ``request.base_url``.
    - ``request.user`` / ``request.realm``: taken from the Flask session
      (``realm`` defaults to ``'employees'``).
    """
    fallback_url = request.base_url
    referrer = request.headers.get('referer')

    if referrer:
        # Keep only scheme and host of the referrer as the base URL.
        scheme, netloc = urlparse(referrer)[:2]
        base_url = urlunparse((scheme, netloc, '', '', '', ''))
    elif APP_URL:
        base_url = APP_URL
    else:
        base_url = fallback_url

    # Used in building full URIs
    request.api_url = urljoin(base_url, API_PREFIX + '/')
    request.user = flask_session.get('user')
    request.realm = flask_session.get('realm', 'employees')
def notifications():
    """Receive a batch of registry notification events and log each one.

    Requests whose ``request.user`` is not ``'NOTIFICATION'`` get an
    empty 403. The JSON body is expected to carry an ``events`` list;
    without it, or after all events have been logged, an empty 204 is
    returned.
    """
    if request.user != 'NOTIFICATION':
        return ('', 403)

    data = json.loads(request.data)
    if 'events' not in data:
        return ('', 204)

    for event in data['events']:
        logging.debug(event)
        # At this time the possible values for action seem to be
        # push, pull, delete, and mount:
        # https://github.com/docker/distribution/blob/master/notifications/event.go#L10
        action = event['action']
        target = event['target']
        repo = target['repository']
        digest = target['digest']
        # The tag is optional in an event.
        tag = target.get('tag')
        timestamp = dateutil.parser.parse(event['timestamp'])
        user = event['actor']['name']
        logging.info("action '{}' repo '{}' digest '{}' tag '{}' timestamp '{}' user '{}'".format(action, repo, digest, tag, timestamp, user))
        # Save to database, etc. here
    return ('', 204)
def requires_auth(func):
    ''' Decorator for view functions that require basic authentication.

    The wrapper reads ``user_name`` and ``user_passhash`` from
    ``session`` and looks the user up with ``User.get_by``. When no user
    is found the client is redirected to the ``login`` endpoint;
    otherwise the user is stored on ``request.user`` and the wrapped
    view is called with the original arguments.
    '''
    # Imported at decoration time rather than at module level
    # (presumably to avoid a circular import - TODO confirm).
    from .models import Session, User

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        user_name = session.get('user_name')
        user_passhash = session.get('user_passhash')
        with Session() as db_session:
            user = User.get_by(db_session, user_name, user_passhash)
            if not user:
                # Unknown or stale credentials: send the client to log in.
                return redirect(url_for('login'))
            request.user = user
            return func(*args, **kwargs)
    return wrapper
def after_request(self, f):
    """Add ``f`` to the hooks run after the resource has been accessed.

    A hook takes ``valid`` and ``request`` and must return both as a
    tuple, e.g.::

        @oauth.after_request
        def valid_after_request(valid, oauth):
            if oauth.user in black_list:
                return False, oauth
            return valid, oauth

    The hook is appended to ``self._after_request_funcs`` and returned
    as-is.
    """
    self._after_request_funcs.append(f)
    return f
def grantsetter(self, f):
    """Install ``f`` as the callback that persists a grant code.

    The callback takes ``client_id``, ``code``, ``request`` and any
    extra arguments::

        @oauth.grantsetter
        def set_grant(client_id, code, request, *args, **kwargs):
            save_grant(client_id, code, request.user, request.scopes)

    The callback is stored on ``self._grantsetter`` and returned as-is.
    """
    self._grantsetter = f
    return f
def validate_grant_type(self, client_id, grant_type, client, request,
                        *args, **kwargs):
    """Decide whether ``client`` may use the requested ``grant_type``.

    A client exposing ``allowed_grant_types`` is limited to that
    collection; any other client may use the four default grant types
    (`authorization_code`, `password`, `client_credentials`,
    `refresh_token`). The `password` grant is always refused when no
    user getter is registered. For `client_credentials` the client must
    have a ``user`` attribute, which is copied to ``request.user``.

    It is suggested that `allowed_grant_types` should contain at least
    `authorization_code` and `refresh_token`.

    Returns ``True`` when the grant type is permitted, else ``False``.
    """
    if grant_type == 'password' and self._usergetter is None:
        log.debug('Password credential authorization is disabled.')
        return False

    default_grant_types = (
        'authorization_code', 'password',
        'client_credentials', 'refresh_token',
    )
    # The client's own list takes precedence; the defaults only apply
    # when the client does not define one.
    permitted = getattr(client, 'allowed_grant_types', default_grant_types)
    if grant_type not in permitted:
        return False

    if grant_type == 'client_credentials':
        if not hasattr(client, 'user'):
            log.debug('Client should have a user property')
            return False
        request.user = client.user

    return True
def login_required(f):
    """Decorate a view so it only runs when ``request.user`` is truthy.

    Requests without a (truthy) ``user`` attribute on ``request`` get
    the 'not authorized' error response instead of the view's result.
    """
    @wraps(f)
    def decorator(*args, **kwargs):
        if not getattr(request, 'user', None):
            return send_error('not authorized')
        return f(*args, **kwargs)
    return decorator
def messages():
    """List or create chat messages that are not attached to a chat.

    GET reads optional ``limit`` and ``offset`` query arguments
    (negative or missing values fall back to
    ``config.DEFAULT_COUNT_MESSAGES`` and ``0``) and returns the
    matching messages, newest first. Unparsable arguments yield a
    'wrong arguments' error. POST creates a message from the validated
    payload for ``request.user``, pushes it over the websocket channel
    and returns it. Other methods return ``None``.
    """
    # `data` is presumably supplied by the `validated` decorator, since
    # `_post` is called without arguments - TODO confirm.
    @validated(MessageValidator)
    def _post(data):
        message = ChatMessage.create(user=request.user, text=data['text'])
        result = {'message': MessageSerializer(message).calc()}
        send_ws(result, consts.WS_CHAT_MESSAGE)
        return send_data(result)

    method = request.method
    if method == 'POST':
        return _post()
    if method != 'GET':
        return None

    try:
        limit = int(request.args.get('limit', -1))
        offset = int(request.args.get('offset', -1))
        if limit < 0:
            limit = config.DEFAULT_COUNT_MESSAGES
        if offset < 0:
            offset = 0
    except Exception as e:
        log.error(e)
        return send_error('wrong arguments')

    query = (
        ChatMessage.select()
        # `== None` builds a query expression; it is not a Python
        # identity test and must not be rewritten as `is None`.
        .where(ChatMessage.chat == None)  # noqa: E711
        .order_by(-ChatMessage.date_created)
        .offset(offset)
        .limit(limit)
    )
    serialized = [MessageSerializer(m).calc() for m in query]
    return send_data({
        'messages': serialized,
    })
def register(data):
    """Create a user and, when an email is given, send a verification mail.

    ``data`` must contain ``username``, ``password`` and ``email``. For
    a truthy email a verification token is requested from the new user
    and mailed with the 'registration' template, together with the
    username and the verification URL. Always answers with the
    'registration successful' message.
    """
    username, password, email = [
        data[key] for key in ('username', 'password', 'email')
    ]
    user = User.add(username, password, email)

    if email:
        token = user.get_verification()
        mail_context = {
            'username': username,
            'url': urljoin(config.SITE_URL, config.VERIFY_URL),
            'token': token,
        }
        send_mail_template('registration', [email], data=mail_context)

    return send_message('registration successful')
def verify(token):
    """Verify the user that ``token`` belongs to.

    When ``User.get_by_token`` finds a user, the user is verified, the
    token is removed from the cache and a success response is returned;
    otherwise the 'token not found' error is returned.
    """
    user = User.get_by_token(token)
    if not user:
        return send_error('token not found')

    user.verify()
    delete_cache(token)
    return send_success()
def reset(data):
    """Start a password reset for the user with ``data['email']``.

    An unknown email yields the 'email not found' error. Otherwise a
    reset token is requested from the user and mailed with the 'reset'
    template, together with the username and the recovery URL, and a
    success response is returned.
    """
    try:
        user = User.get(User.email == data['email'])
    except User.DoesNotExist:
        return send_error('email not found')

    token = user.get_reset()
    mail_context = {
        'username': user.username,
        'url': urljoin(config.SITE_URL, config.RECOVER_URL),
        'token': token,
    }
    send_mail_template('reset', [user.email], data=mail_context)
    return send_success()