def decrypt_pk(priv_key, ciphertext):
"""
Decrypt a b64encoded ciphertext string with the RSA private key priv_key,
using CryptoHash() as the OAEP/MGF1 padding hash.
Returns the plaintext.
Decryption failures result in an exception being raised.
"""
try:
plaintext = priv_key.decrypt(
b64decode(ciphertext),
padding.OAEP(
mgf=padding.MGF1(algorithm=CryptoHash()),
algorithm=CryptoHash(),
label=None
)
)
except UnsupportedAlgorithm as e:
# a failure to dencrypt someone else's data is not typically a fatal
# error, but in this particular case, the most likely cause of this
# error is an old cryptography library
logging.error("Fatal error: encryption hash {} unsupported, try upgrading to cryptography >= 1.4. Exception: {}".format(
CryptoHash, e))
# re-raise the exception for the caller to handle
raise e
return plaintext
python类UnsupportedAlgorithm()的实例源码
def _elliptic_curve_to_nid(self, curve):
"""
Get the NID for a curve name.
"""
curve_aliases = {
"secp192r1": "prime192v1",
"secp256r1": "prime256v1"
}
curve_name = curve_aliases.get(curve.name, curve.name)
curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
if curve_nid == self._lib.NID_undef:
raise UnsupportedAlgorithm(
"{0} is not a supported elliptic curve".format(curve.name),
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE
)
return curve_nid
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
)
evp_md = self._backend._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend._ffi.NULL)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
)
evp_md = self._backend._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend._ffi.NULL)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
try:
methods = self._backend._hash_mapping[self.algorithm.name]
except KeyError:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
ctx = self._backend._ffi.new(methods.ctx)
res = methods.hash_init(ctx)
assert res == 1
self._ctx = ctx
def _elliptic_curve_to_nid(self, curve):
"""
Get the NID for a curve name.
"""
curve_aliases = {
"secp192r1": "prime192v1",
"secp256r1": "prime256v1"
}
curve_name = curve_aliases.get(curve.name, curve.name)
curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
if curve_nid == self._lib.NID_undef:
raise UnsupportedAlgorithm(
"{0} is not a supported elliptic curve".format(curve.name),
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE
)
return curve_nid
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
)
name = self._backend._build_openssl_digest_name(algorithm)
evp_md = self._backend._lib.EVP_get_digestbyname(name)
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
name),
_Reasons.UNSUPPORTED_HASH
)
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend._ffi.NULL)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
try:
methods = self._backend._hash_mapping[self.algorithm.name]
except KeyError:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
ctx = self._backend._ffi.new(methods.ctx)
res = methods.hash_init(ctx)
assert res == 1
self._ctx = ctx
def _elliptic_curve_to_nid(self, curve):
"""
Get the NID for a curve name.
"""
curve_aliases = {
"secp192r1": "prime192v1",
"secp256r1": "prime256v1"
}
curve_name = curve_aliases.get(curve.name, curve.name)
curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
if curve_nid == self._lib.NID_undef:
raise UnsupportedAlgorithm(
"{0} is not a supported elliptic curve".format(curve.name),
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE
)
return curve_nid
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
)
evp_md = self._backend._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend._ffi.NULL)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
ctx = self._backend._lib.Cryptography_EVP_MD_CTX_new()
ctx = self._backend._ffi.gc(
ctx, self._backend._lib.Cryptography_EVP_MD_CTX_free
)
evp_md = self._backend._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
if evp_md == self._backend._ffi.NULL:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
res = self._backend._lib.EVP_DigestInit_ex(ctx, evp_md,
self._backend._ffi.NULL)
self._backend.openssl_assert(res != 0)
self._ctx = ctx
def __init__(self, backend, algorithm, ctx=None):
self._algorithm = algorithm
self._backend = backend
if ctx is None:
try:
methods = self._backend._hash_mapping[self.algorithm.name]
except KeyError:
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
ctx = self._backend._ffi.new(methods.ctx)
res = methods.hash_init(ctx)
assert res == 1
self._ctx = ctx
def encrypt_pk(pub_key, plaintext):
"""
Encrypt plaintext with the RSA public key pub_key, using CryptoHash()
as the OAEP/MGF1 padding hash.
plaintext is limited to the size of the RSA key, minus the padding, or a
few hundred bytes.
Returns a b64encoded ciphertext string.
Encryption failures result in an exception being raised.
"""
try:
ciphertext = pub_key.encrypt(
plaintext,
padding.OAEP(
mgf=padding.MGF1(algorithm=CryptoHash()),
algorithm=CryptoHash(),
label=None
)
)
except UnsupportedAlgorithm as e:
# a failure to encrypt our own data is a fatal error
# the most likely cause of this error is an old cryptography library
# although some newer binary cryptography libraries are linked with
# old OpenSSL versions, to fix, check 'openssl version' >= 1.0.2, then:
# pip install -I --no-binary cryptography cryptography
logging.error("Fatal error: encryption hash {} unsupported, try upgrading to cryptography >= 1.4 compiled with OpenSSL >= 1.0.2. Exception: {}".format(
CryptoHash, e))
# re-raise the exception for the caller to handle
raise e
return b64encode(ciphertext)
def signature_hash_algorithm(self):
alg = self._backend._ffi.new("X509_ALGOR **")
self._backend._lib.X509_get0_signature(
self._backend._ffi.NULL, alg, self._x509
)
self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
oid = _obj2txt(self._backend, alg[0].algorithm)
try:
return x509._SIG_OIDS_TO_HASH[oid]
except KeyError:
raise UnsupportedAlgorithm(
"Signature algorithm OID:{0} not recognized".format(oid)
)
def signature_hash_algorithm(self):
alg = self._backend._ffi.new("X509_ALGOR **")
self._backend._lib.X509_CRL_get0_signature(
self._backend._ffi.NULL, alg, self._x509_crl
)
self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
oid = _obj2txt(self._backend, alg[0].algorithm)
try:
return x509._SIG_OIDS_TO_HASH[oid]
except KeyError:
raise UnsupportedAlgorithm(
"Signature algorithm OID:{0} not recognized".format(oid)
)
def signature_hash_algorithm(self):
alg = self._backend._ffi.new("X509_ALGOR **")
self._backend._lib.X509_REQ_get0_signature(
self._backend._ffi.NULL, alg, self._x509_req
)
self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
oid = _obj2txt(self._backend, alg[0].algorithm)
try:
return x509._SIG_OIDS_TO_HASH[oid]
except KeyError:
raise UnsupportedAlgorithm(
"Signature algorithm OID:{0} not recognized".format(oid)
)
def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
key_material):
buf = self._ffi.new("char[]", length)
if self._lib.Cryptography_HAS_PBKDF2_HMAC:
evp_md = self._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
self.openssl_assert(evp_md != self._ffi.NULL)
res = self._lib.PKCS5_PBKDF2_HMAC(
key_material,
len(key_material),
salt,
len(salt),
iterations,
evp_md,
length,
buf
)
self.openssl_assert(res == 1)
else:
if not isinstance(algorithm, hashes.SHA1):
raise UnsupportedAlgorithm(
"This version of OpenSSL only supports PBKDF2HMAC with "
"SHA1.",
_Reasons.UNSUPPORTED_HASH
)
res = self._lib.PKCS5_PBKDF2_HMAC_SHA1(
key_material,
len(key_material),
salt,
len(salt),
iterations,
length,
buf
)
self.openssl_assert(res == 1)
return self._ffi.buffer(buf)[:]
def _evp_pkey_to_private_key(self, evp_pkey):
"""
Return the appropriate type of PrivateKey given an evp_pkey cdata
pointer.
"""
key_type = self._lib.Cryptography_EVP_PKEY_id(evp_pkey)
if key_type == self._lib.EVP_PKEY_RSA:
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
self.openssl_assert(rsa_cdata != self._ffi.NULL)
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
elif key_type == self._lib.EVP_PKEY_DSA:
dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
self.openssl_assert(dsa_cdata != self._ffi.NULL)
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
elif (self._lib.Cryptography_HAS_EC == 1 and
key_type == self._lib.EVP_PKEY_EC):
ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
else:
raise UnsupportedAlgorithm("Unsupported key type.")
def elliptic_curve_supported(self, curve):
if self._lib.Cryptography_HAS_EC != 1:
return False
try:
curve_nid = self._elliptic_curve_to_nid(curve)
except UnsupportedAlgorithm:
curve_nid = self._lib.NID_undef
ctx = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
if ctx == self._ffi.NULL:
errors = self._consume_errors()
self.openssl_assert(
curve_nid == self._lib.NID_undef or
errors[0][1:] == (
self._lib.ERR_LIB_EC,
self._lib.EC_F_EC_GROUP_NEW_BY_CURVE_NAME,
self._lib.EC_R_UNKNOWN_GROUP
)
)
return False
else:
self.openssl_assert(curve_nid != self._lib.NID_undef)
self._lib.EC_GROUP_free(ctx)
return True
def generate_elliptic_curve_private_key(self, curve):
"""
Generate a new private key on the named curve.
"""
if self.elliptic_curve_supported(curve):
curve_nid = self._elliptic_curve_to_nid(curve)
ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
self.openssl_assert(ec_cdata != self._ffi.NULL)
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
res = self._lib.EC_KEY_generate_key(ec_cdata)
self.openssl_assert(res == 1)
res = self._lib.EC_KEY_check_key(ec_cdata)
self.openssl_assert(res == 1)
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
else:
raise UnsupportedAlgorithm(
"Backend object does not support {0}.".format(curve.name),
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE
)
def create_symmetric_encryption_ctx(self, cipher, mode):
for b in self._filtered_backends(CipherBackend):
try:
return b.create_symmetric_encryption_ctx(cipher, mode)
except UnsupportedAlgorithm:
pass
raise UnsupportedAlgorithm(
"cipher {0} in {1} mode is not supported by this backend.".format(
cipher.name, mode.name if mode else mode),
_Reasons.UNSUPPORTED_CIPHER
)
def create_symmetric_decryption_ctx(self, cipher, mode):
for b in self._filtered_backends(CipherBackend):
try:
return b.create_symmetric_decryption_ctx(cipher, mode)
except UnsupportedAlgorithm:
pass
raise UnsupportedAlgorithm(
"cipher {0} in {1} mode is not supported by this backend.".format(
cipher.name, mode.name if mode else mode),
_Reasons.UNSUPPORTED_CIPHER
)
def create_hash_ctx(self, algorithm):
for b in self._filtered_backends(HashBackend):
try:
return b.create_hash_ctx(algorithm)
except UnsupportedAlgorithm:
pass
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
def create_hmac_ctx(self, key, algorithm):
for b in self._filtered_backends(HMACBackend):
try:
return b.create_hmac_ctx(key, algorithm)
except UnsupportedAlgorithm:
pass
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
key_material):
for b in self._filtered_backends(PBKDF2HMACBackend):
try:
return b.derive_pbkdf2_hmac(
algorithm, length, salt, iterations, key_material
)
except UnsupportedAlgorithm:
pass
raise UnsupportedAlgorithm(
"{0} is not a supported hash on this backend.".format(
algorithm.name),
_Reasons.UNSUPPORTED_HASH
)
def generate_rsa_parameters_supported(self, public_exponent, key_size):
for b in self._filtered_backends(RSABackend):
return b.generate_rsa_parameters_supported(
public_exponent, key_size
)
raise UnsupportedAlgorithm("RSA is not supported by the backend.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
def rsa_padding_supported(self, padding):
for b in self._filtered_backends(RSABackend):
return b.rsa_padding_supported(padding)
raise UnsupportedAlgorithm("RSA is not supported by the backend.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
def load_rsa_private_numbers(self, numbers):
for b in self._filtered_backends(RSABackend):
return b.load_rsa_private_numbers(numbers)
raise UnsupportedAlgorithm("RSA is not supported by the backend",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
def load_rsa_public_numbers(self, numbers):
for b in self._filtered_backends(RSABackend):
return b.load_rsa_public_numbers(numbers)
raise UnsupportedAlgorithm("RSA is not supported by the backend",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)
def generate_dsa_parameters(self, key_size):
for b in self._filtered_backends(DSABackend):
return b.generate_dsa_parameters(key_size)
raise UnsupportedAlgorithm("DSA is not supported by the backend.",
_Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM)