def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
logger.info("{0} {1} {2} {3}".format(request.remote_addr, request.method, request.url, str(request.args)))
logger.debug("data: {0}".format(str(request.form)))
auth = request.authorization
logger.debug('Check_auth: ' + str(auth))
if not auth or not check_auth(auth.username, auth.password):
logger.warning("Unauthorized.")
return {'error' : 'Unauthorized.'}, 401
return f(*args, **kwargs)
return decorated
python类authorization()的实例源码
def requires_authentication(function):
"""Creates a decorator that can be applied with @requires_authentication
to protect an API endpoint."""
@wraps(function)
def decorated(*args, **kwargs):
"""Checks for authorization headers. Tells the user to authenticate
if none are found."""
auth = request.authorization
if not auth or not check_authentication(auth.username, auth.password):
return authenticate()
return function(*args, **kwargs)
return decorated
def require_auth_header(func):
""" no auth check is performed but the auth headers are still required. auth check
is performed by a service other than ourselves """
@wraps(func)
def decorator(*args, **kwargs):
if not request.authorization:
return ErrorResponseJson("auth header required").make_response()
return func(*args, **kwargs)
return decorator
def after_request(response):
""" called after every request """
# log the endpoint hit and any errors
delta = int((time.time() - g.start_time) * 1000)
start_utc = datetime.datetime.utcfromtimestamp(g.start_time)
username = request.authorization.username if request.authorization else None
err_msg = response.get_data(as_text=True) if response.status_code // 100 >= 4 else None
Logger.endpoint_hit(start_utc, delta, request.base_url, username, request.method,
response.status_code, err_msg)
return response
def verify_user(self, require_admin=True, require_credentials=True):
auth = request.authorization
if not auth:
return False
username = auth.username
password = auth.password
user = self._mongo.db['users'].find_one({'username': username})
if not user:
return False
result = False
ip = get_ip()
if not require_credentials:
result = self._verify_user_by_token(user, password, ip)
if not result and self._is_blocked_temporarily(username):
return False
if not result:
result = _verify_user_by_credentials(user, password)
if not result:
self._add_block_entry(username)
return False
if not require_admin or user['is_admin']:
return True
return False
def _is_blocked_temporarily(self, username):
num_login_attempts = self._config.defaults['authorization']['num_login_attempts']
block_for_seconds = self._config.defaults['authorization']['block_for_seconds']
self._mongo.db['block_entries'].delete_many({'timestamp': {'$lt': time() - block_for_seconds}})
block_entries = list(self._mongo.db['block_entries'].find({'username': username}))
if len(block_entries) > num_login_attempts:
return True
return False
def issue_token(self):
salt = urandom(16)
kdf = _kdf(salt)
token = generate_secret()
username = request.authorization.username
ip = get_ip()
self._mongo.db['tokens'].insert_one({
'username': username,
'ip': ip,
'salt': salt,
'token': kdf.derive(token.encode('utf-8')),
'timestamp': time()
})
return token
def _verify_user_by_token(self, user, token, ip):
tokens_valid_for_seconds = self._config.defaults['authorization']['tokens_valid_for_seconds']
self._mongo.db['tokens'].delete_many({'timestamp': {'$lt': time() - tokens_valid_for_seconds}})
cursor = self._mongo.db['tokens'].find(
{'username': user['username'], 'ip': ip},
{'token': 1, 'salt': 1}
)
for c in cursor:
try:
kdf = _kdf(c['salt'])
kdf.verify(token.encode('utf-8'), c['token'])
return True
except:
pass
return False
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def getCurrentUser(request):
auth = request.authorization
if not auth:
return None
token = auth.username
return User.verify_auth_token(token)
def requires_passcode(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
if config.get('web_passcode') and not session.get('allowed') and (
not request.authorization or request.authorization.username != config['web_passcode']):
return redirect(url_for('login'))
return func(*args, **kwargs)
return wrapper
def ensure_admin_authenticated():
auth = request.authorization
if not auth or auth.username != config.admin_username or auth.password != config.admin_password:
return Response('401 Unauthorized', 401, {
'WWW-Authenticate': 'Basic realm="Login Required"'
})
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
sl = SettingLoader()
settings = sl.settings
if settings.rest_api.password_protected:
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
#####################
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
def username(self):
if not request.authorization:
return ""
return request.authorization.username
decorator.py 文件源码
项目:tensorflow-object-detection-example
作者: GoogleCloudPlatform
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated