def signin(req):
if any(map(lambda key: key not in req.json, ["login", "password"])):
logger.debug(f"Request is {req.json} but some arguments are missing.")
raise InvalidUsage("Missing argument")
user = await User.get_by_login(req.json["login"])
if user is None:
logger.debug(f"Request is {req.json} but user coundn't be found.")
raise NotFound("User not found")
if await accounts.is_frozen(user.id, req.ip):
logger.debug(f"Request is {req.json} but the account is frozen.")
raise InvalidUsage("Account frozen")
if not compare_digest(user.password, User.hashpwd(req.json["password"])):
logger.debug(f"Request is {req.json} but the password is invalid.")
unfreeze = await accounts.freeze(user.id, req.ip)
raise InvalidUsage("Invalid password. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds"))
await accounts.unfreeze(user.id, req.ip)
token = await accounts.register(user.id)
logger.info(f"User {user.name} connected. Token generated: {token}")
return json({"token": token, "id": user.id, "name": user.name})
python类compare_digest()的实例源码
def check_user(user: User, password: str) -> bool:
hashpass, salt = user_info[user].hashed_password
target_hash_pass = hash_password(password, salt)[0]
sleep(random.expovariate(10))
return secrets.compare_digest(hashpass, target_hash_pass)
def _verify(self, payload, signature):
good_signature = self._sign(payload)
if not secrets.compare_digest(good_signature, signature):
raise SignatureError('invalid signature: got {}, want {}'.format(
signature, good_signature))
def is_valid_signature(self, signature):
return compare_digest(self.get_signature(), signature)
def authenticate(self, request):
auth_token = getattr(self.settings, self.auth_token_field)
if not secrets.compare_digest(auth_token, request.headers.get('Authorization', '')):
raise HTTPForbidden(text='Invalid Authorization header')
def authenticate(self, request):
company = request.query.get('company', None)
expires = request.query.get('expires', None)
body = f'{company}:{expires}'.encode()
expected_sig = hmac.new(self.settings.user_auth_key, body, hashlib.sha256).hexdigest()
signature = request.query.get('signature', '-')
if not secrets.compare_digest(expected_sig, signature):
raise HTTPForbidden(text='Invalid token')
self.session = Session(
company=company,
expires=expires,
)
if self.session.expires < datetime.utcnow().replace(tzinfo=timezone.utc):
raise HTTPForbidden(text='token expired')
def authenticate(self, request):
token = re.sub('^Basic *', '', request.headers.get('Authorization', '')) or 'x'
try:
_, password = base64.b64decode(token).decode().split(':', 1)
except (ValueError, UnicodeDecodeError):
password = ''
if not secrets.compare_digest(password, self.settings.admin_basic_auth_password):
raise HTTPUnauthorized(text='Invalid basic auth', headers={'WWW-Authenticate': 'Basic'})
def get_user_from_login_token(token):
"""Get a `User` from a login token.
A login token has this format:
<user uuid>:<auth token>
"""
user_id, auth_token = token.split(':')
user = db.session.query(User).get(user_id)
if user and user.current_auth_token:
if secrets.compare_digest(user.current_auth_token, auth_token):
return user
return None