def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        expires = time.time() + time_limit
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
# Example source code using Python's secret_key()
def is_valid_token(token, token_expiry):
    """Check a signed, timed token and extract its payload.

    :param token: Token to check.
    :param token_expiry: Maximum accepted age of the token, in seconds
                         (passed to the serializer as ``max_age``).
    :return: A ``(valid, payload)`` tuple: ``(True, payload)`` when the token
             loads successfully, ``(False, None)`` when it has expired or
             carries a bad signature. The payload is presumably the user id;
             verify against the code that issues the token.
    """
    # NOTE(review): a hard-coded fallback secret is used when the app has no
    # secret key configured; confirm this is only reachable in tests.
    entropy = current_app.secret_key or 'un1testingmode'
    signer = URLSafeTimedSerializer(entropy)

    try:
        payload = signer.loads(token, max_age=token_expiry)
    except SignatureExpired:
        current_app.logger.debug('Token has expired')
        return False, None
    except BadSignature:
        current_app.logger.debug('Bad Token Signature')
        return False, None
    else:
        return True, payload
def generate_csrf(secret_key=None, token_key=None):
    """Return a signed CSRF token, creating and caching it when needed.

    The raw token lives in ``session[field_name]``; the signed token is
    cached as an attribute on ``g`` under the same name, so repeated calls
    return the cached value while that attribute exists.

    :param secret_key: Used to securely sign the token. Default is
                       ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
                      Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    signing_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already cached on ``g``.
    if field_name in g:
        return g.get(field_name)

    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    signer = URLSafeTimedSerializer(signing_key, salt='wtf-csrf-token')
    setattr(g, field_name, signer.dumps(session[field_name]))
    return g.get(field_name)
def generate_csrf_token(self, csrf_token_field):
    """Return a CSRF token built from this object's ``meta`` settings.

    :param csrf_token_field: Accepted but not used by this implementation.
    :return: Whatever ``generate_csrf`` returns for the configured secret
             and field name.
    """
    secret = self.meta.csrf_secret
    field_name = self.meta.csrf_field_name
    return generate_csrf(secret_key=secret, token_key=field_name)
def csrf_secret(self):
    """Return the CSRF signing secret from the current app.

    ``WTF_CSRF_SECRET_KEY`` from the app config is used when present;
    otherwise ``current_app.secret_key`` is returned.
    """
    app_config = current_app.config
    return app_config.get('WTF_CSRF_SECRET_KEY', current_app.secret_key)
def generate_csrf(secret_key=None, token_key=None):
    """Return a signed CSRF token, creating and caching it when needed.

    The raw token lives in ``session[field_name]``; the signed token is
    cached as an attribute on ``g`` under the same name, so repeated calls
    return the cached value while that attribute exists.

    :param secret_key: Used to securely sign the token. Default is
                       ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
                      Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    signing_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already cached on ``g``.
    if field_name in g:
        return g.get(field_name)

    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    signer = URLSafeTimedSerializer(signing_key, salt='wtf-csrf-token')
    setattr(g, field_name, signer.dumps(session[field_name]))
    return g.get(field_name)
def generate_csrf_token(self, csrf_token_field):
    """Return a CSRF token built from this object's ``meta`` settings.

    :param csrf_token_field: Accepted but not used by this implementation.
    :return: Whatever ``generate_csrf`` returns for the configured secret
             and field name.
    """
    secret = self.meta.csrf_secret
    field_name = self.meta.csrf_field_name
    return generate_csrf(secret_key=secret, token_key=field_name)
def csrf_secret(self):
    """Return the CSRF signing secret from the current app.

    ``WTF_CSRF_SECRET_KEY`` from the app config is used when present;
    otherwise ``current_app.secret_key`` is returned.
    """
    app_config = current_app.config
    return app_config.get('WTF_CSRF_SECRET_KEY', current_app.secret_key)
def validate_csrf(data, secret_key=None, time_limit=None):
    """Check whether ``data`` is a valid CSRF token for the current session.

    The token is expected in the form ``<expires>##<hmac>``, where the HMAC
    (SHA-1) covers ``session['csrf_token']`` followed by the expiry text.

    :param data: The csrf token value to be checked.
    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: When truthy, the expiry timestamp inside the token is
                       enforced. ``None`` reads ``WTF_CSRF_TIME_LIMIT`` from
                       the app config (default 3600). A falsy value skips
                       the expiry check.
    :return: ``True`` if the token is well formed, not expired (when
             enforced) and its HMAC matches; ``False`` otherwise.
    """
    if not data or '##' not in data:
        return False

    expires, hmac_csrf = data.split('##', 1)

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if time_limit:
        # Parse the timestamp only when it is enforced, so a token with an
        # empty expiry field (issued without a time limit) is not rejected
        # merely for being unparseable.
        try:
            expires_at = float(expires)
        except ValueError:
            return False
        if time.time() > expires_at:
            return False

    if not secret_key:
        secret_key = current_app.config.get(
            'WTF_CSRF_SECRET_KEY', current_app.secret_key
        )

    if 'csrf_token' not in session:
        return False

    # Sign the expiry text exactly as received. Re-formatting the parsed
    # float would turn an integer expiry such as '123' into '123.0' and
    # make the HMAC mismatch for otherwise valid tokens.
    csrf_build = '%s%s' % (session['csrf_token'], expires)
    hmac_compare = hmac.new(
        to_bytes(secret_key),
        to_bytes(csrf_build),
        digestmod=hashlib.sha1
    ).hexdigest()

    # Constant-time comparison avoids leaking digest prefixes via timing.
    # Both sides are converted to bytes because compare_digest rejects
    # non-ASCII text, and hmac_csrf is user-supplied.
    return hmac.compare_digest(to_bytes(hmac_compare), to_bytes(hmac_csrf))
def get_serializer(secret_key=None):
    """Return a ``URLSafeSerializer`` for the given key.

    :param secret_key: Signing key; when ``None``, ``current_app.secret_key``
                       is used instead.
    """
    key = current_app.secret_key if secret_key is None else secret_key
    return URLSafeSerializer(key)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        expires = time.time() + time_limit
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the integer expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        # Expiry is truncated to whole seconds.
        expires = int(time.time() + time_limit)
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        expires = time.time() + time_limit
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def validate_csrf(data, secret_key=None, time_limit=None):
    """Check whether ``data`` is a valid CSRF token for the current session.

    The token is expected in the form ``<expires>##<hmac>``, where the HMAC
    (SHA-1) covers ``session['csrf_token']`` followed by the expiry text.

    :param data: The csrf token value to be checked.
    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: When truthy, the expiry timestamp inside the token is
                       enforced. ``None`` reads ``WTF_CSRF_TIME_LIMIT`` from
                       the app config (default 3600). A falsy value skips
                       the expiry check.
    :return: ``True`` if the token is well formed, not expired (when
             enforced) and its HMAC matches; ``False`` otherwise.
    """
    if not data or '##' not in data:
        return False

    expires, hmac_csrf = data.split('##', 1)

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if time_limit:
        # Parse the timestamp only when it is enforced, so a token with an
        # empty expiry field (issued without a time limit) is not rejected
        # merely for being unparseable.
        try:
            expires_at = float(expires)
        except ValueError:
            return False
        if time.time() > expires_at:
            return False

    if not secret_key:
        secret_key = current_app.config.get(
            'WTF_CSRF_SECRET_KEY', current_app.secret_key
        )

    if 'csrf_token' not in session:
        return False

    # Sign the expiry text exactly as received. Re-formatting the parsed
    # float would turn an integer expiry such as '123' into '123.0' and
    # make the HMAC mismatch for otherwise valid tokens.
    csrf_build = '%s%s' % (session['csrf_token'], expires)
    hmac_compare = hmac.new(
        to_bytes(secret_key),
        to_bytes(csrf_build),
        digestmod=hashlib.sha1
    ).hexdigest()

    # Constant-time comparison avoids leaking digest prefixes via timing.
    # Both sides are converted to bytes because compare_digest rejects
    # non-ASCII text, and hmac_csrf is user-supplied.
    return hmac.compare_digest(to_bytes(hmac_compare), to_bytes(hmac_csrf))
def create_jwt(payload):
    """Serialize ``payload`` into a signed JSON Web Token.

    The token is signed with ``current_app.secret_key`` using ``HS256``.

    :param payload: Data handed to the serializer's ``dumps``.
    :return: The serialized token produced by ``dumps``.
    """
    signer = JSONWebSignatureSerializer(
        current_app.secret_key,
        algorithm_name='HS256',
    )
    token = signer.dumps(payload)
    return token
def load_jwt(token):
    """Verify a signed JSON Web Token and return its payload.

    The token is checked against ``current_app.secret_key`` using ``HS256``.
    Any exception raised by the serializer's ``loads`` propagates to the
    caller; nothing is caught here.

    :param token: The serialized token to load.
    :return: The payload produced by ``loads``.
    """
    verifier = JSONWebSignatureSerializer(
        current_app.secret_key,
        algorithm_name='HS256',
    )
    payload = verifier.loads(token)
    return payload
def generate_csrf(secret_key=None, token_key=None):
    """Return a signed CSRF token, creating and caching it when needed.

    The raw token lives in ``session[field_name]``; the signed token is
    cached as an attribute on ``g`` under the same name, so repeated calls
    return the cached value while that attribute exists.

    :param secret_key: Used to securely sign the token. Default is
                       ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
                      Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    signing_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already cached on ``g``.
    if field_name in g:
        return g.get(field_name)

    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    signer = URLSafeTimedSerializer(signing_key, salt='wtf-csrf-token')
    setattr(g, field_name, signer.dumps(session[field_name]))
    return g.get(field_name)
def generate_csrf_token(self, csrf_token_field):
    """Return a CSRF token built from this object's ``meta`` settings.

    :param csrf_token_field: Accepted but not used by this implementation.
    :return: Whatever ``generate_csrf`` returns for the configured secret
             and field name.
    """
    secret = self.meta.csrf_secret
    field_name = self.meta.csrf_field_name
    return generate_csrf(secret_key=secret, token_key=field_name)
def csrf_secret(self):
    """Return the CSRF signing secret from the current app.

    ``WTF_CSRF_SECRET_KEY`` from the app config is used when present;
    otherwise ``current_app.secret_key`` is returned.
    """
    app_config = current_app.config
    return app_config.get('WTF_CSRF_SECRET_KEY', current_app.secret_key)
def generate_csrf(secret_key=None, time_limit=None, token_key=None):
    """Build a CSRF token of the form ``<expires>:<nonce>##<hmac>``.

    The SHA-1 HMAC covers ``session.sid`` followed by ``<expires>:<nonce>``,
    so the session object must expose a ``sid`` attribute.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :param token_key: Accepted for signature compatibility; not used by
                      this implementation.
    :raises Exception: If no secret key can be resolved.
    """
    if not secret_key:
        secret_key = current_app.config.get(
            'WTF_CSRF_SECRET_KEY', current_app.secret_key
        )

    if not secret_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    expires = int(time.time() + time_limit) if time_limit else ''

    # Draw the nonce from the OS entropy source: the default Mersenne
    # Twister generator is predictable and unsuitable for security tokens.
    nonce = random.SystemRandom().getrandbits(32)
    csrf_build = '%s:%s' % (expires, nonce)

    hmac_csrf = hmac.new(
        to_bytes(secret_key),
        to_bytes(session.sid + csrf_build),
        digestmod=hashlib.sha1
    ).hexdigest()
    return '%s##%s' % (csrf_build, hmac_csrf)
def generate_csrf(secret_key=None, token_key=None):
    """Return a signed CSRF token, creating and caching it when needed.

    The raw token lives in ``session[field_name]``; the signed token is
    cached as an attribute on ``g`` under the same name, so repeated calls
    return the cached value while that attribute exists.

    :param secret_key: Used to securely sign the token. Default is
                       ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
                      Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    signing_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already cached on ``g``.
    if field_name in g:
        return g.get(field_name)

    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    signer = URLSafeTimedSerializer(signing_key, salt='wtf-csrf-token')
    setattr(g, field_name, signer.dumps(session[field_name]))
    return g.get(field_name)
def generate_csrf_token(self, csrf_token_field):
    """Return a CSRF token built from this object's ``meta`` settings.

    :param csrf_token_field: Accepted but not used by this implementation.
    :return: Whatever ``generate_csrf`` returns for the configured secret
             and field name.
    """
    secret = self.meta.csrf_secret
    field_name = self.meta.csrf_field_name
    return generate_csrf(secret_key=secret, token_key=field_name)
def csrf_secret(self):
    """Return the CSRF signing secret from the current app.

    ``WTF_CSRF_SECRET_KEY`` from the app config is used when present;
    otherwise ``current_app.secret_key`` is returned.
    """
    app_config = current_app.config
    return app_config.get('WTF_CSRF_SECRET_KEY', current_app.secret_key)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the integer expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        # Expiry is truncated to whole seconds.
        expires = int(time.time() + time_limit)
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the integer expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        # Expiry is truncated to whole seconds.
        expires = int(time.time() + time_limit)
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def generate_csrf(secret_key=None, time_limit=None):
    """Build a CSRF token of the form ``<expires>##<hmac>``.

    A random value is stored in ``session['csrf_token']`` on first use and
    is mixed, together with the integer expiry timestamp, into a SHA-1 HMAC.

    :param secret_key: Key for the HMAC. When falsy, ``WTF_CSRF_SECRET_KEY``
                       from the app config is used, falling back to
                       ``current_app.secret_key``.
    :param time_limit: Lifetime of the token in seconds. ``None`` reads
                       ``WTF_CSRF_TIME_LIMIT`` from the app config
                       (default 3600). A falsy value yields a token with
                       an empty expiry field.
    :raises Exception: If no secret key can be resolved.
    """
    signing_key = secret_key or current_app.config.get(
        'WTF_CSRF_SECRET_KEY', current_app.secret_key
    )
    if not signing_key:
        raise Exception('Must provide secret_key to use csrf.')

    if time_limit is None:
        time_limit = current_app.config.get('WTF_CSRF_TIME_LIMIT', 3600)

    if 'csrf_token' not in session:
        session['csrf_token'] = hashlib.sha1(os.urandom(64)).hexdigest()
    raw_token = session['csrf_token']

    if time_limit:
        # Expiry is truncated to whole seconds.
        expires = int(time.time() + time_limit)
        signed_text = '%s%s' % (raw_token, expires)
    else:
        # No time limit: the expiry field is left empty in the token.
        expires = ''
        signed_text = raw_token

    digest = hmac.new(
        to_bytes(signing_key),
        to_bytes(signed_text),
        digestmod=hashlib.sha1,
    ).hexdigest()
    return '%s##%s' % (expires, digest)
def generate_token(tenant: Tenant) -> str:
    """Serialize a tenant's access scope into a signed token.

    The payload always contains ``repo_ids`` (the tenant's repository ids as
    strings) and, when the tenant has a truthy ``user_id`` attribute, ``uid``.
    It is signed with ``current_app.secret_key`` and the ``'auth'`` salt.

    :param tenant: Object exposing ``repository_ids`` and optionally
                   ``user_id``.
    :return: The serialized token produced by the serializer's ``dumps``.
    """
    signer = JSONWebSignatureSerializer(current_app.secret_key, salt='auth')

    repo_ids = []
    for repo_id in tenant.repository_ids:
        repo_ids.append(str(repo_id))
    payload = {'repo_ids': repo_ids}

    # ``user_id`` may be absent or empty on the tenant; only include it
    # when there is a real value.
    user_id = getattr(tenant, 'user_id', None)
    if user_id:
        payload['uid'] = str(user_id)

    return signer.dumps(payload)
def parse_token(token: str) -> str:
    """Verify a signed token and return its payload.

    The token is checked against ``current_app.secret_key`` with the
    ``'auth'`` salt.

    NOTE(review): the signature is annotated ``-> str``, but the code
    returns whatever the serializer's ``loads`` yields, or ``None`` on a
    bad signature; confirm the intended return type.

    :param token: The serialized token to verify.
    :return: The loaded payload, or ``None`` when ``BadSignature`` is raised.
    """
    verifier = JSONWebSignatureSerializer(current_app.secret_key, salt='auth')
    try:
        payload = verifier.loads(token)
    except BadSignature:
        return None
    return payload
def generate_csrf(secret_key=None, token_key=None):
    """Return a signed CSRF token, creating and caching it when needed.

    The raw token lives in ``session[field_name]``; the signed token is
    cached as an attribute on ``g`` under the same name, so repeated calls
    return the cached value while that attribute exists.

    :param secret_key: Used to securely sign the token. Default is
                       ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param token_key: Key where token is stored in session for comparison.
                      Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.
    """
    signing_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )

    # Fast path: a signed token is already cached on ``g``.
    if field_name in g:
        return g.get(field_name)

    if field_name not in session:
        session[field_name] = hashlib.sha1(os.urandom(64)).hexdigest()

    signer = URLSafeTimedSerializer(signing_key, salt='wtf-csrf-token')
    setattr(g, field_name, signer.dumps(session[field_name]))
    return g.get(field_name)
def generate_csrf_token(self, csrf_token_field):
    """Return a CSRF token built from this object's ``meta`` settings.

    :param csrf_token_field: Accepted but not used by this implementation.
    :return: Whatever ``generate_csrf`` returns for the configured secret
             and field name.
    """
    secret = self.meta.csrf_secret
    field_name = self.meta.csrf_field_name
    return generate_csrf(secret_key=secret, token_key=field_name)