def auth_jwt_project(short_name):
"""Create a JWT for a project via its secret KEY."""
project_secret_key = None
if 'Authorization' in request.headers:
project_secret_key = request.headers.get('Authorization')
if project_secret_key:
project = project_repo.get_by_shortname(short_name)
if project and project.secret_key == project_secret_key:
token = jwt.encode({'short_name': short_name,
'project_id': project.id},
project.secret_key, algorithm='HS256')
return token
else:
return abort(404)
else:
return abort(403)
python类encode()的实例源码
def get_jwt(self):
exp = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
exp = calendar.timegm(exp.timetuple())
# Generate the JWT
payload = {
# issued at time
'iat': int(time.time()),
# JWT expiration time (10 minute maximum)
'exp': exp,
# Integration's GitHub identifier
'iss': options.get('github.integration-app-id'),
}
return jwt.encode(
payload, options.get('github.integration-private-key'), algorithm='RS256'
)
def encode_auth_token(self, user_id):
"""
Generates the Auth Token
:return: string
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=5),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def make_auth_header(installation_id):
utcnow = datetime.utcnow() + timedelta(seconds=-5)
duration = timedelta(seconds=30)
payload = {
"iat": utcnow,
"exp": utcnow + duration,
"iss": 2510
}
pem = get_private_pem()
encoded = jwt.encode(payload, pem, "RS256")
headers = {
"Authorization": "Bearer " + encoded.decode("utf-8"),
"Accept": "application/vnd.github.machine-man-preview+json"
}
auth_url = "https://api.github.com/installations/{}/access_tokens".format(installation_id)
r = requests.post(auth_url, headers=headers)
if not r.ok:
print(r.json()["message"])
r.raise_for_status()
token = r.json()["token"]
return {
"Authorization": "token {}".format(token)
}
def post(self):
"""Login the user"""
username = request.json['username']
password = request.json['password']
us = User.query\
.filter(User.disabled is False)\
.filter(User.sigaa_user_name == username)\
.first()
abort_if_none(us, 403, 'Username or password incorrect')
if not check_password_hash(us.password, password):
return msg('Username or password incorrect'), 403
token = jwt.encode(
{'id_user': us.id_user, 'tid': random.random()},
config.SECRET_KEY,
algorithm='HS256'
).decode('utf-8')
return msg(token, 'token')
authorization.py 文件源码
项目:fabric8-analytics-common
作者: fabric8-analytics
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def generate_authorization_token(context, private_key):
"""Generate authorization token from the private key."""
expiry = datetime.datetime.utcnow() + datetime.timedelta(days=90)
userid = "testuser"
path_to_private_key = 'data/{private_key}'.format(private_key=private_key)
# initial value
context.token = None
with open(path_to_private_key) as fin:
private_key = fin.read()
payload = {
'exp': expiry,
'iat': datetime.datetime.utcnow(),
'sub': userid
}
token = jwt.encode(payload, key=private_key, algorithm='RS256')
decoded = token.decode('utf-8')
# print(decoded)
context.token = decoded
def test_make_provider_token_calls_jwt_encode_with_correct_args(self):
"""
Test that APNSConnection.make_provider_token() calls jwt.encode() with the correct
arguments. jwt.encode() returns different results each time even with the same data passed in
so we cannot just test for the expected return value.
"""
issued_at = time.time()
connection = jwt_apns_client.APNSConnection(
team_id='TEAMID',
apns_key_id='KEYID',
apns_key_path=self.KEY_FILE_PATH)
with mock.patch('jwt_apns_client.utils.jwt.encode') as mock_encode:
connection.make_provider_token(issued_at=issued_at)
mock_encode.assert_called_with(
{
'iss': connection.team_id,
'iat': issued_at
},
connection.secret,
algorithm=connection.algorithm,
headers=connection.get_token_headers())
def encode_auth_token(self, user_id):
"""Generates the auth token"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(
days=current_app.config.get('TOKEN_EXPIRATION_DAYS'),
seconds=current_app.config.get('TOKEN_EXPIRATION_SECONDS')
),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
current_app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
try:
params = kwargs['params']
except KeyError:
params = {}
if method_verb != 'POST':
endpoint_path += urllib.parse.urlencode(params)
msg = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}
signature = jwt.encode(msg, self.secret, algorithm='HS256')
headers = {'X-Quoine-API-Version': '2', 'X-Quoine-Auth': signature,
'Content-Type': 'application/json'}
request = {'headers': headers}
if method_verb == 'POST':
request['json'] = params
return self.uri + endpoint_path, request
def auth():
email = request.form["email"]
password = request.form["password"]
connection = get_db()
cursor = connection.cursor()
cursor.execute("SELECT email FROM users WHERE email = %s", (email,))
est = cursor.fetchone()
if not est:
logging.warning('Unregistered user \'%s\' login attempt unsuccessful' % email)
return '{"success":false, "message":"Invalid email"}'
cursor.execute("SELECT u.password_hash, u.password_salt, r.name FROM users u LEFT JOIN roles r ON u.role_id = r.id WHERE u.email = %s and u.email_verified is True", (email,))
rst = cursor.fetchone()
if not rst:
return '{"success":false, "message":"Email is not Verified"}'
password_hash = rst[0].hex()
password_salt = bytes.fromhex(rst[1].hex())
password_hash_new = scrypt.hash(password, password_salt).hex()
role = rst[2]
if password_hash == password_hash_new:
access_token = jwt.encode({'sub': email, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=1), 'role': role}, jwt_hs256_secret, algorithm='HS256')
logging.warning('User: \'' + str(email) + '\' logged in successfully')
return '{"access_token": "%s"}\n' % (access_token.decode('utf-8'),)
logging.warning('User: \'' + str(email) + '\' login attempt unsuccessful: Incorrect Password')
return '{"success":false, "message":"Incorrect Password"}'
def create_jwt(self, data):
"""
Creates a JWT used for future requests tothe API
data is a dict which contains keys username, secret
"""
message = {
'login_user_name': data['username'],
'time_stamp': 'unused',
'app_id': '', # Do not create new consumer
'app_name': '', # Do not create new consumer
'temenos_id': '', # Whatever that does
}
if settings.GATEWAYLOGIN_HAS_CBS:
# Not sure if that is the right thing to do
message['is_first'] = True
else:
# Fake when there is no core banking system
message.update({
'is_first': False,
'cbs_token': 'dummy',
})
token = jwt.encode(message, data['secret'], 'HS256')
self.token = token.decode('utf-8')
return self.token
def dump(self, secret=None):
"""
Dump the token into a stringified JWT.
:param secret: The secret to sign the JWT with. If this is omitted, the
secret will be sourced from ``current_app.config['SECRET_KEY']``
:returns: The stringified JWT.
"""
self.issued_at = datetime.datetime.now(pytz.UTC)
payload, err = self.TokenSchema().dump(self)
if secret is None:
secret = current_app.config['SECRET_KEY']
return jwt.encode(payload, secret).decode('UTF-8')
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def generate_token(payload: dict, exp_seconds: int) -> str:
""" generate a JWT with the given payload. uses HMAC + SHA-256 hash algorithm. the token
expires after the given number of seconds. """
jwt_secret = os.getenv("JWT_SECRET")
jwt_iss = os.getenv("JWT_ISS")
if os.getenv("DEBUG"):
# if running in debug mode, use timezone of machine
payload["iat"] = int(dt.datetime.now().timestamp())
payload["exp"] = int((dt.datetime.now() + dt.timedelta(seconds=exp_seconds)).timestamp())
else:
# if running in production, use UTC
payload["iat"] = int(dt.datetime.utcnow().timestamp())
payload["exp"] = int((dt.datetime.utcnow() + dt.timedelta(seconds=exp_seconds)).timestamp())
payload["iss"] = jwt_iss
try:
token = jwt.encode(payload, jwt_secret, algorithm="HS256")
return token.decode("utf-8")
except Exception as e:
raise(e)
def _generate_jwt():
try:
now = int(time.time())
claims["iat"] = now
claims["nbf"] = now - 1
claims["exp"] = now + TOKEN_EXPIRE_SECOND
claims["jti"] = str(random.uniform(0, RANDOM_MAX_VALUE))
dir_path = os.path.abspath(os.path.dirname(__file__))
with open(os.path.join(dir_path, PRV_FILE), 'r') as rsa_priv_file:
key = rsa_priv_file.read()
encoded = jwt.encode(claims, key, algorithm=HEAD_ALG, headers=header)
logger.debug(encoded)
return encoded
except Exception, e:
logger.error("Generate JWT for registry wrong : %s" % str(e))
raise Exception("Generate JWT for registry wrong : %s" % str(e))
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def launch_lti() -> t.Any:
"""Do a LTI launch.
.. :quickref: LTI; Do a LTI Launch.
"""
lti = {
'params': CanvasLTI.create_from_request(flask.request).launch_params,
'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
}
return flask.redirect(
'{}/lti_launch/?inLTI=true&jwt={}'.format(
app.config['EXTERNAL_URL'],
urllib.parse.quote(
jwt.encode(
lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
).decode('utf8')
)
)
)
def encode_auth_token(self, user_id):
"""
Encode the Auth token
:param user_id: User's Id
:return:
"""
try:
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=app.config.get('AUTH_TOKEN_EXPIRY_DAYS'),
seconds=app.config.get(
'AUTH_TOKEN_EXPIRY_SECONDS')),
'iat': datetime.datetime.utcnow(),
'sub': user_id
}
return jwt.encode(
payload,
app.config['SECRET_KEY'],
algorithm='HS256'
)
except Exception as e:
return e
def create_token(request):
# verify basic token
approach = request.json.get('auth_approach')
username = request.json['username']
password = request.json['password']
if approach == 'password':
account = verify_password(username, password)
elif approach == 'wxapp':
account = verify_wxapp(username, password, request.args.get('code'))
if not account:
return False, {}
payload = {
"iss": Config.ISS,
"iat": int(time.time()),
"exp": int(time.time()) + 86400 * 7,
"aud": Config.AUDIENCE,
"sub": str(account.id),
"nickname": account['nickname'],
"scopes": ['open']
}
token = jwt.encode(payload, 'secret', algorithm='HS256')
return True, {'access_token': token,
'account_id': str(account.id)}
def test_form_valid(self, customer, sync_customer):
customer.retrieve.return_value = {"id": "some_stripe_id"}
sync_customer.return_value = True
transaction_details = {
"customer_id": "bar"
}
encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
data = {
"transactionDetails": encoded.decode("utf-8")
}
self.login(username=self.user.username, password="password")
resp = self.client.post(reverse("pinax_stripe_subscription_create"), data)
customer.retrieve.assert_called_with("bar")
cust = Customer.objects.get(user=self.user)
sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
def test_customer_sync_failed(self, customer, sync_customer):
customer.retrieve.return_value = {"id": "some_stripe_id"}
sync_customer.side_effect = StripeError()
transaction_details = {
"customer_id": "bar"
}
encoded = jwt.encode(transaction_details, settings.OCTOBAT_PRIVATE_KEY)
data = {
"transactionDetails": encoded.decode("utf-8")
}
self.login(username=self.user.username, password="password")
resp = self.client.post(reverse("pinax_stripe_subscription_create"), data, follow=True)
customer.retrieve.assert_called_with("bar")
cust = Customer.objects.get(user=self.user)
sync_customer.assert_called_with(cust, {"id": "some_stripe_id"})
self.assertRedirects(resp, reverse("pinax_stripe_subscription_list"))
self.assertContains(resp, "Unable to communicate with stripe")
def post(self, request):
serializer = LoginSerializer(data=request.data)
if serializer.is_valid():
employee = Employee.objects.get(emp_id=request.data.get('emp_id'))
encode = jwt.encode({'emp_id': employee.emp_id,
'auth': employee.auth,
'part_id': employee.part_id,
'create_time': time(),
'ip_addr': request.META.get('REMOTE_ADDR')
},
settings.SECRET_KEY, algorithm='HS256')
token = dict()
token['token'] = 'JWT ' + encode
return Response(token)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def register(ctx, request):
try:
payload = request.json()
nickname = payload['nickname']
mail = payload['mail']
password = payload['password']
except KeyError as e:
raise HTTPBadRequest('{} is required'.format(e))
except Exception as e:
raise HTTPBadRequest(e)
user = User.query.filter(or_(User.nickname == nickname, User.mail == mail)).first()
if user is not None:
return jsonify(code=400, message='user exist')
catalog = Catalog(name='notes')
user = User(nickname=nickname, mail=mail, catalogs=[catalog],
password=bcrypt.hashpw(password.encode(), bcrypt.gensalt()))
db.session.add(user)
try:
db.session.commit()
return jsonify(code=200)
except Exception as e:
logging.error(e)
db.session.rollback()
raise HTTPInternalServerError(e)
def encode_refresh_token(identity, secret, algorithm, expires_delta, csrf,
identity_claim_key):
"""
Creates a new encoded (utf-8) refresh token.
:param identity: Some identifier used to identify the owner of this token
:param secret: Secret key to encode the JWT with
:param algorithm: Which algorithm to use for the toek
:param expires_delta: How far in the future this token should expire
(set to False to disable expiration)
:type expires_delta: datetime.timedelta or False
:param csrf: Whether to include a csrf double submit claim in this token
(boolean)
:param identity_claim_key: Which key should be used to store the identity
:return: Encoded refresh token
"""
token_data = {
identity_claim_key: identity,
'type': 'refresh',
}
if csrf:
token_data['csrf'] = _create_csrf_token()
return _encode_jwt(token_data, expires_delta, secret, algorithm)
def decode_pem_key(key_pem):
"""Convert plaintext PEM key into the format usable for JWT generation
Args:
key_pam (str): key data in PEM format, presented as plain string
Returns:
Parsed PEM data
"""
private_key = serialization.load_pem_private_key(
data=key_pem.encode('ascii'),
password=None,
backend=default_backend())
msg = 'Unexpected private key type'
assert isinstance(private_key, rsa.RSAPrivateKey), msg
assert private_key.key_size >= 2048, 'RSA key size too small'
return private_key
def generate_signed_token(private_pem, request):
import jwt
now = datetime.datetime.utcnow()
claims = {
'scope': request.scope,
'exp': now + datetime.timedelta(seconds=request.expires_in)
}
claims.update(request.claims)
token = jwt.encode(claims, private_pem, 'RS256')
token = to_unicode(token, "UTF-8")
return token
def __call__(self, request, *args, **kwargs):
import jwt
if self.cookie_name in request.cookies:
try:
value = request.cookies[self.cookie_name].value
data = jwt.decode(value, self.secret)
request.session = Session(data)
except:
pass
else:
request.session = Session()
resp = self.func(request, *args, **kwargs)
if not request.session.clean:
data = dict(request.session)
value = jwt.encode(request.session, self.secret, algorith=self.algorithm)
resp.cookies[self.cookie_name] = value
return resp