python类encode()的实例源码

session.py 文件源码 项目:forest-django 作者: ForestAdmin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def session(request):
    data = json.loads(request.body)
    allowedUsers = get_allowed_users(data.get('renderingId'))
    user = (u for u in allowedUsers if u['email'] == data['email']).next()
    if len(user) and passwords_match(data['password'], user['password']):
        payload = {
            'id': user['id'],
            'type': 'users',
            'data': {
                'email': user['email'],
                'first_name': user['first_name'],
                'last_name': user['last_name'],
                'teams': user['teams']
            },
            'exp': datetime.datetime.utcnow() + datetime.timedelta(days=14)
        }

        secret_key = settings.JWT_SECRET_KEY
        token = jwt.encode(payload, secret_key)
        return JsonResponse({ 'token': token })

    else:
        return HttpResponse(status=401)
jwt.py 文件源码 项目:contactista 作者: singingwolfboy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def refresh_jwt_token(token):
    payload = jwt.decode(
        token,
        key=current_app.config['SECRET_KEY'],
        algorithms=[JWT_ALGORITHM],
    )
    user = User.query.get(payload['identity'])
    if not user.active:
        raise ValueError("User is inactive")
    orig_iat = payload.get('orig_iat')
    if not orig_iat:
        raise ValueError("`orig_iat` field is required")
    refresh_limit = orig_iat + int(JWT_REFRESH_EXPIRATION_DELTA.total_seconds())
    now_ts = datetime.datetime.utcnow().timestamp()
    if now_ts > refresh_limit:
        raise ValueError("Refresh has expired")
    new_payload = jwt_payload(user)
    new_payload["orig_iat"] = orig_iat
    token = jwt.encode(
        new_payload,
        key=current_app.config['SECRET_KEY'],
        algorithm=JWT_ALGORITHM,
    )
    return token, user
home.py 文件源码 项目:tokendealer 作者: Runnerly 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_token():
    key = current_app.config['priv_key']
    try:
        data = request.form
        if data.get('grant_type') != 'client_credentials':
            return _400('Wrong grant_type')

        client_id = data.get('client_id')
        client_secret = data.get('client_secret')
        aud = data.get('audience', '')

        if not is_authorized_app(client_id, client_secret):
            return abort(401)

        now = int(time.time())

        token = {'iss': 'https://tokendealer.example.com',
                 'aud': aud,
                 'iat': now,
                 'exp': now + 3600 * 24}
        token = jwt.encode(token, key, algorithm='RS512')
        return {'access_token': token.decode('utf8')}
    except Exception as e:
        return _400(str(e))
MainClass.py 文件源码 项目:TutLab 作者: KingsMentor 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def create_jwt(self):
        """
        Creates a signed JWT, valid for 60 seconds.
        :return:
        """
        now = int(time.time())
        payload = {
            "iat": now,
            "exp": now + 60,
            "iss": self.integration_id
        }
        return jwt.encode(
            payload,
            key=self.private_key,
            algorithm="RS256"
        )
backends.py 文件源码 项目:falcon-auth 作者: loanzen 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_auth_token(self, user_payload):
        """
        Create a JWT authentication token from ``user_payload``

        Args:
            user_payload(dict, required): A `dict` containing required information
                to create authentication token
        """
        now = datetime.utcnow()
        payload = {
            'user': user_payload
        }
        if 'iat' in self.verify_claims:
            payload['iat'] = now

        if 'nbf' in self.verify_claims:
            payload['nbf'] = now + self.leeway

        if 'exp' in self.verify_claims:
            payload['exp'] = now + self.expiration_delta

        return jwt.encode(payload, self.secret_key,
                          json_encoder=ExtendedJSONEncoder).decode('utf-8')
backends.py 文件源码 项目:falcon-auth 作者: loanzen 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_auth_token(self, user_payload):
        """
        Extracts username, password from the `user_payload` and encode the
        credentials `username:password` in `base64` form
        """
        username = user_payload.get('username') or None
        password = user_payload.get('password') or None

        if not username or not password:
            raise ValueError('`user_payload` must contain both username and password')

        token = '{username}:{password}'.format(
            username=username, password=password).encode('utf-8')

        token_b64 = base64.b64encode(token).decode('utf-8', 'ignore')

        return '{auth_header_prefix} {token_b64}'.format(
            auth_header_prefix=self.auth_header_prefix, token_b64=token_b64)
jwt_utils.py 文件源码 项目:wiggum 作者: qdqmedia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_jwt(user, secret=None, algorithm=None, expiration_time=None,
               extra_data={}):

    if not secret:
        secret = settings.JWT_SECRET_KEY
    if not algorithm:
        algorithm = settings.JWT_SIGN_ALGORITHM
    if not expiration_time:
        expiration_time = settings.JWT_EXPIRATION_TIME_DELTA

    payload = create_jwt_payload(user=user,
                                 expiration_delta=expiration_time,
                                 issuer=settings.JWT_ISSUER,
                                 **extra_data)

    return jwt.encode(payload, secret, algorithm=algorithm)
user.py 文件源码 项目:jenova 作者: inova-tecnologias 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
    parser = reqparse.RequestParser()
    parser.add_argument('username', type=str, required=True)
    parser.add_argument('password', type=str, required=True)
    reqdata = parser.parse_args(strict=True)

    user = User.query.filter_by(login = reqdata['username']).first()
    if not user:
      abort(401, message = 'Wrong credentials')

    if not Security.check_password(user.password, reqdata['password']):
      abort(401, message = 'Wrong credentials')
    user.scopes = Scope.query.all()

    enc_jwt = jwt.encode({'user' : UserSchema().dump(user).data}, Security.get_jwt_skey(), algorithm='HS256')

    return { 
      'response' : { 
        'token' : enc_jwt
      }
    }
helpers.py 文件源码 项目:bernard 作者: BernardFW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _make_url(self, url: Text, extra: Dict, request: 'Request') -> Text:
        """
        Compute an URL that will go through the redirection system that allows
        the trigger of the `LinkClick` layer.
        """

        real_url = patch_qs(url, extra)

        if self.slug:
            url = urljoin(request.message.get_url_base(), '/links/facebook')
            url = patch_qs(url, {
                'l': jwt.encode(
                    {
                        'u': request.user.fbid,
                        'p': request.user.page_id,
                        'h': real_url,
                        's': self.slug,
                    },
                    settings.WEBVIEW_SECRET_KEY,
                    algorithm=settings.WEBVIEW_JWT_ALGORITHM,
                )
            })
            return url

        return real_url
protocol.py 文件源码 项目:inmanta 作者: inmanta 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def custom_json_encoder(o):
    """
        A custom json encoder that knows how to encode other types commonly used by Inmanta
    """
    if isinstance(o, uuid.UUID):
        return str(o)

    if isinstance(o, datetime):
        return o.isoformat()

    if hasattr(o, "to_dict"):
        return o.to_dict()

    if isinstance(o, enum.Enum):
        return o.name

    if isinstance(o, Exception):
        # Logs can push exceptions through RPC. Return a string representation.
        return str(o)

    if isinstance(o, execute.util.Unknown):
        return const.UKNOWN_STRING

    LOGGER.error("Unable to serialize %s", o)
    raise TypeError(repr(o) + " is not JSON serializable")
protocol.py 文件源码 项目:inmanta 作者: inmanta 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def encode_token(client_types, environment=None, idempotent=False, expire=None):
    cfg = inmanta_config.AuthJWTConfig.get_sign_config()

    payload = {
        "iss": cfg.issuer,
        "aud": [cfg.audience],
        const.INMANTA_URN + "ct": ",".join(client_types),
    }

    if not idempotent:
        payload["iat"] = int(time.time())

        if cfg.expire > 0:
            payload["exp"] = int(time.time() + cfg.expire)
        elif expire is not None:
            payload["exp"] = int(time.time() + expire)

    if environment is not None:
        payload[const.INMANTA_URN + "env"] = environment

    return jwt.encode(payload, cfg.key, cfg.algo).decode()
views.py 文件源码 项目:authserver 作者: jdelic 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _create_jwt(self, claim: Dict[str, Any],  key_pemstr: str) -> str:
        _log.debug("Encoding JWT response: %s", claim)

        fp = base64.b32encode(
            SHA256.new(
                data=RSA.importKey(key_pemstr).publickey().exportKey(format="DER")
            ).digest()[0:30]  # shorten to 240 bit presumably so no padding is necessary
        ).decode('utf-8')

        kid = ":".join([fp[i:i + 4] for i in range(0, len(fp), 4)])

        jwtstr = jwt.encode(
            claim,
            headers={
                "typ": "JWT",
                "alg": "RS256",
                "kid": kid,
            },
            key=key_pemstr,
            algorithm="RS256",
        ).decode('utf-8')

        _log.debug("JWT response: %s", jwtstr)
        return jwtstr
credentials.py 文件源码 项目:PyAPNs2 作者: Pr0Ger 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_or_create_topic_token(self, topic):
        # dict of topic to issue date and JWT token
        token_pair = self.__topicTokens.get(topic)
        if token_pair is None or self._is_expired_token(token_pair[0]):
            # Create a new token
            issued_at = time.time()
            token_dict = {
                'iss': self.__team_id,
                'iat': issued_at,
            }
            headers = {
                'alg': self.__encryption_algorithm,
                'kid': self.__auth_key_id,
            }
            jwt_token = jwt.encode(token_dict, self.__auth_key,
                                   algorithm=self.__encryption_algorithm,
                                   headers=headers).decode('ascii')

            self.__topicTokens[topic] = (issued_at, jwt_token)
            return jwt_token
        else:
            return token_pair[1]
util.py 文件源码 项目:assistant-server 作者: sharpdeep 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def gen_syllabus_post_data(start_year=str(datetime.now().year),semester=Semester.AUTUMN.value):
    data =('__EVENTTARGET=&__EVENTARGUMENT=' \
              '&__VIEWSTATE=%2FwEPDwUKLTc4MzA3NjE3Mg9kFgICAQ9kFgYCAQ9kFgRmDxAPFgIeBFRleHQFDzIwMTUtMjAxNuWtpuW5tGQQFQcPMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m5bm0FQc' \
              'PMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m' \
              '5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m' \
              '5bm0FCsDB2dnZ2dnZ2cWAGQCAQ8QZGQWAQICZAIFDxQrAAsP' \
              'FggeCERhdGFLZXlzFgAeC18hSXRlbUNvdW50Zh4JUGFnZUNvdW50Ag' \
              'EeFV8hRGF0YVNvdXJjZUl0ZW1Db3VudGZkZBYEHghDc3NDbGFzcwUMREdQYWdlclN0eWxlHgRfIVN' \
              'CAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkFgJmD2QWAgIBD2QWBAIDDw8WAh8ABQ3lhbEw6Zeo6K' \
              '%2B%2B56iLZGQCBA8PFgIfAAUHMOWtpuWIhmRkAgYPFCsACw8WAh4HVmlzaWJsZWhkZBYEHwUFDERHUGFnZXJTdHlsZR8GAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkZBgBBR5fX0NvbnRyb2xzUmVxdWlyZVBvc3RCYWNrS2V5X18WAgUIdWNzWVMkWE4FCWJ0blNlYXJjaIOA1hvkaeyr6hBNdPilFTCCDv8u' \
              '&__VIEWSTATEGENERATOR=E672B8C6&__EVENTVALIDATION=%2FwEWDgKP7Z%2FyDQKA2K7WDwKl3bLICQKs3faYCwKj3ZrWCALF5PiNDALE5IzLDQLD5MCbDwKv3YqZDQL7tY9IAvi1j0gC' \
              '%2BrWPSAL63YQMAqWf8%2B4KZKbR9XsGkypHcOunFkHTcdqR6to%3D&' \
              'ucsYS%24XN%24Text={}-{}%' \
              'D1%A7%C4%EA&ucsYS%24XQ=' + str(semester) + '&ucsYS%24hfXN=&btnSearch.x=42&btnSearch.y=21').format(str(start_year), str(int(start_year)+1))

    return data.encode('utf-8')
utils.py 文件源码 项目:MeeseeksDev 作者: MeeseeksBox 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _integration_authenticated_request(self, method, url):
        self.since= int(datetime.datetime.now().timestamp())
        payload = dict({
          'iat': self.since,
          'exp': self.since + self.duration,
          'iss': self.integration_id,
        })

        tok = jwt.encode(payload, key=self.rsadata, algorithm='RS256')

        headers = {'Authorization': 'Bearer {}'.format(tok.decode()),
                   'Accept': ACCEPT_HEADER,
                   'Host': 'api.github.com',
                   'User-Agent': 'python/requests'}
        req = requests.Request(method, url, headers=headers)
        prepared = req.prepare()
        with requests.Session() as s:
            return s.send(prepared)
ilovepdf.py 文件源码 项目:ilovepdf 作者: sdelquin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, public_key, secret_key, verbose=False):
        """
        You must register in order to get the keys.
        public_key: It can be obtained from
                    https://developer.ilovepdf.com/user/projects
                    You can see it as "project key" or "jti claim"
        secret_key: It can be obtained from
                    https://developer.ilovepdf.com/user/projects
        """
        logging.config.fileConfig("logging.cfg")
        self.logger = logging.getLogger()
        self.verbose = verbose
        if self.verbose:
            self.logger.info("Generating the encoded signed public key...")
        self.public_key = public_key
        self.secret_key = secret_key
        signed_public_key = jwt.encode(
            {"jti": self.public_key},
            self.secret_key,
            algorithm="HS256"
        ).decode("utf-8")
        self.headers = {"Authorization": "Bearer {}".format(signed_public_key)}
models.py 文件源码 项目:flask-rest-api 作者: gitgik 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generate_token(self, user_id):
        """Generates the access token to be used as the Authorization header"""

        try:
            # set up a payload with an expiration time
            payload = {
                'exp': datetime.utcnow() + timedelta(minutes=5),
                'iat': datetime.utcnow(),
                'sub': user_id
            }
            # create the byte string token using the payload and the SECRET key
            jwt_string = jwt.encode(
                payload,
                current_app.config.get('SECRET'),
                algorithm='HS256'
            )
            return jwt_string

        except Exception as e:
            # return an error in string format if an exception occurs
            return str(e)


问题


面经


文章

微信
公众号

扫码关注公众号