python类encode()的实例源码

__init__.py 文件源码 项目:FRG-Crowdsourcing 作者: 97amarnathk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def auth_jwt_project(short_name):
    """Create a JWT for a project via its secret KEY."""
    project_secret_key = None
    if 'Authorization' in request.headers:
        project_secret_key = request.headers.get('Authorization')
    if project_secret_key:
        project = project_repo.get_by_shortname(short_name)
        if project and project.secret_key == project_secret_key:
            token = jwt.encode({'short_name': short_name,
                                'project_id': project.id},
                               project.secret_key, algorithm='HS256')
            return token
        else:
            return abort(404)
    else:
        return abort(403)
client.py 文件源码 项目:sentry-plugins 作者: getsentry 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_jwt(self):
        exp = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
        exp = calendar.timegm(exp.timetuple())
        # Generate the JWT
        payload = {
            # issued at time
            'iat': int(time.time()),
            # JWT expiration time (10 minute maximum)
            'exp': exp,
            # Integration's GitHub identifier
            'iss': options.get('github.integration-app-id'),
        }

        return jwt.encode(
            payload, options.get('github.integration-private-key'), algorithm='RS256'
        )
models.py 文件源码 项目:flask-jwt-auth 作者: realpython 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def encode_auth_token(self, user_id):
        """
        Generates the Auth Token
        :return: string
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                app.config.get('SECRET_KEY'),
                algorithm='HS256'
            )
        except Exception as e:
            return e
env.py 文件源码 项目:typot 作者: chakki-works 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def make_auth_header(installation_id):
    utcnow = datetime.utcnow() + timedelta(seconds=-5)
    duration = timedelta(seconds=30)
    payload = {
        "iat": utcnow,
        "exp": utcnow + duration,
        "iss": 2510
    }
    pem = get_private_pem()
    encoded = jwt.encode(payload, pem, "RS256")
    headers = {
        "Authorization": "Bearer " + encoded.decode("utf-8"),
        "Accept": "application/vnd.github.machine-man-preview+json"
        }

    auth_url = "https://api.github.com/installations/{}/access_tokens".format(installation_id)
    r = requests.post(auth_url, headers=headers)

    if not r.ok:
        print(r.json()["message"])
        r.raise_for_status()
    token = r.json()["token"]
    return {
        "Authorization": "token {}".format(token)
    }
controllers.py 文件源码 项目:IntegraTI-API 作者: discentes-imd 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def post(self):
        """Login the user"""
        username = request.json['username']
        password = request.json['password']

        us = User.query\
            .filter(User.disabled is False)\
            .filter(User.sigaa_user_name == username)\
            .first()
        abort_if_none(us, 403, 'Username or password incorrect')

        if not check_password_hash(us.password, password):
            return msg('Username or password incorrect'), 403

        token = jwt.encode(
            {'id_user': us.id_user, 'tid': random.random()},
            config.SECRET_KEY,
            algorithm='HS256'
        ).decode('utf-8')

        return msg(token, 'token')
authorization.py 文件源码 项目:fabric8-analytics-common 作者: fabric8-analytics 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def generate_authorization_token(context, private_key):
    """Generate authorization token from the private key."""
    expiry = datetime.datetime.utcnow() + datetime.timedelta(days=90)
    userid = "testuser"

    path_to_private_key = 'data/{private_key}'.format(private_key=private_key)
    # initial value
    context.token = None

    with open(path_to_private_key) as fin:
        private_key = fin.read()

        payload = {
            'exp': expiry,
            'iat': datetime.datetime.utcnow(),
            'sub': userid
        }
        token = jwt.encode(payload, key=private_key, algorithm='RS256')
        decoded = token.decode('utf-8')
        # print(decoded)
        context.token = decoded
test_jwt_apns_client.py 文件源码 项目:jwt_apns_client 作者: Mobelux 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_make_provider_token_calls_jwt_encode_with_correct_args(self):
        """
        Test that APNSConnection.make_provider_token() calls jwt.encode() with the correct
        arguments.  jwt.encode() returns different results each time even with the same data passed in
        so we cannot just test for the expected return value.
        """
        issued_at = time.time()
        connection = jwt_apns_client.APNSConnection(
            team_id='TEAMID',
            apns_key_id='KEYID',
            apns_key_path=self.KEY_FILE_PATH)

        with mock.patch('jwt_apns_client.utils.jwt.encode') as mock_encode:
            connection.make_provider_token(issued_at=issued_at)
            mock_encode.assert_called_with(
                {
                    'iss': connection.team_id,
                    'iat': issued_at
                },
                connection.secret,
                algorithm=connection.algorithm,
                headers=connection.get_token_headers())
models.py 文件源码 项目:flask-microservices-users 作者: realpython 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def encode_auth_token(self, user_id):
        """Generates the auth token"""
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(
                    days=current_app.config.get('TOKEN_EXPIRATION_DAYS'),
                    seconds=current_app.config.get('TOKEN_EXPIRATION_SECONDS')
                ),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                current_app.config.get('SECRET_KEY'),
                algorithm='HS256'
            )
        except Exception as e:
            return e
quoine.py 文件源码 项目:bitex 作者: nlsdfnbch 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
        try:
            params = kwargs['params']
        except KeyError:
            params = {}

        if method_verb != 'POST':
            endpoint_path += urllib.parse.urlencode(params)
        msg = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}

        signature = jwt.encode(msg, self.secret, algorithm='HS256')
        headers = {'X-Quoine-API-Version': '2', 'X-Quoine-Auth': signature,
                   'Content-Type': 'application/json'}
        request = {'headers': headers}
        if method_verb == 'POST':
            request['json'] = params
        return self.uri + endpoint_path, request
main.py 文件源码 项目:mt2414 作者: friendsofagape 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def auth():
    email = request.form["email"]
    password = request.form["password"]
    connection = get_db()
    cursor = connection.cursor()
    cursor.execute("SELECT email FROM users WHERE  email = %s", (email,))
    est = cursor.fetchone()
    if not est:
        logging.warning('Unregistered user \'%s\' login attempt unsuccessful' % email)
        return '{"success":false, "message":"Invalid email"}'
    cursor.execute("SELECT u.password_hash, u.password_salt, r.name FROM users u LEFT JOIN roles r ON u.role_id = r.id WHERE u.email = %s and u.email_verified is True", (email,))
    rst = cursor.fetchone()
    if not rst:
        return '{"success":false, "message":"Email is not Verified"}'
    password_hash = rst[0].hex()
    password_salt = bytes.fromhex(rst[1].hex())
    password_hash_new = scrypt.hash(password, password_salt).hex()
    role = rst[2]
    if password_hash == password_hash_new:
        access_token = jwt.encode({'sub': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1), 'role': role}, jwt_hs256_secret, algorithm='HS256')
        logging.warning('User: \'' + str(email) + '\' logged in successfully')
        return '{"access_token": "%s"}\n' % (access_token.decode('utf-8'),)
    logging.warning('User: \'' + str(email) + '\' login attempt unsuccessful: Incorrect Password')
    return '{"success":false, "message":"Incorrect Password"}'
gatewaylogin.py 文件源码 项目:API-Manager 作者: OpenBankProject 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def create_jwt(self, data):
        """
        Creates a JWT used for future requests tothe API
        data is a dict which contains keys username, secret
        """
        message = {
            'login_user_name': data['username'],
            'time_stamp': 'unused',
            'app_id': '',  # Do not create new consumer
            'app_name': '',  # Do not create new consumer
            'temenos_id': '',  # Whatever that does
        }
        if settings.GATEWAYLOGIN_HAS_CBS:
            # Not sure if that is the right thing to do
            message['is_first'] = True
        else:
            # Fake when there is no core banking system
            message.update({
                'is_first': False,
                'cbs_token': 'dummy',
            })
        token = jwt.encode(message, data['secret'], 'HS256')
        self.token = token.decode('utf-8')
        return self.token
ShortlivedTokenMixin.py 文件源码 项目:twopi-flask-utils 作者: TwoPiCode 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def dump(self, secret=None):
        """
        Dump the token into a stringified JWT.

        :param secret: The secret to sign the JWT with. If this is omitted, the 
                       secret will be sourced from ``current_app.config['SECRET_KEY']``

        :returns: The stringified JWT.
        """

        self.issued_at = datetime.datetime.now(pytz.UTC)
        payload, err = self.TokenSchema().dump(self)

        if secret is None:
            secret = current_app.config['SECRET_KEY']

        return jwt.encode(payload, secret).decode('UTF-8')
common.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
security.py 文件源码 项目:heroku-python-boilerplate 作者: chavli 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_token(payload: dict, exp_seconds: int) -> str:
    """ generate a JWT with the given payload. uses HMAC + SHA-256 hash algorithm. the token 
    expires after the given number of seconds. """
    jwt_secret = os.getenv("JWT_SECRET")
    jwt_iss = os.getenv("JWT_ISS")

    if os.getenv("DEBUG"):
        # if running in debug mode, use timezone of machine
        payload["iat"] = int(dt.datetime.now().timestamp())
        payload["exp"] = int((dt.datetime.now() + dt.timedelta(seconds=exp_seconds)).timestamp())
    else:
        # if running in production, use UTC
        payload["iat"] = int(dt.datetime.utcnow().timestamp())
        payload["exp"] = int((dt.datetime.utcnow() + dt.timedelta(seconds=exp_seconds)).timestamp())

    payload["iss"] = jwt_iss

    try:
        token = jwt.encode(payload, jwt_secret, algorithm="HS256")
        return token.decode("utf-8")
    except Exception as e:
        raise(e)
registry.py 文件源码 项目:console 作者: laincloud 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _generate_jwt():
    try:
        now = int(time.time())
        claims["iat"] = now
        claims["nbf"] = now - 1
        claims["exp"] = now + TOKEN_EXPIRE_SECOND
        claims["jti"] = str(random.uniform(0, RANDOM_MAX_VALUE))

        dir_path = os.path.abspath(os.path.dirname(__file__))
        with open(os.path.join(dir_path, PRV_FILE), 'r') as rsa_priv_file:
            key = rsa_priv_file.read()

        encoded = jwt.encode(claims, key, algorithm=HEAD_ALG, headers=header)
        logger.debug(encoded)

        return encoded
    except Exception, e:
        logger.error("Generate JWT for registry wrong : %s" % str(e))
        raise Exception("Generate JWT for registry wrong : %s" % str(e))
common.py 文件源码 项目:nsc-cloudproject-s22016 作者: agitaretech 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
common.py 文件源码 项目:nsc-cloudproject-s22016 作者: agitaretech 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
lti.py 文件源码 项目:CodeGra.de 作者: CodeGra-de 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.
    """
    lti = {
        'params': CanvasLTI.create_from_request(flask.request).launch_params,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
    }
    return flask.redirect(
        '{}/lti_launch/?inLTI=true&jwt={}'.format(
            app.config['EXTERNAL_URL'],
            urllib.parse.quote(
                jwt.encode(
                    lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
                ).decode('utf8')
            )
        )
    )
models.py 文件源码 项目:bucket_api 作者: jokamjohn 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def encode_auth_token(self, user_id):
        """
        Encode the Auth token
        :param user_id: User's Id
        :return:
        """
        try:
            payload = {
                'exp': datetime.datetime.utcnow() + datetime.timedelta(days=app.config.get('AUTH_TOKEN_EXPIRY_DAYS'),
                                                                       seconds=app.config.get(
                                                                           'AUTH_TOKEN_EXPIRY_SECONDS')),
                'iat': datetime.datetime.utcnow(),
                'sub': user_id
            }
            return jwt.encode(
                payload,
                app.config['SECRET_KEY'],
                algorithm='HS256'
            )
        except Exception as e:
            return e
verification.py 文件源码 项目:Metis 作者: gusibi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_token(request):
    # verify basic token
    approach = request.json.get('auth_approach')
    username = request.json['username']
    password = request.json['password']
    if approach == 'password':
        account = verify_password(username, password)
    elif approach == 'wxapp':
        account = verify_wxapp(username, password, request.args.get('code'))
    if not account:
        return False, {}
    payload = {
        "iss": Config.ISS,
        "iat": int(time.time()),
        "exp": int(time.time()) + 86400 * 7,
        "aud": Config.AUDIENCE,
        "sub": str(account.id),
        "nickname": account['nickname'],
        "scopes": ['open']
    }
    token = jwt.encode(payload, 'secret', algorithm='HS256')
    return True, {'access_token': token,
                  'account_id': str(account.id)}
test_views.py 文件源码 项目:cookiecutter-saas 作者: jayfk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_form_valid(self, customer, sync_customer):
        customer.retrieve.return_value = {"id": "some_stripe_id"}
        sync_customer.return_value = True
        transaction_details = {
            "customer_id": "bar"
        }
        encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
        data = {
            "transactionDetails": encoded.decode("utf-8")
        }
        self.login(username=self.user.username, password="password")
        resp = self.client.post(reverse("pinax_stripe_subscription_create"), data)
        customer.retrieve.assert_called_with("bar")
        cust = Customer.objects.get(user=self.user)
        sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
        self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
test_views.py 文件源码 项目:cookiecutter-saas 作者: jayfk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_customer_sync_failed(self, customer, sync_customer):
        customer.retrieve.return_value = {"id": "some_stripe_id"}
        sync_customer.side_effect = StripeError()
        transaction_details = {
            "customer_id": "bar"
        }
        encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
        data = {
            "transactionDetails": encoded.decode("utf-8")
        }
        self.login(username=self.user.username, password="password")
        resp = self.client.post(reverse("pinax_stripe_subscription_create"), data, follow=True)
        customer.retrieve.assert_called_with("bar")
        cust = Customer.objects.get(user=self.user)
        sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
        self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
        self.assertContains(resp, "Unable to communicate with stripe")
views.py 文件源码 项目:PmpApi 作者: pengcheng789 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self, request):
        serializer = LoginSerializer(data=request.data)
        if serializer.is_valid():
            employee = Employee.objects.get(emp_id=request.data.get('emp_id'))
            encode = jwt.encode({'emp_id': employee.emp_id,
                                 'auth': employee.auth,
                                 'part_id': employee.part_id,
                                 'create_time': time(),
                                 'ip_addr': request.META.get('REMOTE_ADDR')
                                 },
                                settings.SECRET_KEY, algorithm='HS256')
            token = dict()
            token['token'] = 'JWT ' + encode
            return Response(token)

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
common.py 文件源码 项目:python-group-proj 作者: Sharcee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
common.py 文件源码 项目:soiqbot 作者: vgan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
user.py 文件源码 项目:blog 作者: comynli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def register(ctx, request):
    try:
        payload = request.json()
        nickname = payload['nickname']
        mail = payload['mail']
        password = payload['password']
    except KeyError as e:
        raise HTTPBadRequest('{} is required'.format(e))
    except Exception as e:
        raise HTTPBadRequest(e)
    user = User.query.filter(or_(User.nickname == nickname, User.mail == mail)).first()
    if user is not None:
        return jsonify(code=400, message='user exist')

    catalog = Catalog(name='notes')
    user = User(nickname=nickname, mail=mail, catalogs=[catalog],
                password=bcrypt.hashpw(password.encode(), bcrypt.gensalt()))
    db.session.add(user)
    try:
        db.session.commit()
        return jsonify(code=200)
    except Exception as e:
        logging.error(e)
        db.session.rollback()
        raise HTTPInternalServerError(e)
tokens.py 文件源码 项目:flask-jwt-extended 作者: vimalloc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def encode_refresh_token(identity, secret, algorithm, expires_delta, csrf,
                         identity_claim_key):
    """
    Creates a new encoded (utf-8) refresh token.

    :param identity: Some identifier used to identify the owner of this token
    :param secret: Secret key to encode the JWT with
    :param algorithm: Which algorithm to use for the toek
    :param expires_delta: How far in the future this token should expire
                          (set to False to disable expiration)
    :type expires_delta: datetime.timedelta or False
    :param csrf: Whether to include a csrf double submit claim in this token
                 (boolean)
    :param identity_claim_key: Which key should be used to store the identity
    :return: Encoded refresh token
    """
    token_data = {
        identity_claim_key: identity,
        'type': 'refresh',
    }
    if csrf:
        token_data['csrf'] = _create_csrf_token()
    return _encode_jwt(token_data, expires_delta, secret, algorithm)
jwt.py 文件源码 项目:dcos 作者: dcos 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def decode_pem_key(key_pem):
    """Convert plaintext PEM key into the format usable for JWT generation

    Args:
        key_pam (str): key data in PEM format, presented as plain string

    Returns:
        Parsed PEM data
    """
    private_key = serialization.load_pem_private_key(
        data=key_pem.encode('ascii'),
        password=None,
        backend=default_backend())

    msg = 'Unexpected private key type'
    assert isinstance(private_key, rsa.RSAPrivateKey), msg
    assert private_key.key_size >= 2048, 'RSA key size too small'

    return private_key
common.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def generate_signed_token(private_pem, request):
    import jwt

    now = datetime.datetime.utcnow()

    claims = {
        'scope': request.scope,
        'exp': now + datetime.timedelta(seconds=request.expires_in)
    }

    claims.update(request.claims)

    token = jwt.encode(claims, private_pem, 'RS256')
    token = to_unicode(token, "UTF-8")

    return token
session.py 文件源码 项目:paws 作者: funkybob 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __call__(self, request, *args, **kwargs):
        import jwt
        if self.cookie_name in request.cookies:
            try:
                value = request.cookies[self.cookie_name].value
                data = jwt.decode(value, self.secret)
                request.session = Session(data)
            except:
                pass
        else:
            request.session = Session()

        resp = self.func(request, *args, **kwargs)
        if not request.session.clean:
            data = dict(request.session)
            value = jwt.encode(request.session, self.secret, algorith=self.algorithm)
            resp.cookies[self.cookie_name] = value
        return resp


问题


面经


文章

微信
公众号

扫码关注公众号