def token_verify(token):
    """Resolve the user referenced by a signed JWT.

    The token is decoded with the application's ``SECRET_KEY`` and its ``id``
    claim is handed to ``UserService.instance().get_one_or_fail``.

    :param token: encoded JWT.
    :return: the result of ``get_one_or_fail`` for the ``id`` claim, or
        ``False`` when ``jwt.DecodeError`` is raised.
    """
    secret_key = current_app.config.get('SECRET_KEY')
    try:
        # ``jwt.decode`` expects the accepted algorithms as the list-valued
        # ``algorithms`` keyword; the singular ``algorithm`` keyword belongs to
        # ``jwt.encode`` and does not pin the algorithm when decoding.
        data = jwt.decode(token, secret_key, algorithms=['HS256'])
        return UserService.instance().get_one_or_fail(data['id'])
    except jwt.DecodeError:
        return False
# Example source code for the Python class DecodeError()
def jwt_verify(view_func):
    """Decorate a view so that it only runs for requests carrying a decodable JWT.

    The token is taken from the second whitespace-separated part of
    ``request.META['HTTP_AUTHORIZATION']`` and decoded with
    ``settings.JWT_SECRET_KEY``.

    :param view_func: view callable taking ``request`` as first argument.
    :return: wrapper that calls ``view_func`` when the token decodes and
        returns ``HttpResponse(status=401)`` otherwise.
    """
    from functools import wraps

    @wraps(view_func)
    def _wrapped_view_func(request, *args, **kwargs):
        header = request.META.get('HTTP_AUTHORIZATION') if hasattr(request, 'META') else None
        parts = header.split() if header else []
        # A missing header, or one without a second part, used to fall through
        # (returning None) or raise IndexError; both are rejected explicitly.
        if len(parts) < 2:
            return HttpResponse(status=401)
        token = parts[1]
        try:
            jwt.decode(token, settings.JWT_SECRET_KEY, True)
            return view_func(request, *args, **kwargs)
        except jwt.DecodeError:
            return HttpResponse(status=401)

    return _wrapped_view_func
def __call__(self, environ, start_response):
    """WSGI entry point that forwards only verified requests to ``self.app``.

    ``TokenVerifierFilter.decode_and_verify_request`` is called with the
    environ, ``self.app_secret`` and ``self.appId``. If either returned value
    is ``None``, or ``jwt.DecodeError`` is raised, ``send_403_response`` is
    invoked and an empty body is returned.

    :param environ: WSGI environ.
    :param start_response: WSGI ``start_response`` callable.
    :return: the wrapped application's response iterable, or an empty list
        after a 403 has been sent.
    """
    try:
        event_token_payload, request_body = TokenVerifierFilter.decode_and_verify_request(
            environ, self.app_secret, self.appId)
        if event_token_payload is None or request_body is None:
            send_403_response(start_response)
            # A WSGI callable must return an iterable; falling through would
            # return None.
            return []
        return self.app(environ, start_response)
    except jwt.DecodeError:
        send_403_response(start_response)
        return []
def handle(self, environ, start_response):
    """Build the incoming event and dispatch it to the matching handler.

    When ``environ`` already contains ``'event_token_payload'`` the event is
    built from ``environ['request_body']``; otherwise the request is decoded
    via ``TokenVerifierFilter.decode_and_verify_request``. A
    ``jwt.DecodeError`` there sends a 403 and returns ``{}``.

    :param environ: WSGI environ.
    :param start_response: WSGI ``start_response`` callable.
    :return: the result of ``EventHandlerClient.send_response`` for the
        handler registered for ``event.name``.
    :raises Exception: when ``event.name`` is not a known event.
    """
    if 'event_token_payload' in environ:
        event = Event(environ['request_body'])
    else:
        try:
            _payload, event_json = TokenVerifierFilter.decode_and_verify_request(
                environ, self.app_secret, self.app_id)
            event = Event(event_json)
        except jwt.DecodeError:
            send_403_response(start_response)
            return {}

    # Attribute names rather than bound handlers, so that only the handler
    # for the received event is looked up on ``self``.
    handler_attr_by_event = {
        "app.install": "on_app_install_handler",
        "app.uninstall": "on_app_uninstall_handler",
        "chat.generateUrlPreview": "on_chat_generate_url_preview_handler",
        "chat.receiveMessage": "on_chat_receive_message_handler",
        "client.flockmlAction": "on_client_flockml_action_handler",
        "client.messageAction": "on_client_message_action_handler",
        "client.openAttachmentWidget": "on_client_open_attachment_widget_handler",
        "client.pressButton": "on_client_press_button_handler",
        "client.slashCommand": "on_client_slash_command_handler",
        "client.widgetAction": "on_client_widget_action_handler",
    }
    handler_attr = handler_attr_by_event.get(event.name)
    if handler_attr is None:
        raise Exception("Unknown event encountered" + event.name)
    return EventHandlerClient.send_response(getattr(self, handler_attr), event, start_response)
def login_required(f):
    """Decorate ``f`` so that it only runs for requests with a valid token.

    A missing ``Authorization`` header, a ``DecodeError`` or an
    ``ExpiredSignature`` from ``parse_token`` each produce a JSON response
    with status 401. Otherwise the token's ``sub`` claim is stored on
    ``g.user_id`` before ``f`` is called.
    """
    def _unauthorized(message):
        # Build the JSON 401 response shared by every rejection path.
        reply = jsonify(message=message)
        reply.status_code = 401
        return reply

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.headers.get('Authorization'):
            return _unauthorized('Missing authorization header')
        try:
            claims = parse_token(request)
        except DecodeError:
            return _unauthorized('Token is invalid')
        except ExpiredSignature:
            return _unauthorized('Token has expired')
        g.user_id = claims['sub']
        return f(*args, **kwargs)

    return decorated_function
# Helper functions, get currently logged in user
def decode(self, token):
    """Decode ``token`` with ``self.secret`` and check it against ``self.issuer``.

    :param token: encoded JWT.
    :return: the decoded claims as returned by ``jwt.decode``.
    :raises InvalidUsage: when the token is expired or fails decoding.
    :raises Exception: for any other failure while decoding.
    """
    try:
        configured = self.algorithm
        # ``jwt.decode`` takes a list under ``algorithms``; the singular
        # ``algorithm`` keyword does not restrict which algorithm is accepted.
        # NOTE(review): self.algorithm is presumably a single name such as
        # 'HS256'; a list/tuple of names is accepted as well.
        if isinstance(configured, str):
            algorithms = [configured]
        else:
            algorithms = list(configured)
        return jwt.decode(token,
                          self.secret,
                          algorithms=algorithms,
                          issuer=self.issuer)
    except jwt.ExpiredSignature:
        raise InvalidUsage("Token is expired")
    except jwt.DecodeError:
        raise InvalidUsage('Token signature is invalid')
    except Exception:
        raise Exception('Unable to parse authentication token.')
def check_auth(self):
    """Validate the request's ``Authorization: Bearer <token>`` header.

    The header must consist of exactly two parts, the first being ``bearer``
    (case-insensitive). The token is decoded with ``Security.get_jwt_skey()``
    using HS256. Every failure calls ``abort(401, message=...)``.

    :return: the decoded token payload.
    """
    auth = request.headers.get('Authorization', None)
    # A whitespace-only header is truthy but splits into no parts; treat it
    # like a missing header instead of failing on ``parts[0]``.
    parts = auth.split() if auth else []
    if not parts:
        abort(401, message='Authorization header is expected')

    message = ''
    if parts[0].lower() != 'bearer':
        message = 'Authorization header must start with Bearer'
    elif len(parts) == 1:
        message = 'Token not found'
    elif len(parts) > 2:
        # Backslash escaped explicitly: '\s' is not a valid escape sequence.
        message = 'Authorization header must be Bearer + \\s + token'
    if message:
        abort(401, message=message)

    token = parts[1]
    try:
        payload = jwt.decode(
            token,
            Security.get_jwt_skey(),
            algorithms=['HS256']
        )
    except jwt.ExpiredSignatureError:
        message = 'token is expired'
    except jwt.InvalidAudienceError:
        message = 'incorrect audience'
    except jwt.DecodeError:
        message = 'token signature is invalid'
    if message:
        abort(401, message=message)

    self.logger.debug('Access granted for %s!' % payload['user']['login'])
    return payload
def decode_token(self, token):
    """Decode ``token`` using the JWT settings found in ``self.config``.

    Reads ``JWT_SECRET_KEY``, ``JWT_ALGORITHM`` and the optional
    ``JWT_LEEWAY`` (default 0).

    :param token: encoded JWT.
    :return: the decoded claims.
    :raises werkzeug.exceptions.Unauthorized: when the token is expired,
        cannot be decoded, or is otherwise invalid.
    """
    settings_map = self.config
    try:
        return jwt.decode(
            token,
            settings_map['JWT_SECRET_KEY'],
            algorithms=[settings_map['JWT_ALGORITHM']],
            leeway=settings_map.get('JWT_LEEWAY', 0),
        )
    except jwt.ExpiredSignatureError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
        raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
def do_auth(self, token, *args, **kwargs):
    """Authenticate using an HS256-signed token.

    The token is decoded with the secret from ``self.get_key_and_secret()``;
    the decoded claims are passed on to ``self.strategy.authenticate`` as
    ``response={'token': ...}`` together with ``backend=self``.

    :raises AuthCanceled: when ``jwt.DecodeError`` is raised while decoding.
    """
    _key, secret = self.get_key_and_secret()
    try:
        claims = jwt.decode(token, secret, algorithms=['HS256'])
    except jwt.DecodeError:
        # Decoding failed, so authentication is cancelled.
        raise AuthCanceled(self)
    kwargs['response'] = {'token': claims}
    kwargs['backend'] = self
    return self.strategy.authenticate(*args, **kwargs)
def user_data(self, access_token, *args, **kwargs):
    """Return user data by querying Microsoft service.

    Issues a GET to the Microsoft Graph ``/me`` endpoint through
    ``self.get_json``, sending ``access_token`` as a Bearer token.

    :raises AuthTokenError: when ``DecodeError`` or ``ExpiredSignature`` is
        raised during the request.
    """
    endpoint = 'https://graph.microsoft.com/v1.0/me'
    try:
        request_headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
            'Authorization': 'Bearer ' + access_token,
        }
        return self.get_json(endpoint, headers=request_headers, method='GET')
    except (DecodeError, ExpiredSignature) as error:
        raise AuthTokenError(self, error)
def user_data(self, access_token, *args, **kwargs):
    """Return the claims of the ``id_token`` found in ``kwargs['response']``.

    The token is decoded with ``verify=False``.

    :raises AuthTokenError: when ``DecodeError`` or ``ExpiredSignature`` is
        raised while decoding.
    """
    id_token = kwargs.get('response').get('id_token')
    try:
        return jwt_decode(id_token, verify=False)
    except (DecodeError, ExpiredSignature) as de:
        raise AuthTokenError(self, de)
def verify_auth_token(token):
    """Get the user from a JWT token.

    :param token: encoded JWT, decoded with ``secret`` using HS256.
    :return: the result of ``User.query.get`` for the token's ``id`` claim,
        or ``None`` when ``jwt.DecodeError`` is raised.
    """
    try:
        claims = jwt.decode(token, secret, algorithms=['HS256'])
    except jwt.DecodeError:
        return None
    else:
        return User.query.get(claims['id'])
def check_token_status(self):
    """
    Simple request to check if session has expired (TBD)

    When ``self.access_token`` is ``None`` the token is first loaded from the
    file named by ``self.platform['credentials']['token_file']`` inside
    ``<workspace_root>/<platforms_dir>``. The token is then decoded with
    ``self.platform_public_key`` (RS256, audience ``'adapter'``); if the
    claims contain ``preferred_username`` it is stored on ``self.username``.

    :return: Token status -- ``True`` when the token decodes, and also when
        the token file cannot be loaded or no public key is set; ``False``
        when decoding raises one of the handled ``jwt`` errors.
    """
    if self.access_token is None:
        try:
            token_file = self.platform['credentials']['token_file']
            token_path = os.path.join(
                self.workspace.workspace_root,
                self.workspace.platforms_dir,
                token_file)
            with open(token_path, "r") as _file:
                # read must be *called*: assigning the bound method would
                # store a function object instead of the token text.
                self.access_token = _file.read().strip()
        except Exception:
            # Best effort, as before: a token that cannot be loaded is
            # reported as True.
            return True

    print('Public_key=', self.platform_public_key)
    if self.platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead add
    # self.platform_public_key.decode('utf-8')
    try:
        # NOTE(review): these prints write the raw access token to stdout.
        print('access_token=', self.access_token)
        public_key = self.platform_public_key.decode('utf-8')
        print('platform_public_key=', public_key)
        # Verification is on by default, so no positional flag is passed;
        # ``algorithms`` is given as a list.
        decoded = jwt.decode(self.access_token, public_key,
                             algorithms=['RS256'], audience='adapter')
        print('contents', decoded)
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False

    # The username claim is optional; the status is True either way.
    if 'preferred_username' in decoded:
        self.username = decoded['preferred_username']
    return True
def check_token_status():
    """
    Simple request to check if session has expired (TBD)

    Standalone variant that decodes a hard-coded empty token with a
    hard-coded empty public key (RS256, audience ``'adapter'``).

    :return: Token status -- ``True`` when the token decodes, ``False`` when
        decoding raises one of the handled ``jwt`` errors.
    """
    import jwt

    access_token = ''
    platform_public_key = b''
    print('Public_key=', platform_public_key)

    # Some old PyJWT versions crash with public key binary string, instead add
    # self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', access_token)
        public_key = platform_public_key.decode('utf-8')
        print('platform_public_key=', public_key)
        # Verification is on by default, so no positional flag is passed;
        # ``algorithms`` is given as a list.
        decoded = jwt.decode(access_token, public_key,
                             algorithms=['RS256'], audience='adapter')
        print('contents', decoded)
        return True
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False
# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()
# result = check_token_status()
# print("RESULT=", result)