def decrypt_file(file, key):
"""
Decrypts the file ``file``.
The encrypted file is assumed to end with the ``.enc`` extension. The
decrypted file is saved to the same location without the ``.enc``
extension.
The permissions on the decrypted file are automatically set to 0o600.
See also :func:`doctr.local.encrypt_file`.
"""
if not file.endswith('.enc'):
raise ValueError("%s does not end with .enc" % file)
fer = Fernet(key)
with open(file, 'rb') as f:
decrypted_file = fer.decrypt(f.read())
with open(file[:-4], 'wb') as f:
f.write(decrypted_file)
os.chmod(file[:-4], 0o600)
python类Fernet()的实例源码
def __init__(self,
bot_user,
osu_client,
model_cache_dir,
model_cache_size,
token_secret,
upload_url):
super().__init__({bot_user})
self.bot_user = bot_user
self.osu_client = osu_client
self.model_cache_dir = pathlib.Path(model_cache_dir)
self.token_secret = Fernet(token_secret)
self.upload_url = upload_url
self.get_model = lru_cache(model_cache_size)(self._get_model)
self._user_stats = ExpiringCache()
self._candidates = LockedIterator(self._gen_candidates())
def save_secrets():
logger = logging.getLogger(__name__)
logger.info("Reading secrets from environment variables")
secrets = {}
missing_key = False
for secrets_key in secrets_keys:
try:
logger.info("Writing key: " + secrets_key)
secrets[secrets_key] = os.environ[secrets_key]
except KeyError:
logger.warning("Missing key:", secrets_key)
missing_key = True
if missing_key:
logger.critical("Aborting...")
return
logger.info("Writing secrets to secrets.json")
f = Fernet(key)
with open("config/secrets.dat", 'wb') as secrets_file:
secrets_file.write(f.encrypt(json.dumps(secrets).encode()))
def _encode_uuid_map(userid, uuid, passwd):
data = 'userid:%s:uuid:%s' % (userid, uuid)
# FIXME scrypt.encrypt is broken in windows.
# This is a quick hack. The hostname might not be unique enough though.
# We could use a long random hash per entry and store it in the file.
# Other option is to use a different KDF that is supported by cryptography
# (ie, pbkdf)
if IS_WIN:
key = scrypt.hash(passwd, socket.gethostname())
key = base64.urlsafe_b64encode(key[:32])
f = Fernet(key, backend=crypto_backend)
encrypted = f.encrypt(data)
else:
encrypted = scrypt.encrypt(data, passwd, maxtime=0.05)
return base64.urlsafe_b64encode(encrypted)
def _decode_uuid_line(line, passwd):
decoded = base64.urlsafe_b64decode(line)
if IS_WIN:
key = scrypt.hash(passwd, socket.gethostname())
key = base64.urlsafe_b64encode(key[:32])
try:
f = Fernet(key, backend=crypto_backend)
maybe_decrypted = f.decrypt(key)
except Exception:
return None
else:
try:
maybe_decrypted = scrypt.decrypt(decoded, passwd, maxtime=0.1)
except scrypt.error:
return None
match = re.findall("userid\:(.+)\:uuid\:(.+)", maybe_decrypted)
if match:
return match[0]
def get_fernet():
"""
Deferred load of Fernet key.
This function could fail either because Cryptography is not installed
or because the Fernet key is invalid.
:return: Fernet object
:raises: AirflowException if there's a problem trying to load Fernet
"""
try:
from cryptography.fernet import Fernet
except:
raise AirflowException('Failed to import Fernet, it may not be installed')
try:
return Fernet(configuration.get('core', 'FERNET_KEY').encode('utf-8'))
except ValueError as ve:
raise AirflowException("Could not create Fernet object: {}".format(ve))
def decrypt(data, password):
"""Decrypts data using the password.
Decrypts the data using the provided password using the cryptography module.
If the pasword or data is incorrect this will return None.
"""
password = bytes(password)
#Salt is equal to password as we want the encryption to be reversible only
#using the password itself
kdf = PBKDF2HMAC(algorithm=hashes.AES(),
length=32,
salt=bytes(password),
iterations=100000,
backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
f = Fernet(key)
token = f.decrypt(data)
return token
def encrypt(self, save_content=False):
if self.encryption_key is None:
raise Exception('Cannot encrypt content, missing encryption key.')
if self.config.get('content') is None:
raise Exception('Cannot encrypt content, content is empty.')
if self.is_encryptable == False:
raise Exception('Cannot encrypt, improper configuration.')
if self.config.get('is_encrypted') == True:
return self.config.get('content')
f = Fernet(self.encryption_key)
if self.config.get('is_binary') == True:
encr_content = f.encrypt(self.config.get('content'))
elif self.config.get('is_binary') == False:
encr_content = f.encrypt(self.config.get('content').encode('utf-8')).decode('utf-8')
else:
raise Exception('Could not tell if file is binary or text. Aborting.')
if save_content == True:
try:
self.config['content'] = encr_content
self.config['content_length'] = len(encr_content)
self.config['is_encrypted'] = True
except:
raise
return encr_content
def decrypt(self, save_content=False):
if self.encryption_key is None:
raise LocalConfigFileError('Cannot decrypt content, missing encryption key.')
if self.config.get('content') is None:
raise LocalConfigFileError('Cannot decrypt content, content is empty.')
if self.is_encryptable == False:
raise LocalConfigFileError('Cannot decrypt, improper configuration.')
if self.config.get('is_encrypted') == False:
return self.config.get('content')
f = Fernet(self.encryption_key)
if self.config.get('is_binary') == True:
decr_content = f.decrypt(self.config.get('content'))
elif self.config.get('is_binary') == False:
decr_content = f.decrypt(self.config.get('content').encode('utf-8')).decode('utf-8')
else:
raise LocalConfigFileError('Could not tell if file is binary or text. Aborting.')
if save_content == True:
try:
self.config['content'] = decr_content
self.config['content_length'] = len(decr_content)
self.config['is_encrypted'] = False
except:
raise
return decr_content
def test_get_Fernet_returns_a_key():
fake = Faker()
appname= '_'.join(['test_a_netcrawl_', fake.word(), fake.word()])
username= '_'.join(['test_u_netcrawl_', fake.word(), fake.word()])
key= manage._get_fernet_key(appname, username)
assert isinstance(key, fernet.Fernet), 'Key [{}] is wrong type'.format(
type(key))
assert keyring.get_password(appname, username) is not None
keyring.delete_password(appname, username)
assert keyring.get_password(appname, username) is None
#===============================================================================
# def test_check_credentials_no_error():
# config.cc.check_credentials()
#===============================================================================
def setup_up_config(func):
def wrapper(arguments):
_, ext = os.path.splitext(arguments.path)
file_data = b''
if os.path.exists(arguments.path):
with open(arguments.path, 'rb') as fp:
file_data = fp.read()
if file_data and arguments.fernet_key:
f = Fernet(arguments.fernet_key.encode('utf-8'))
file_data = f.decrypt(file_data)
with tempfile.NamedTemporaryFile(suffix=ext) as fp:
fp.write(file_data)
fp.file.flush()
arguments.origin_path = arguments.path
arguments.path = fp.name
return func(arguments)
return wrapper
def can_save_an_encrypted_file(self):
with tempfile.NamedTemporaryFile() as tp:
save_temporary_config(
'./spec/cli/sample.yml',
tp.name,
test_key
)
with open(tp.name, 'rb') as fp:
ct = fp.read()
f = Fernet(test_key.encode('utf-8'))
data = yaml.load(f.decrypt(ct))
expect(data['thing']).to.equal('bam')
expect(data['other']).to.equal('thing')
def get_credential(self, hostname):
"""
This function returns credentials for a particular hostname.
:param hostname: hostname
:type hostname: str
"""
hostname = self.cut_hostname(hostname)
try:
if self.KEY:
crypto = Fernet(self.KEY)
return (
self.CREDENTIALS[hostname]["username"],
crypto.decrypt(self.CREDENTIALS[hostname]["password"][2:].encode())
)
else:
#return plain information
return (
self.CREDENTIALS[hostname]["username"],
self.CREDENTIALS[hostname]["password"]
)
except InvalidToken:
raise ContainerException("Invalid password specified!")
except KeyError:
pass
def encrypt(data, secret):
"""
Basically wraps the cryptography lib to reduce code duplication
:param data:
:type data: bytes
:param secret:
:type secret: bytes
:return:
:rtype: bytes, str
"""
# Generate a cryptographically secure salt
r = Random.new()
salt = r.read(16)
# Generates a suitable key from the secret by HMACing it with the salt
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend())
key = urlsafe_b64encode(kdf.derive(secret))
# Encrypts the data
f = Fernet(key)
ciphertext = f.encrypt(data)
return b64encode(salt + ciphertext)
def decrypt(encodedciphertext, secret):
"""
Basically wraps the cryptography lib to reduce code duplication
:param encodedciphertext:
:type encodedciphertext: bytes
:param secret:
:type secret: bytes
:return:
"""
# Split out the salt from the ciphertext
ciphertext = b64decode(encodedciphertext)
salt = ciphertext[:16]
ciphertext = ciphertext[16:]
# Regenerates the key by HMACing the secret with the salt
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend())
key = urlsafe_b64encode(kdf.derive(secret))
# Decrypts the data
f = Fernet(key)
data = f.decrypt(ciphertext)
return data
def fernet_encrypt_psk(message, raw=False):
"""Encrypts the specified message using the Fernet format.
Returns the encrypted token, as a byte string.
Note that a Fernet token includes the current time. Users decrypting a
the token can specify a TTL (in seconds) indicating how long the encrypted
message should be valid. So the system clock must be correct before calling
this function.
:param message: The message to encrypt.
:type message: Must be of type 'bytes' or a UTF-8 'str'.
:param raw: if True, returns the decoded base64 bytes representing the
Fernet token. The bytes must be converted back to base64 to be
decrypted. (Or the 'raw' argument on the corresponding
fernet_decrypt_psk() function can be used.)
:return: the encryption token, as a base64-encoded byte string.
"""
fernet = _get_fernet_context()
if isinstance(message, str):
message = message.encode("utf-8")
token = fernet.encrypt(message)
if raw is True:
token = urlsafe_b64decode(token)
return token
def fernet_decrypt_psk(token, ttl=None, raw=False):
"""Decrypts the specified Fernet token using the MAAS secret.
Returns the decrypted token as a byte string; the user is responsible for
converting it to the correct format or encoding.
:param message: The token to decrypt.
:type token: Must be of type 'bytes', or an ASCII base64 string.
:param ttl: Optional amount of time (in seconds) allowed to have elapsed
before the message is rejected upon decryption. Note that the Fernet
library considers times up to 60 seconds into the future (beyond the
TTL) to be valid.
:param raw: if True, treats the string as the decoded base64 bytes of a
Fernet token, and attempts to encode them (as expected by the Fernet
APIs) before decrypting.
:return: bytes
"""
if raw is True:
token = urlsafe_b64encode(token)
f = _get_fernet_context()
if isinstance(token, str):
token = token.encode("ascii")
return f.decrypt(token, ttl=ttl)
def _get_decrypted_pairs(self, credential):
"""
From credential, get decrypted blind credential pairs.
Given a region => data_key dict of data keys, a region => context dict
of KMS encryption context, a dict of encrypted credential pairs, a
cipher and a cipher version, return decrypted credential_pairs.
"""
region = self.config['region']
_context = credential['metadata']['context'][region]
if self.aws_creds:
_kms_client = confidant_client.services.get_boto_client(
'kms',
region=self.config['region'],
aws_access_key_id=self.aws_creds['AccessKeyId'],
aws_secret_access_key=self.aws_creds['SecretAccessKey'],
aws_session_token=self.aws_creds['SessionToken']
)
else:
_kms_client = self.kms_client
_data_key = cryptolib.decrypt_datakey(
base64.b64decode(
ensure_bytes(credential['data_key'][region])
),
_context,
_kms_client
)
_credential_pair = credential['credential_pairs'][region]
f = Fernet(_data_key)
return json.loads(f.decrypt(_credential_pair.encode('utf-8')))
def encrypt_text(password, token):
f = Fernet(get_key(password))
return f.encrypt(bytes(token))
def decrypt_text(password, token):
f = Fernet(get_key(password))
return f.decrypt(bytes(token))
def _get_fernet(cls):
return Fernet(base64.urlsafe_b64encode(PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=SALT,
iterations=100000,
backend=default_backend()
).derive(PASSWORD)))
def gen_token(obj, user):
"""Generate a token for a user.
"""
from cryptography.fernet import Fernet
from .token import gen_token
print(gen_token(Fernet(obj.token_secret), user))
def _load_secrets():
secrets_path = _get_secrets_path()
if not path.isfile(secrets_path):
logging.getLogger(__name__).debug("No secrets file found")
return {}
f = Fernet(key)
with open(secrets_path, 'rb') as secrets_file:
try:
return json.loads(f.decrypt(secrets_file.read()).decode())
except InvalidToken:
logging.getLogger(__name__).critical("You entered the wrong password")
async_handler.shutdown()
return None
def save_secrets():
"""
Save secrets to encrypted file on disk.
"""
secrets_path = _get_secrets_path()
f = Fernet(key)
with open(secrets_path, 'wb') as secrets_file:
secrets_file.write(f.encrypt(json.dumps(_secrets).encode()))
def __init__(self):
self._salt = settings.SECRET_KEY
self._kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=self._salt,
iterations=100000,
backend=default_backend()
)
self._key = base64.urlsafe_b64encode(self._kdf.derive(settings.SECRET_KEY))
self._fernet = Fernet(self._key)
def encrypt(pwd, key):
""" """
salt = os.urandom(32)
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend())
encrypted_key = base64.urlsafe_b64encode(kdf.derive(bytes(key.encode('utf-8'))))
f = Fernet(encrypted_key)
return f.encrypt(bytes(pwd.encode('utf-8'))).decode('latin1'), salt.decode('latin1')
def decrypt(cipher_txt, key, salt):
salt = bytes(salt.encode('latin1'))
cipher_txt = bytes(str(cipher_txt).encode('latin1'))
kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend())
encrypted_key = base64.urlsafe_b64encode(kdf.derive(bytes(key.encode('utf-8'))))
f = Fernet(encrypted_key)
return f.decrypt(cipher_txt).decode('utf-8')
def encrypt(text):
fernet = Fernet(to_binary(current_app.config['SECURE_TOKEN_KEY']))
return fernet.encrypt(to_binary(text))
def decrypt(encrypted):
fernet = Fernet(to_binary(current_app.config['SECURE_TOKEN_KEY']))
text = fernet.decrypt(to_binary(encrypted))
return to_text(text)
def _fernet_from_password(password, salt):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(password))
return Fernet(key)