def before_request(self,*args):
"""Verfies that the API is Vaild for the correct user
and the IP address hasn't changed since log in"""
signer = TimestampSigner(SECRET_KEY)
api_key = request.headers['API_KEY']
Client_id = request.headers['Client_ID']
ip_address = request.remote_addr
user = User.query.filter_by(Client_id=Client_id).first()
if user == None:
return make_response(jsonify({'Failure': 'Invaild User'}), 400)
if api_key != user.api_key:
return make_response(jsonify({'Failure': 'Incorrect API Key'}), 400)
if ip_address != user.current_login_ip:
return make_response(jsonify({'Failure': 'Incorrect IP for Client, Please Re-login in'}), 400)
try:
signer.unsign(api_key, max_age=86164)
except:
return make_response(jsonify({'Failure': 'Invaild API Key, please request new API Key'}), 400)
python类TimestampSigner()的实例源码
def test_general_api_security(self):
##TODO: Add General API security test
"""This Tests the API security that is checked before all POST and GET request"""
get_return = self.app.get('/api/client_view',
headers={'API_KEY': 'api_key', 'Client_ID': 'testuser'})
assert 'Failure": "Incorrect API Key' in get_return.data
api_key = self.post_info()
get_return = self.app.get('/api/client_view',
headers={'API_KEY': api_key[0], 'Client_ID': 'fakeuser'})
assert 'Failure": "Invaild User' in get_return.data
user = User.query.filter_by(Client_id="testuser2").first()
get_return = self.app.get('/api/client_view',
headers={'API_KEY': user.api_key, 'Client_ID': 'testuser2'})
assert 'Failure": "Incorrect IP for Client, Please Re-login in' in get_return.data
signer = TimestampSigner(SECRET_KEY)
time.sleep(1)
try:
signer.unsign(api_key, max_age=1)
raise
except:
pass
def generate_token(self, user_id: str):
"""Generate a very random token tied to an user.
Parameters
----------
userid: str
User ID tied to that token
"""
user_id = str(user_id)
userid_encoded = base64.urlsafe_b64encode(user_id.encode())
raw_user = self.get_raw_user(user_id)
if raw_user is None:
raise Exception('User not found to generate a token from')
try:
pwd_hash = raw_user['password']['hash']
except:
log.debug(raw_user)
raise Exception('Raw user is not a good one')
s = TimestampSigner(pwd_hash)
return s.sign(userid_encoded).decode()
def token_find(self, token: str) -> int:
"""Return a user ID from a token.
Parses the token to get the user ID and then unsigns it
using the user's hashed password as a secret key
"""
userid_encoded = token.split('.')[0]
try:
userid = int(base64.urlsafe_b64decode(userid_encoded))
except (binascii.Error, ValueError):
return None
raw_user = self.get_raw_user(userid)
if raw_user is None:
return
s = TimestampSigner(raw_user['password']['hash'])
try:
s.unsign(token)
except itsdangerous.BadSignature:
return
return userid
def password_reset_token(user):
signer = TimestampSigner(fame_config.secret_key)
return signer.sign(str(user['_id']))
def validate_password_reset_token(token):
signer = TimestampSigner(fame_config.secret_key)
return signer.unsign(token, max_age=86400)
def create_token(self):
"""
?? token (?? uuid)
"""
from uuid import uuid1
from itsdangerous import TimestampSigner
s = TimestampSigner(self._sign_key)
return s.sign(str(uuid1()))
def check_token(self, token_sign):
"""
?? token, ?????? token
"""
from itsdangerous import TimestampSigner, SignatureExpired, BadTimeSignature
s = TimestampSigner(self._sign_key)
try:
token = s.unsign(token_sign, max_age=60) # 60???
return {'success': token}
except SignatureExpired as e:
# ??????
return {'error': e.message}
except BadTimeSignature as e:
# ??????
return {'error': e.message}
def api_key():
"""" Request url for API key for Restful API"""
if request.method == "POST":
content = request.get_json(silent=True)
user = User.query.filter_by(Client_id=content['Client_id']).first()
if not user and app.config['NO_PASSWORD'] == True:
##Allow anyone to create their own account
pass_hash = bcrypt_sha256.encrypt(content['Password'], rounds=12)
user = User(Client_id= content['Client_id'],Password= pass_hash,api_key='')
Sample_date = Client_View(client_id = content['Client_id'],
case_name= 'sample case',
priority= '1',
target_date = '10/7/2016',
product_area = 'Engineering',
status = 'In Progress',
description= 'something'
)
db.session.add(user)
db.session.commit()
db.session.add(Sample_date)
db.session.commit()
""" If user is a vaild account, proceed with verifying \
their credentials and provide them the API key"""
if user:
if bcrypt_sha256.verify(content['Password'], user.Password) and user.Client_id == content['Client_id']:
signer = TimestampSigner(SECRET_KEY)
API_KEY = ''.join([random.choice(string.ascii_letters + string.digits) for n in xrange(30)])
user.api_key = signer.sign(API_KEY)
user.current_login_ip = request.remote_addr
db.session.commit()
return make_response(jsonify({'API KEY': user.api_key}), 200)
return make_response(jsonify({'Failure': 'Incorrect Client id OR Password'}), 400)
def get_timestamp_signer():
secret_key = get_secret_key()
s = TimestampSigner(secret_key)
return s
def Index():
"""Login portal for clients"""
if current_user.is_authenticated:
"""If client is authenticated, return them back to their portal case view"""
return render_template("Client_View.html",title='client_view',
Client_id=current_user.Client_id,api_key= current_user.api_key)
form = LoginForm()
if (request.method == 'POST') and (form.validate_on_submit()):
Client_id = form.Client_id.data
Password = form.Password.data
user = User.query.filter_by(Client_id=Client_id).first()
if not user and app.config['NO_PASSWORD'] == True:
"""Allow anyone to create their own account"""
pass_hash = bcrypt_sha256.encrypt(Password, rounds=12)
user = User(Client_id= Client_id,Password= pass_hash,api_key='')
Sample_date = Client_View(client_id = Client_id,
case_name= 'sample case',
priority= '1',
target_date = '10/7/2016',
product_area = 'Engineering',
status = 'In Progress',
description= 'something'
)
db.session.add(user)
db.session.commit()
db.session.add(Sample_date)
db.session.commit()
""" If user is a vaild account, proceed with verifying \
their credentials and provide them the API key"""
if user:
if bcrypt_sha256.verify(Password, user.Password) and user.Client_id == Client_id:
signer = TimestampSigner(SECRET_KEY)
API_KEY = ''.join([random.choice(string.ascii_letters + string.digits) for n in xrange(30)])
user.api_key = signer.sign(API_KEY)
user.current_login_ip = request.remote_addr
db.session.commit()
login_user(user)
flash('You have successfully logged in')
return render_template("Client_View.html",title='client_view',
Client_id=Client_id,api_key= user.api_key)
if form.errors:
flash(form.errors, 'danger')
flash('Incorrect Credentials, please enter the correct Client Id and Password')
return render_template('Index.html', form=form)
if form.errors:
flash(form.errors, 'danger')
return render_template('Index.html', form=form)