python类decode()的实例源码

main.py 文件源码 项目:mt2414 作者: friendsofagape 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def auth():
    email = request.form["email"]
    password = request.form["password"]
    connection = get_db()
    cursor = connection.cursor()
    cursor.execute("SELECT email FROM users WHERE  email = %s", (email,))
    est = cursor.fetchone()
    if not est:
        logging.warning('Unregistered user \'%s\' login attempt unsuccessful' % email)
        return '{"success":false, "message":"Invalid email"}'
    cursor.execute("SELECT u.password_hash, u.password_salt, r.name FROM users u LEFT JOIN roles r ON u.role_id = r.id WHERE u.email = %s and u.email_verified is True", (email,))
    rst = cursor.fetchone()
    if not rst:
        return '{"success":false, "message":"Email is not Verified"}'
    password_hash = rst[0].hex()
    password_salt = bytes.fromhex(rst[1].hex())
    password_hash_new = scrypt.hash(password, password_salt).hex()
    role = rst[2]
    if password_hash == password_hash_new:
        access_token = jwt.encode({'sub': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1), 'role': role}, jwt_hs256_secret, algorithm='HS256')
        logging.warning('User: \'' + str(email) + '\' logged in successfully')
        return '{"access_token": "%s"}\n' % (access_token.decode('utf-8'),)
    logging.warning('User: \'' + str(email) + '\' login attempt unsuccessful: Incorrect Password')
    return '{"success":false, "message":"Incorrect Password"}'
models.py 文件源码 项目:flask-jwt-auth 作者: realpython 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def decode_auth_token(auth_token):
        """
        Validates the auth token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
            is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
            if is_blacklisted_token:
                return 'Token blacklisted. Please log in again.'
            else:
                return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'
ShortlivedTokenMixin.py 文件源码 项目:twopi-flask-utils 作者: TwoPiCode 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def dump(self, secret=None):
        """
        Dump the token into a stringified JWT.

        :param secret: The secret to sign the JWT with. If this is omitted, the 
                       secret will be sourced from ``current_app.config['SECRET_KEY']``

        :returns: The stringified JWT.
        """

        self.issued_at = datetime.datetime.now(pytz.UTC)
        payload, err = self.TokenSchema().dump(self)

        if secret is None:
            secret = current_app.config['SECRET_KEY']

        return jwt.encode(payload, secret).decode('UTF-8')
security.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def generate_token(payload: dict, exp_seconds: int) -> str:
    """ generate a JWT with the given payload. uses HMAC + SHA-256 hash algorithm. the token 
    expires after the given number of seconds. """
    jwt_secret = os.getenv("JWT_SECRET")
    jwt_iss = os.getenv("JWT_ISS")

    if os.getenv("DEBUG"):
        # if running in debug mode, use timezone of machine
        payload["iat"] = int(dt.datetime.now().timestamp())
        payload["exp"] = int((dt.datetime.now() + dt.timedelta(seconds=exp_seconds)).timestamp())
    else:
        # if running in production, use UTC
        payload["iat"] = int(dt.datetime.utcnow().timestamp())
        payload["exp"] = int((dt.datetime.utcnow() + dt.timedelta(seconds=exp_seconds)).timestamp())

    payload["iss"] = jwt_iss

    try:
        token = jwt.encode(payload, jwt_secret, algorithm="HS256")
        return token.decode("utf-8")
    except Exception as e:
        raise(e)
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.
    """
    lti = {
        'params': CanvasLTI.create_from_request(flask.request).launch_params,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
    }
    return flask.redirect(
        '{}/lti_launch/?inLTI=true&jwt={}'.format(
            app.config['EXTERNAL_URL'],
            urllib.parse.quote(
                jwt.encode(
                    lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
                ).decode('utf8')
            )
        )
    )
twitter.py 文件源码 项目:socialauth 作者: emilyhorsman 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def decode_oauth_token_secret(self):
        if not self.token_cookie:
            raise Error('No token cookie given')

        try:
            payload = jwt.decode(self.token_cookie,
                                 self.token_secret,
                                 algorithm = 'HS256')
        except:
            raise Error('Failed to retrieve oauth_token_secret from token')

        self.oauth_token_secret = payload.get('data', {}).get('id', None)
        if not self.oauth_token_secret:
            raise Error('Token does not have an oauth_token_secret')

        return self.oauth_token_secret
server.py 文件源码 项目:flask_auth_token 作者: stoic1979 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.args.get('token')

        # ensure that token is specified in the request
        if not token:
            return jsonify({'message': 'Missing token!'})

        # ensure that token is valid
        try:
            data = jwt.decode(token, app.config['secretkey'])
        except:
            return jsonify({'message': 'Invalid token!'})

        return f(*args, **kwargs)

    return decorated
main.py 文件源码 项目:microAuth 作者: Gingernaut 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def reset(token):
    try:
        token = token.replace("*", ".")
        tokenData = jwt.decode(str(token), utils.JWT_SECRET, algorithms=[utils.JWT_ALGORITHM])
        accId = int(tokenData["userId"])

        accData = accFunctions.getAccData(accId)
        accData["authToken"] =  accFunctions.genToken(accId)

        return custResponse(200, "Resetting Account", accData)

    except Exception as e:
        if app.config["DEBUG"] == True:
            print("*-*-*-*")
            print(e)
            return custResponse(400,{"Err": str(e)})
        else:
            return custResponse(400, "Error resetting account.")
main.py 文件源码 项目:microAuth 作者: Gingernaut 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def confirm(token):
    try:
        token = token.replace("*", ".")
        tokenData = jwt.decode(str(token), utils.JWT_SECRET, algorithms=[utils.JWT_ALGORITHM])
        accId = int(tokenData["userId"])
        payload = accFunctions.cleanPayload(accId, {"isValidated": True})

        if "Error" in payload:
            return custResponse(payload["errStat"], payload["Error"])

        accFunctions.updateAccount(payload)
        accData = accFunctions.getAccData(accId)
        return custResponse(200, "Successfully validated account.", accData)

    except Exception as e:
        if app.config["DEBUG"] == True:
            print("*-*-*-*")
            print(e)
            return custResponse(400,{"Err": str(e)})
        else:
            return custResponse(400, "Error validating account.")
models.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def for_request(self, request, body=None):
        if body and 'oauth_client_id' in body:
            rv = Tenant.objects.get(pk=body['oauth_client_id'])
            if rv is not None:
                return rv, {}

        jwt_data = request.GET.get('signed_request')

        if not jwt_data:
            header = request.META.get('HTTP_AUTHORIZATION', '')
            jwt_data = header[4:] if header.startswith('JWT ') else None

        if not jwt_data:
            raise BadTenantError('Could not find JWT')

        try:
            oauth_id = jwt.decode(jwt_data, verify=False)['iss']
            client = Tenant.objects.get(pk=oauth_id)
            if client is not None:
                data = jwt.decode(jwt_data, client.secret)
                return client, data
        except jwt.exceptions.DecodeError:
            pass

        raise BadTenantError('Could not find tenant')
controllers.py 文件源码 项目:specstore 作者: datahq 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _verify(auth_token, owner, public_key):
    """Verify Auth Token.
    :param auth_token: Authentication token to verify
    :param owner: dataset owner
    """
    if not auth_token or not owner:
        return False
    try:
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key,
                           algorithm='RS256')
        # TODO: check service in the future
        has_permission = True
        # has_permission = token.get('permissions', {}) \
        #     .get('datapackage-upload', False)
        # service = token.get('service')
        # has_permission = has_permission and service == 'os.datastore'
        has_permission = has_permission and owner == token.get('userid')
        return has_permission
    except jwt.InvalidTokenError:
        return False
__init__.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def jwt_authorize_project(project, payload):
    """Authorize the project for the payload."""
    try:
        if payload is None:
            return handle_error(INVALID_HEADER_MISSING)
        parts = payload.split()

        if parts[0].lower() != 'bearer':
            return handle_error(INVALID_HEADER_BEARER)
        elif len(parts) == 1:
            return handle_error(INVALID_HEADER_TOKEN)
        elif len(parts) > 2:
            return handle_error(INVALID_HEADER_BEARER_TOKEN)

        data = jwt.decode(parts[1],
                          project.secret_key,
                          'H256')
        if (data['project_id'] == project.id
            and data['short_name'] == project.short_name):
            return True
        else:
            return handle_error(WRONG_PROJECT_SIGNATURE)
    except exceptions.DecodeError:
        return handle_error(DECODE_ERROR_SIGNATURE)
test_jwt_apns_client.py 文件源码 项目:jwt_apns_client 作者: Mobelux 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_make_provider_token_encodes_correctly(self):
        """
        Run the token returned by make_provider_token back through jwt.decode() and verify that the expected
        payload is there.
        """
        issued_at = time.time()
        connection = jwt_apns_client.APNSConnection(
            team_id='TEAMID',
            apns_key_id='KEYID',
            apns_key_path=self.KEY_FILE_PATH)
        token = connection.make_provider_token(issued_at=issued_at)
        options = {
            'verify_signature': False,
            'verify_exp': False,
            'verify_nbf': False,
            'verify_iat': False,
            'verify_aud': False
        }
        decoded = jwt.decode(token,
                             connection.secret,
                             algorithm=connection.algorithm,
                             headers=connection.get_token_headers(),
                             options=options)
        self.assertEqual(decoded, {'iat': issued_at, 'iss': 'TEAMID'})
systesthelper.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def auth_service(self):
        """
        Authenticate as a service user
        """
        payload = {
            "username": service_username,
            "password": service_password,
            "provider": "user+pass"
        }
        resp = self.post("/auth", data=payload)
        token = resp.json()["token"]
        jti = resp.json()["jti"]
        self.token = token
        self.jti = jti
        self.current_user = jwt.decode(self.token, verify=False)
        self.player_id = self.current_user["player_id"]
        self.user_id = self.current_user["user_id"]
        self.headers = {"Authorization": "JWT " + token, }

        r = self.get("/")
        self.endpoints = r.json()["endpoints"]
services.py 文件源码 项目:bitstore 作者: datahq 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def verify(auth_token, owner):
    """Verify Auth Token.
    :param auth_token: Authentication token to verify
    :param owner: dataset owner
    """
    if not auth_token:
        return False
    if auth_token == 'testing-token' and owner == '__tests':
        return True
    try:
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key(),
                           algorithm='RS256')
        has_permission = owner == token.get('userid')
        # TODO: Check service in the future
        # service = token.get('service')
        # has_permission = has_permission and service == 'world'
        # has_permission = has_permission and owner == token.get('userid')
        return has_permission
    except jwt.InvalidTokenError:
        return False
services.py 文件源码 项目:bitstore 作者: datahq 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_user_id(auth_token):
    """Returns the user id from an Auth Token.
    :param auth_token: Authentication token to verify
    :returns user id
    """
    if not auth_token:
        return None
    try:
        token = jwt.decode(auth_token.encode('ascii'),
                           public_key(),
                           algorithm='RS256')
        # TODO: Check service in the future
        # service = token.get('service')
        # if service == 'world':
        return token.get('userid')
    except jwt.InvalidTokenError:
        pass
    return None
GeneralController.py 文件源码 项目:cerberus-core 作者: ovh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def logout(request):
    """ Logout a user
    """
    try:
        token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')

    try:
        data = jwt.decode(token, settings.SECRET_KEY)
        data = json.loads(CRYPTO.decrypt(str(data['data'])))
        user = User.objects.get(id=data['id'])
        user.last_login = datetime.fromtimestamp(0)
        user.save()
        return {'message': 'Logged out'}
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')
validators.py 文件源码 项目:plone.server 作者: plone 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate(self, token):
        if token.get('type') != 'bearer':
            return

        if '.' not in token.get('token', ''):
            # quick way to check if actually might be jwt
            return

        try:
            validated_jwt = jwt.decode(
                token['token'],
                app_settings['jwt']['secret'],
                algorithms=[app_settings['jwt']['algorithm']])
            token['id'] = validated_jwt['id']
            user = await find_user(self.request, token)
            if user is not None and user.id == token['id']:
                return user
        except jwt.exceptions.DecodeError:
            pass
        except jwt.exceptions.ExpiredSignatureError:
            pass

        return
tenant_manager.py 文件源码 项目:os-reststack-manager 作者: gemagomez 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def authenticate():
    # logger.debug("endpoint request: %s" % request.endpoint)
    if re.search('tenant_provisioned', str(request.endpoint)):
        g.user = "phone_home"
        logger.info("Authentication bypassed: tenant_provisioned")
        return

    try:
        decoded = jwt.decode(request.headers['X-Auth-Token'], credentials['tenant_secret'], algorithms=['HS256'])
        g.user = decoded['user']
    except KeyError:
        logger.error("Error: key error.")
        abort(401)
    except jwt.DecodeError:
        logger.error("Error: decode error")
        abort(401)
__init__.py 文件源码 项目:archivertools 作者: datatogether 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __getJWT(self):
        """
        requests JWT from Data Together API. Requires that the environment variable MORPH_DT_API_KEY is set
        """
        try:
            api_key = os.environ['MORPH_DT_API_KEY']
        except KeyError:
            raise KeyError('Data Together API Key not set. Set the environment variable MORPH_DT_API_KEY '
                    'to your Data Together API Key. For instructions on how to do this in morph.io, see '
                    'https://morph.io/documentation/secret_values')
        h = {'access_token':api_key}
        token = requests.post('https://ident.archivers.space/jwt', headers=h).content
        pubkey = requests.get('https://ident.archivers.space/publickey').content
        try:
            jwt.decode(token, pubkey, algorithms=['RS256'])
        except jwt.exceptions.DecodeError as E:
            logging.error('Could not verify Data Together signature on JWT')
            raise
        return token
validators.py 文件源码 项目:guillotina 作者: plone 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate(self, token):
        if token.get('type') not in ('bearer', 'wstoken'):
            return

        if '.' not in token.get('token', ''):
            # quick way to check if actually might be jwt
            return

        try:
            validated_jwt = jwt.decode(
                token['token'],
                app_settings['jwt']['secret'],
                algorithms=[app_settings['jwt']['algorithm']])
            token['id'] = validated_jwt['id']
            user = await find_user(self.request, token)
            if user is not None and user.id == token['id']:
                return user
        except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError,
                KeyError):
            pass

        return
test_token.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_encode_token(encode):
    token, expiry = generate_token(CLIENT, SCOPE, 'grant_type')

    decoded = jwt.decode(token, verify=False)
    expected_expiry = calendar.timegm(NOW.timetuple()) + EXPIRY * 60
    data = {
        'exp': expected_expiry,
        'iss': 'localhost:8006/token',
        'aud': 'localhost:8006/verify',
        'sub': CLIENT.id,
        'client': {
            'id': CLIENT.id,
            'service_type': CLIENT.service_type,
            'organisation_id': CLIENT.organisation_id,
        },
        'scope': SCOPE,
        'grant_type': 'grant_type',
        'delegate': False
    }

    assert decoded == data
    assert encode.call_count == 1
    assert encode.call_args_list[0][-1] == {'algorithm': 'RS256'}
    assert expiry == expected_expiry
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
                          ]:
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        algorithm='HS512'
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
server.py 文件源码 项目:Genomes 作者: ThunderousFigs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getRelatives():
    decoded = jwt.decode(request.cookies.get('token'), SECRET_KEY, algorithms=['HS256'])
    current_user_profile_id = decoded['user_profile_id']

    #Retrieve all relatives from database, not filtered by user
    #To Do: Filter this by user
    user_relatives = models.db_session.query(models.user_relatives).all()
    user_relatives_ids = []
    #Iterate through all relatives
    for user_relative in user_relatives:
        user = list(user_relative)
        #For each relative, grab only those that match on the current_user_profile_id
        if current_user_profile_id == str(user[0]):
            user_relatives_ids.append(int(user[1]))
    #Retrieve all relatives from DB
    #To Do: is this the same information in the user_relatives variable above?
    relatives = models.db_session.query(models.Relative).all()
    finalRelatives = []
    #Iterate through all relatives
    for relative in relatives:
        #Grab only relatives who match the relatives in the user_relatives_ids storage
        if relative.serialize()['id'] in user_relatives_ids:
            finalRelatives.append(relative.serialize())

    return jsonify({'relativeList' : finalRelatives})
server.py 文件源码 项目:Genomes 作者: ThunderousFigs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getSnps():

    decoded = jwt.decode(request.cookies.get('token'), app.config.get('SECRET_KEY'), algorithms=['HS256'])
    current_user_profile_id = decoded['user_profile_id']

    user_snps = {}

    user_data = models.db_session.query(models.User).filter(models.User.profile_id == current_user_profile_id).first().serialize()
    for user_datum in user_data:
        if user_datum[:2:].lower()=='rs':
            user_snps[user_datum] = user_data[user_datum]

    user_outcomes = []
    for user_snp in user_snps:
        # loop through entire snp table, if any of snp base pairs match up to the base pair in user snps, put in an object with rsid and outcome
        current_snp = models.db_session.query(models.Snp).filter(models.Snp.rs_id == user_snp).filter(models.Snp.dnaPair == user_snps[user_snp]).first()

        if current_snp is not None:
            user_outcomes.append({"title": current_snp.serialize()["title"], "rsid": user_snp, "pair": user_snps[user_snp], "outcome": current_snp.serialize()['outcome'], "video": current_snp.serialize()['video']});

    return jsonify({'outcomes': user_outcomes})
tests.py 文件源码 项目:socialauth 作者: emilyhorsman 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_flow(self):
        # First leg
        res = self.get_provider({ 'login': 'start' })
        self.assertEqual(res['status'], 302)
        self.assertIn('/oauth/authenticate?oauth_token', res.get('redirect'))

        payload = jwt.decode(res['set_token_cookie'], 'sekret', algorithm = 'HS256')
        self.assertEqual(payload['data']['type'], 'oauth_token_secret')

        # Second leg
        args = {
            'oauth_token': 'Z6eEdO8MOmk394WozF5oKyuAv855l4Mlqo7hhlSLik',
            'oauth_verifier': 'zvq3SztKJphiXzEbUrzt3E7n8WmhZVsx'
        }

        res = self.get_provider(args, res['set_token_cookie'])

        self.assertEqual(res['status'], 200)
        self.assertEqual(res['provider_user_id'], '987')
        self.assertEqual(res['provider_user_name'], 'test')
models.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def decode_auth_token(token):
        """
        Decoding the token to get the payload and then return the user Id in 'sub'
        :param token: Auth Token
        :return:
        """
        try:
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
            is_token_blacklisted = BlackListToken.check_blacklist(token)
            if is_token_blacklisted:
                return 'Token was Blacklisted, Please login In'
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired, Please sign in again'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please sign in again'
jwtcat.py 文件源码 项目:jwtcat 作者: AresS31 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(token, word):
    """ Check if [word] can decrypt [token]
    """
    try:
        payload = jwt.decode(token, word, algorithm = 'HS256')
        return True

    except jwt.exceptions.InvalidTokenError:
        logger.debug("InvalidTokenError: {}".format(word))
        return False
    except jwt.exceptions.DecodeError:
        logger.debug("DecodingError: {}".format(word))
        return False
    except Exception as ex:
        logger.exception("Exception: {}".format(ex))
        sys.exit(1)
canister.py 文件源码 项目:canister 作者: dagnelies 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _buildAuthBasic(config):
    username = config.get('canister.auth_basic_username', None)
    password = config.get('canister.auth_basic_password', None)
    encryption = config.get('canister.auth_basic_encryption', 'clear').lower() # clear or sha256

    if not username or not password:
        return None

    def validate(token):
        user, pwd = base64.b64decode(token).decode('utf-8').split(':', 1)
        if user != username:
            return None
        elif encryption == 'clear' and password == pwd:
            return user
        elif encryption == 'sha256' and password == hashlib.sha256(pwd):
            return user
        else:
            return None

    return validate
canister.py 文件源码 项目:canister 作者: dagnelies 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _buildAuthJWT(config):
    client_id = config.get('canister.auth_jwt_client_id', None)
    secret = config.get('canister.auth_jwt_secret', None)
    encoding = config.get('canister.auth_jwt_encoding', 'clear').lower() # clear, base64std, or base64url

    if not client_id or not secret:
        return None

    import jwt

    if encoding == 'base64std': # with + and /
        secret = base64.standard_b64decode(secret)
    elif encoding == 'base64url': # with - and _
        secret = base64.urlsafe_b64decode(secret)
    elif encoding == 'clear':
        pass
    else:
        raise Exception('Invalid auth_jwt_encoding in config: "%s" (should be "clear", "base64std" or "base64url")' % encoding)

    def validate(token):
        profile = jwt.decode(token, secret, audience=client_id)
        return profile

    return validate


问题


面经


文章

微信
公众号

扫码关注公众号