def session(request):
data = json.loads(request.body)
allowedUsers = get_allowed_users(data.get('renderingId'))
user = (u for u in allowedUsers if u['email'] == data['email']).next()
if len(user) and passwords_match(data['password'], user['password']):
payload = {
'id': user['id'],
'type': 'users',
'data': {
'email': user['email'],
'first_name': user['first_name'],
'last_name': user['last_name'],
'teams': user['teams']
},
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=14)
}
secret_key = settings.JWT_SECRET_KEY
token = jwt.encode(payload, secret_key)
return JsonResponse({ 'token': token })
else:
return HttpResponse(status=401)
python类encode()的实例源码
def refresh_jwt_token(token):
payload = jwt.decode(
token,
key=current_app.config['SECRET_KEY'],
algorithms=[JWT_ALGORITHM],
)
user = User.query.get(payload['identity'])
if not user.active:
raise ValueError("User is inactive")
orig_iat = payload.get('orig_iat')
if not orig_iat:
raise ValueError("`orig_iat` field is required")
refresh_limit = orig_iat + int(JWT_REFRESH_EXPIRATION_DELTA.total_seconds())
now_ts = datetime.datetime.utcnow().timestamp()
if now_ts > refresh_limit:
raise ValueError("Refresh has expired")
new_payload = jwt_payload(user)
new_payload["orig_iat"] = orig_iat
token = jwt.encode(
new_payload,
key=current_app.config['SECRET_KEY'],
algorithm=JWT_ALGORITHM,
)
return token, user
def create_token():
key = current_app.config['priv_key']
try:
data = request.form
if data.get('grant_type') != 'client_credentials':
return _400('Wrong grant_type')
client_id = data.get('client_id')
client_secret = data.get('client_secret')
aud = data.get('audience', '')
if not is_authorized_app(client_id, client_secret):
return abort(401)
now = int(time.time())
token = {'iss': 'https://tokendealer.example.com',
'aud': aud,
'iat': now,
'exp': now + 3600 * 24}
token = jwt.encode(token, key, algorithm='RS512')
return {'access_token': token.decode('utf8')}
except Exception as e:
return _400(str(e))
def create_jwt(self):
"""
Creates a signed JWT, valid for 60 seconds.
:return:
"""
now = int(time.time())
payload = {
"iat": now,
"exp": now + 60,
"iss": self.integration_id
}
return jwt.encode(
payload,
key=self.private_key,
algorithm="RS256"
)
def get_auth_token(self, user_payload):
"""
Create a JWT authentication token from ``user_payload``
Args:
user_payload(dict, required): A `dict` containing required information
to create authentication token
"""
now = datetime.utcnow()
payload = {
'user': user_payload
}
if 'iat' in self.verify_claims:
payload['iat'] = now
if 'nbf' in self.verify_claims:
payload['nbf'] = now + self.leeway
if 'exp' in self.verify_claims:
payload['exp'] = now + self.expiration_delta
return jwt.encode(payload, self.secret_key,
json_encoder=ExtendedJSONEncoder).decode('utf-8')
def get_auth_token(self, user_payload):
"""
Extracts username, password from the `user_payload` and encode the
credentials `username:password` in `base64` form
"""
username = user_payload.get('username') or None
password = user_payload.get('password') or None
if not username or not password:
raise ValueError('`user_payload` must contain both username and password')
token = '{username}:{password}'.format(
username=username, password=password).encode('utf-8')
token_b64 = base64.b64encode(token).decode('utf-8', 'ignore')
return '{auth_header_prefix} {token_b64}'.format(
auth_header_prefix=self.auth_header_prefix, token_b64=token_b64)
def create_jwt(user, secret=None, algorithm=None, expiration_time=None,
extra_data={}):
if not secret:
secret = settings.JWT_SECRET_KEY
if not algorithm:
algorithm = settings.JWT_SIGN_ALGORITHM
if not expiration_time:
expiration_time = settings.JWT_EXPIRATION_TIME_DELTA
payload = create_jwt_payload(user=user,
expiration_delta=expiration_time,
issuer=settings.JWT_ISSUER,
**extra_data)
return jwt.encode(payload, secret, algorithm=algorithm)
def post(self):
parser = reqparse.RequestParser()
parser.add_argument('username', type=str, required=True)
parser.add_argument('password', type=str, required=True)
reqdata = parser.parse_args(strict=True)
user = User.query.filter_by(login = reqdata['username']).first()
if not user:
abort(401, message = 'Wrong credentials')
if not Security.check_password(user.password, reqdata['password']):
abort(401, message = 'Wrong credentials')
user.scopes = Scope.query.all()
enc_jwt = jwt.encode({'user' : UserSchema().dump(user).data}, Security.get_jwt_skey(), algorithm='HS256')
return {
'response' : {
'token' : enc_jwt
}
}
def _make_url(self, url: Text, extra: Dict, request: 'Request') -> Text:
"""
Compute an URL that will go through the redirection system that allows
the trigger of the `LinkClick` layer.
"""
real_url = patch_qs(url, extra)
if self.slug:
url = urljoin(request.message.get_url_base(), '/links/facebook')
url = patch_qs(url, {
'l': jwt.encode(
{
'u': request.user.fbid,
'p': request.user.page_id,
'h': real_url,
's': self.slug,
},
settings.WEBVIEW_SECRET_KEY,
algorithm=settings.WEBVIEW_JWT_ALGORITHM,
)
})
return url
return real_url
def custom_json_encoder(o):
"""
A custom json encoder that knows how to encode other types commonly used by Inmanta
"""
if isinstance(o, uuid.UUID):
return str(o)
if isinstance(o, datetime):
return o.isoformat()
if hasattr(o, "to_dict"):
return o.to_dict()
if isinstance(o, enum.Enum):
return o.name
if isinstance(o, Exception):
# Logs can push exceptions through RPC. Return a string representation.
return str(o)
if isinstance(o, execute.util.Unknown):
return const.UKNOWN_STRING
LOGGER.error("Unable to serialize %s", o)
raise TypeError(repr(o) + " is not JSON serializable")
def encode_token(client_types, environment=None, idempotent=False, expire=None):
cfg = inmanta_config.AuthJWTConfig.get_sign_config()
payload = {
"iss": cfg.issuer,
"aud": [cfg.audience],
const.INMANTA_URN + "ct": ",".join(client_types),
}
if not idempotent:
payload["iat"] = int(time.time())
if cfg.expire > 0:
payload["exp"] = int(time.time() + cfg.expire)
elif expire is not None:
payload["exp"] = int(time.time() + expire)
if environment is not None:
payload[const.INMANTA_URN + "env"] = environment
return jwt.encode(payload, cfg.key, cfg.algo).decode()
def _create_jwt(self, claim: Dict[str, Any], key_pemstr: str) -> str:
_log.debug("Encoding JWT response: %s", claim)
fp = base64.b32encode(
SHA256.new(
data=RSA.importKey(key_pemstr).publickey().exportKey(format="DER")
).digest()[0:30] # shorten to 240 bit presumably so no padding is necessary
).decode('utf-8')
kid = ":".join([fp[i:i + 4] for i in range(0, len(fp), 4)])
jwtstr = jwt.encode(
claim,
headers={
"typ": "JWT",
"alg": "RS256",
"kid": kid,
},
key=key_pemstr,
algorithm="RS256",
).decode('utf-8')
_log.debug("JWT response: %s", jwtstr)
return jwtstr
def _get_or_create_topic_token(self, topic):
# dict of topic to issue date and JWT token
token_pair = self.__topicTokens.get(topic)
if token_pair is None or self._is_expired_token(token_pair[0]):
# Create a new token
issued_at = time.time()
token_dict = {
'iss': self.__team_id,
'iat': issued_at,
}
headers = {
'alg': self.__encryption_algorithm,
'kid': self.__auth_key_id,
}
jwt_token = jwt.encode(token_dict, self.__auth_key,
algorithm=self.__encryption_algorithm,
headers=headers).decode('ascii')
self.__topicTokens[topic] = (issued_at, jwt_token)
return jwt_token
else:
return token_pair[1]
def gen_syllabus_post_data(start_year=str(datetime.now().year),semester=Semester.AUTUMN.value):
data =('__EVENTTARGET=&__EVENTARGUMENT=' \
'&__VIEWSTATE=%2FwEPDwUKLTc4MzA3NjE3Mg9kFgICAQ9kFgYCAQ9kFgRmDxAPFgIeBFRleHQFDzIwMTUtMjAxNuWtpuW5tGQQFQcPMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m5bm0FQc' \
'PMjAxMi0yMDEz5a2m5bm0DzIwMTMtMjAxNOWtpuW5tA8yMDE0LTIwMTXlrablubQPMjAxNS0yMDE25a2m' \
'5bm0DzIwMTYtMjAxN%2BWtpuW5tA8yMDE3LTIwMTjlrablubQPMjAxOC0yMDE55a2m' \
'5bm0FCsDB2dnZ2dnZ2cWAGQCAQ8QZGQWAQICZAIFDxQrAAsP' \
'FggeCERhdGFLZXlzFgAeC18hSXRlbUNvdW50Zh4JUGFnZUNvdW50Ag' \
'EeFV8hRGF0YVNvdXJjZUl0ZW1Db3VudGZkZBYEHghDc3NDbGFzcwUMREdQYWdlclN0eWxlHgRfIVN' \
'CAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkFgJmD2QWAgIBD2QWBAIDDw8WAh8ABQ3lhbEw6Zeo6K' \
'%2B%2B56iLZGQCBA8PFgIfAAUHMOWtpuWIhmRkAgYPFCsACw8WAh4HVmlzaWJsZWhkZBYEHwUFDERHUGFnZXJTdHlsZR8GAgIWBB8FBQ1ER0hlYWRlclN0eWxlHwYCAhYEHwUFDURHRm9vdGVyU3R5bGUfBgICFgQfBQULREdJdGVtU3R5bGUfBgICFgQfBQUWREdBbHRlcm5hdGluZ0l0ZW1TdHlsZR8GAgIWBB8FBRNER1NlbGVjdGVkSXRlbVN0eWxlHwYCAhYEHwUFD0RHRWRpdEl0ZW1TdHlsZR8GAgIWBB8FBQJERx8GAgJkZBgBBR5fX0NvbnRyb2xzUmVxdWlyZVBvc3RCYWNrS2V5X18WAgUIdWNzWVMkWE4FCWJ0blNlYXJjaIOA1hvkaeyr6hBNdPilFTCCDv8u' \
'&__VIEWSTATEGENERATOR=E672B8C6&__EVENTVALIDATION=%2FwEWDgKP7Z%2FyDQKA2K7WDwKl3bLICQKs3faYCwKj3ZrWCALF5PiNDALE5IzLDQLD5MCbDwKv3YqZDQL7tY9IAvi1j0gC' \
'%2BrWPSAL63YQMAqWf8%2B4KZKbR9XsGkypHcOunFkHTcdqR6to%3D&' \
'ucsYS%24XN%24Text={}-{}%' \
'D1%A7%C4%EA&ucsYS%24XQ=' + str(semester) + '&ucsYS%24hfXN=&btnSearch.x=42&btnSearch.y=21').format(str(start_year), str(int(start_year)+1))
return data.encode('utf-8')
def _integration_authenticated_request(self, method, url):
self.since= int(datetime.datetime.now().timestamp())
payload = dict({
'iat': self.since,
'exp': self.since + self.duration,
'iss': self.integration_id,
})
tok = jwt.encode(payload, key=self.rsadata, algorithm='RS256')
headers = {'Authorization': 'Bearer {}'.format(tok.decode()),
'Accept': ACCEPT_HEADER,
'Host': 'api.github.com',
'User-Agent': 'python/requests'}
req = requests.Request(method, url, headers=headers)
prepared = req.prepare()
with requests.Session() as s:
return s.send(prepared)
def __init__(self, public_key, secret_key, verbose=False):
"""
You must register in order to get the keys.
public_key: It can be obtained from
https://developer.ilovepdf.com/user/projects
You can see it as "project key" or "jti claim"
secret_key: It can be obtained from
https://developer.ilovepdf.com/user/projects
"""
logging.config.fileConfig("logging.cfg")
self.logger = logging.getLogger()
self.verbose = verbose
if self.verbose:
self.logger.info("Generating the encoded signed public key...")
self.public_key = public_key
self.secret_key = secret_key
signed_public_key = jwt.encode(
{"jti": self.public_key},
self.secret_key,
algorithm="HS256"
).decode("utf-8")
self.headers = {"Authorization": "Bearer {}".format(signed_public_key)}
def generate_token(self, user_id):
"""Generates the access token to be used as the Authorization header"""
try:
# set up a payload with an expiration time
payload = {
'exp': datetime.utcnow() + timedelta(minutes=5),
'iat': datetime.utcnow(),
'sub': user_id
}
# create the byte string token using the payload and the SECRET key
jwt_string = jwt.encode(
payload,
current_app.config.get('SECRET'),
algorithm='HS256'
)
return jwt_string
except Exception as e:
# return an error in string format if an exception occurs
return str(e)