python类DecodeError()的实例源码

auth.py 文件源码 项目:hug-n-rest 作者: khanhicetea 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def token_verify(token):
    secret_key = current_app.config.get('SECRET_KEY')
    try:
        data = jwt.decode(token, secret_key, algorithm='HS256')
        return UserService.instance().get_one_or_fail(data['id'])
    except jwt.DecodeError:
        return False
utils.py 文件源码 项目:forest-django 作者: ForestAdmin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def jwt_verify(view_func):
    def _wrapped_view_func(request, *args, **kwargs):
        if hasattr(request, 'META') and request.META.get('HTTP_AUTHORIZATION'):
            splitted_token = request.META['HTTP_AUTHORIZATION'].split()
            if len(splitted_token):
                token = splitted_token[1]
        try:
            payload = jwt.decode(token, settings.JWT_SECRET_KEY, True)
            return view_func(request, *args, **kwargs)
        except jwt.DecodeError as err:
            return HttpResponse(status=401)

    return _wrapped_view_func
token_verifier_filter.py 文件源码 项目:pyflock 作者: flockchat 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __call__(self, environ, start_response):
        try:
            (event_token_payload, request_body) = TokenVerifierFilter.decode_and_verify_request(environ,
                                                                                                self.app_secret,
                                                                                                self.appId)
            if event_token_payload is None or request_body is None:
                send_403_response(start_response)
            else:
                return self.app(environ, start_response)
        except jwt.DecodeError:
            send_403_response(start_response)
event_handler_client.py 文件源码 项目:pyflock 作者: flockchat 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def handle(self, environ, start_response):
        if 'event_token_payload' not in environ:
            try:
                payload, event_json = TokenVerifierFilter.decode_and_verify_request(environ,
                                                                                    self.app_secret,
                                                                                    self.app_id)
                event = Event(event_json)
            except jwt.DecodeError:
                send_403_response(start_response)
                return {}
        else:
            payload, event = environ['event_token_payload'], Event(environ['request_body'])

        if event.name == "app.install":
            return EventHandlerClient.send_response(self.on_app_install_handler, event, start_response)
        elif event.name == "app.uninstall":
            return EventHandlerClient.send_response(self.on_app_uninstall_handler, event, start_response)
        elif event.name == "chat.generateUrlPreview":
            return EventHandlerClient.send_response(self.on_chat_generate_url_preview_handler, event, start_response)
        elif event.name == "chat.receiveMessage":
            return EventHandlerClient.send_response(self.on_chat_receive_message_handler, event, start_response)
        elif event.name == "client.flockmlAction":
            return EventHandlerClient.send_response(self.on_client_flockml_action_handler, event, start_response)
        elif event.name == "client.messageAction":
            return EventHandlerClient.send_response(self.on_client_message_action_handler, event, start_response)
        elif event.name == "client.openAttachmentWidget":
            return EventHandlerClient.send_response(self.on_client_open_attachment_widget_handler, event,
                                                    start_response)
        elif event.name == "client.pressButton":
            return EventHandlerClient.send_response(self.on_client_press_button_handler, event, start_response)
        elif event.name == "client.slashCommand":
            return EventHandlerClient.send_response(self.on_client_slash_command_handler, event, start_response)
        elif event.name == "client.widgetAction":
            return EventHandlerClient.send_response(self.on_client_widget_action_handler, event, start_response)
        else:
            raise Exception("Unknown event encountered" + event.name)
controllers.py 文件源码 项目:the-catalog 作者: thurstonemerson 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not request.headers.get('Authorization'):
            response = jsonify(message='Missing authorization header')
            response.status_code = 401
            return response

        try:
            payload = parse_token(request)
        except DecodeError:
            response = jsonify(message='Token is invalid')
            response.status_code = 401
            return response
        except ExpiredSignature:
            response = jsonify(message='Token has expired')
            response.status_code = 401
            return response

        g.user_id = payload['sub']

        return f(*args, **kwargs)

    return decorated_function

# Helper functions, get currently logged in user
jwt.py 文件源码 项目:dpr-api 作者: oki-archive 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def decode(self, token):
        try:
            return jwt.decode(token,
                              self.secret,
                              algorithm=self.algorithm,
                              issuer=self.issuer)
        except jwt.ExpiredSignature:
            raise InvalidUsage("Token is expired")
        except jwt.DecodeError:
            raise InvalidUsage('Token signature is invalid')
        except Exception:
            raise Exception('Unable to parse authentication token.')
base.py 文件源码 项目:jenova 作者: inova-tecnologias 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def check_auth(self):
    auth = request.headers.get('Authorization', None)
    message = ''
    if not auth:
      abort(401, message = 'Authorization header is expected')

    parts = auth.split()

    if parts[0].lower() != 'bearer':
      message = 'Authorization header must start with Bearer'
    elif len(parts) == 1:
      message = 'Token not found'
    elif len(parts) > 2:
      message = 'Authorization header must be Bearer + \s + token'

    if message:
      abort(401, message = message)

    token = parts[1]
    try:
      payload = jwt.decode(
        token, 
        Security.get_jwt_skey(), 
        algorithms = ['HS256']
      )
    except jwt.ExpiredSignature:
      message = 'token is expired'
    except jwt.InvalidAudienceError:
      message = 'incorrect audience'
    except jwt.DecodeError:
      message = 'token signature is invalid'

    if message:
      abort(401, message = message)

    self.logger.debug('Access granted for %s!' % payload['user']['login'])

    return payload
auth.py 文件源码 项目:stethoscope 作者: Netflix 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def decode_token(self, token):
    try:
      decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'],
          algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0))
    except jwt.ExpiredSignatureError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
    return decoded
exacttarget.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def do_auth(self, token, *args, **kwargs):
        dummy, secret = self.get_key_and_secret()
        try:  # Decode the token, using the Application Signature from settings
            decoded = jwt.decode(token, secret, algorithms=['HS256'])
        except jwt.DecodeError:  # Wrong signature, fail authentication
            raise AuthCanceled(self)
        kwargs.update({'response': {'token': decoded}, 'backend': self})
        return self.strategy.authenticate(*args, **kwargs)
microsoft.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def user_data(self, access_token, *args, **kwargs):
        """Return user data by querying Microsoft service"""
        try:
            return self.get_json(
                'https://graph.microsoft.com/v1.0/me',
                headers={
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'Accept': 'application/json',
                    'Authorization': 'Bearer ' + access_token
                },
                method='GET'
            )
        except (DecodeError, ExpiredSignature) as error:
            raise AuthTokenError(self, error)
azuread.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def user_data(self, access_token, *args, **kwargs):
        response = kwargs.get('response')
        id_token = response.get('id_token')
        try:
            decoded_id_token = jwt_decode(id_token, verify=False)
        except (DecodeError, ExpiredSignature) as de:
            raise AuthTokenError(self, de)
        return decoded_id_token
models.py 文件源码 项目:2017-rapdev-backend 作者: rit-sse 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def verify_auth_token(token):
        """Get the user from a JWT token."""
        try:
            decoded = jwt.decode(token, secret, algorithms=['HS256'])
        except jwt.DecodeError:
            return None
        user = User.query.get(decoded['id'])
        return user
access.py 文件源码 项目:son-cli 作者: sonata-nfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_token_status(self):
        """
        Simple request to check if session has expired (TBD)
        :return: Token status
        """

        if self.access_token is None:
            try:
                token_file = self.platform['credentials']['token_file']
                token_path = os.path.join(
                    self.workspace.workspace_root,
                    self.workspace.platforms_dir,
                    token_file)

                # Construct the POST login request
                with open(token_path, "r") as _file:
                    self.access_token = _file.read
            except:
                return True

        print('Public_key=', self.platform_public_key)

        if self.platform_public_key is None:
            return True

        # Some old PyJWT versions crash with public key binary string, instead add
        # self.platform_public_key.decode('utf-8')
        try:
            print('access_token=', self.access_token)
            print('platform_public_key=', self.platform_public_key.decode('utf-8'))
            decoded = jwt.decode(self.access_token, self.platform_public_key.decode('utf-8'),
                                 True, algorithms='RS256', audience='adapter')
            # options={'verify_aud': False})
            print('contents', decoded)
            try:
                self.username = decoded['preferred_username']
                return True
            except:
                return True
        except jwt.DecodeError:
            print('Token cannot be decoded because it failed validation')
            return False
        except jwt.ExpiredSignatureError:
            print('Signature has expired')
            return False
        except jwt.InvalidIssuerError:
            return False
        except jwt.InvalidIssuedAtError:
            return False
calls_test.py 文件源码 项目:son-cli 作者: sonata-nfv 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_token_status():
    """
    Simple request to check if session has expired (TBD)
    :return: Token status
    """
    import jwt

    access_token = ''
    platform_public_key = b''
    print('Public_key=', platform_public_key)

    if platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead add
    # self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', access_token)
        print('platform_public_key=', platform_public_key.decode('utf-8'))
        decoded = jwt.decode(access_token, platform_public_key.decode('utf-8'),
                             True, algorithms='RS256', audience='adapter')
        # options={'verify_aud': False})
        print('contents', decoded)
        try:
            username = decoded['preferred_username']
            return True
        except:
            return True
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False


# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()

# result = check_token_status()
# print("RESULT=", result)


问题


面经


文章

微信
公众号

扫码关注公众号