def decrypt_pk(priv_key, ciphertext):
    """
    Decrypt a b64encoded ciphertext string with the RSA private key priv_key,
    using CryptoHash() as the OAEP/MGF1 padding hash.

    Returns the plaintext.

    Decryption failures result in an exception being raised. An
    UnsupportedAlgorithm failure is logged first and then re-raised unchanged;
    any other exception propagates without being logged here.
    """
    try:
        plaintext = priv_key.decrypt(
            b64decode(ciphertext),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=CryptoHash()),
                algorithm=CryptoHash(),
                label=None
            )
        )
    except UnsupportedAlgorithm as e:
        # a failure to decrypt someone else's data is not typically a fatal
        # error, but in this particular case, the most likely cause of this
        # error is an old cryptography library
        logging.error(
            "Fatal error: encryption hash %s unsupported, try upgrading to "
            "cryptography >= 1.4. Exception: %s",
            CryptoHash, e)
        # bare raise hands the same exception to the caller with its
        # original traceback intact
        raise
    return plaintext
# Example source code using the Python class MGF1
def test_verify_signature_PSS(self, mock_get_pub_key):
    """Sign a fixed payload with PSS padding under every hash in
    signature_utils.HASH_METHODS and check that the verifier returned by
    signature_utils.get_verifier accepts each base64-encoded signature.
    """
    payload = b'224626ae19824466f2a7f39ab7b80f7f'
    cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
    # the mocked lookup hands back the public half of the signing key
    mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()

    for name, alg in signature_utils.HASH_METHODS.items():
        pss = padding.PSS(
            mgf=padding.MGF1(alg),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        signer = TEST_RSA_PRIVATE_KEY.signer(pss, alg)
        signer.update(payload)
        encoded_sig = base64.b64encode(signer.finalize())

        verifier = signature_utils.get_verifier(
            None, cert_uuid, name, encoded_sig, signature_utils.RSA_PSS)
        verifier.update(payload)
        # verify() is expected to raise on a bad signature, failing the test
        verifier.verify()
def create_verifier_for_pss(signature, hash_method, public_key):
    """Create the verifier to use when the key type is RSA-PSS.

    MGF1 (built on ``hash_method``) is used as the mask generation function
    and the salt length is ``padding.PSS.MAX_LENGTH``.

    :param signature: the decoded signature to use
    :param hash_method: the hash method to use, as a cryptography object
    :param public_key: the public key to use, as a cryptography object
    :returns: the verifier to use to verify the signature for RSA-PSS
    """
    pss_padding = padding.PSS(
        mgf=padding.MGF1(hash_method),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    return public_key.verifier(signature, pss_padding, hash_method)
def signing(cls, message, private_key, algorithm='sha1'):
    """Sign ``message`` with ``private_key`` using PSS padding.

    :param message: data to sign; anything that is not ``bytes`` is
        converted with ``.encode()``
    :param private_key: key object exposing ``signer(padding, algorithm)``
    :param algorithm: lookup key into ``cls.ALGORITHM_DICT``
    :returns: whatever ``signer.finalize()`` returns (the signature)
    """
    # NOTE(review): an unknown ``algorithm`` key yields None from .get() and
    # is passed straight to the padding objects -- confirm that is intended.
    hash_alg = cls.ALGORITHM_DICT.get(algorithm)
    payload = message if isinstance(message, bytes) else message.encode()

    pss = padding.PSS(
        mgf=padding.MGF1(hash_alg),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    signer = private_key.signer(pss, hash_alg)
    signer.update(payload)
    return signer.finalize()
def verification(cls, message, signature, public_key, algorithm='sha1'):
    """Check ``signature`` over ``message`` using PSS padding.

    :param message: signed data; anything that is not ``bytes`` is
        converted with ``.encode()``
    :param signature: the signature to check
    :param public_key: a callable returning the key object that exposes
        ``verifier(signature, padding, algorithm)``
    :param algorithm: lookup key into ``cls.ALGORITHM_DICT``
    :returns: the return value of ``verifier.verify()``
    """
    payload = message if isinstance(message, bytes) else message.encode()
    hash_alg = cls.ALGORITHM_DICT.get(algorithm)

    # NOTE(review): public_key is *called* here, unlike in the sibling
    # methods -- presumably callers pass a bound method; confirm.
    key = public_key()
    pss = padding.PSS(
        mgf=padding.MGF1(hash_alg),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    verifier = key.verifier(signature, pss, hash_alg)
    verifier.update(payload)
    return verifier.verify()
def encrypt(cls, message, public_key, algorithm='sha1'):
    """Encrypt ``message`` with ``public_key`` using OAEP padding.

    The hash looked up in ``cls.ALGORITHM_DICT`` is used both for MGF1 and as
    the OAEP algorithm; no label is set.

    :param message: data to encrypt; anything that is not ``bytes`` is
        converted with ``.encode()``
    :param public_key: key object exposing ``encrypt(data, padding)``
    :param algorithm: lookup key into ``cls.ALGORITHM_DICT``
    :returns: the ciphertext returned by ``public_key.encrypt``
    """
    payload = message if isinstance(message, bytes) else message.encode()
    hash_alg = cls.ALGORITHM_DICT.get(algorithm)
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hash_alg),
        algorithm=hash_alg,
        label=None,
    )
    return public_key.encrypt(payload, oaep)
def create_decryptor(private_location, public_location):
    """Load (or create) an RSA private key and return a decrypt function.

    The PEM private key is read from ``private_location``. If that file does
    not exist, a new 2048-bit key (public exponent 65537) is generated and
    written there, unencrypted, in TraditionalOpenSSL PEM format. The
    matching public key is then written to ``public_location`` as a
    SubjectPublicKeyInfo PEM.

    :param private_location: path of the PEM private key file
    :param public_location: path the PEM public key is written to
    :returns: a function ``decrypt(ciphertext)`` that decrypts with the
        private key using OAEP padding (SHA-1 for MGF1 and OAEP, no label)
    """
    try:
        with open(private_location, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
                backend=default_backend()
            )
    except FileNotFoundError:
        # Generate and serialise the key BEFORE opening the file for writing:
        # if either step raises, no empty key file is left behind to break
        # every later load attempt.
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )
        # NOTE(review): the key is stored unencrypted and the file is created
        # with default permissions -- consider restricting access.
        with open(private_location, "wb") as key_file:
            key_file.write(private_pem)

    # Serialise first for the same reason as above, then write the public key.
    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    with open(public_location, "wb") as public_file:
        public_file.write(public_pem)

    def decrypt(ciphertext):
        """Decrypt ``ciphertext`` with the enclosing private key (OAEP/SHA-1)."""
        return private_key.decrypt(
            ciphertext,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA1()),
                algorithm=hashes.SHA1(),
                label=None
            )
        )

    return decrypt
def verify(self, msg, key, sig):
    """Return True if ``sig`` is a valid PSS signature of ``msg`` under ``key``.

    The hash comes from ``self.hash_alg``; the salt length is that hash's
    ``digest_size``. An ``InvalidSignature`` from the verifier is turned into
    a ``False`` result; other exceptions propagate.
    """
    pss = padding.PSS(
        mgf=padding.MGF1(self.hash_alg()),
        salt_length=self.hash_alg.digest_size,
    )
    checker = key.verifier(sig, pss, self.hash_alg())
    checker.update(msg)
    try:
        checker.verify()
    except InvalidSignature:
        return False
    return True
def encrypt_key_with_public_key(secret_key, public_encryption_key):
    """
    Encrypts the given secret key with the public key.

    OAEP padding is used, with SHA-256 for both MGF1 and the OAEP hash and
    no label.

    :param bytes secret_key: the key to encrypt
    :param public_encryption_key: the public encryption key
    :type public_encryption_key: :class:`~rsa.RSAPublicKey`
    :return: the encrypted key
    :rtype: bytes
    """
    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    encrypted_key = public_encryption_key.encrypt(secret_key, oaep_sha256)
    return encrypted_key
def decrypt_with_private_key(secret_key, private_encryption_key):
    """
    Decrypts the given secret key with the private key.

    OAEP padding is used, with SHA-256 for both MGF1 and the OAEP hash and
    no label.

    :param bytes secret_key: the secret key to decrypt
    :param private_encryption_key: the private encryption key
    :type private_encryption_key: :class:`~rsa.RSAPrivateKey`
    :return: the decrypted key
    :rtype: bytes
    """
    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    decrypted_key = private_encryption_key.decrypt(secret_key, oaep_sha256)
    return decrypted_key
def rsa(public_key, signature, message):
    """Verifies an RSA signature.

    The message is passed through ``util.sha256`` and the result is verified
    with PSS padding (MGF1/SHA-256, maximum salt length) and SHA-256.

    Args:
        public_key (str): Public key with BEGIN and END sections.
        signature (str): Hex value of the signature with its leading 0x stripped.
        message (str): Message that was signed, unhashed.

    Raises:
        Exception: With the message 'Invalid signature' when verification
            fails with InvalidSignature.
    """
    # bytes(<text>) raises TypeError on Python 3, so a text PEM is encoded
    # explicitly; type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
    if isinstance(public_key, type(u'')):
        public_key = public_key.encode('utf-8')
    try:
        public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend())
        hashed = util.sha256(message)
        public_rsa.verify(
            binascii.unhexlify(signature),
            hashed,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
    except InvalidSignature:
        raise Exception('Invalid signature')
def rsa_sign(private_key, message):
    """Signs a message with the provided Rsa private key.

    The message is passed through ``sha256`` and the result is signed with
    PSS padding (MGF1/SHA-256, maximum salt length) and SHA-256.

    Args:
        private_key (str): Rsa private key with BEGIN and END sections.
        message (str): Message to be hashed and signed.

    Returns:
        Hex signature of the message, with its leading 0x stripped, as
        produced by ``binascii.hexlify``.
    """
    # bytes(<text>) raises TypeError on Python 3, so a text PEM is encoded
    # explicitly; type(u'') is ``unicode`` on Python 2 and ``str`` on Python 3.
    if isinstance(private_key, type(u'')):
        private_key = private_key.encode('utf-8')
    private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend())
    hashed = sha256(message)
    signature = private_rsa.sign(
        hashed,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    return binascii.hexlify(bytearray(signature))
def verify(self, msg, key, sig):
    """Report whether ``sig`` verifies against ``msg`` for ``key``.

    Uses PSS padding with MGF1 over ``self.hash_alg()`` and a salt length
    equal to ``self.hash_alg.digest_size``. Returns False when the verifier
    raises ``InvalidSignature`` and True otherwise.
    """
    pss_padding = padding.PSS(
        mgf=padding.MGF1(self.hash_alg()),
        salt_length=self.hash_alg.digest_size,
    )
    sig_verifier = key.verifier(sig, pss_padding, self.hash_alg())
    sig_verifier.update(msg)
    try:
        sig_verifier.verify()
    except InvalidSignature:
        return False
    else:
        return True
# Source file: test_f_aws_encryption_sdk_client.py
# Project: aws-encryption-sdk-python
# Author: awslabs
# Project source
# File source
# Views: 28
# Favorites: 0
# Likes: 0
# Comments: 0
def _mgf1_sha256_supported():
    """Return True if an OAEP encryption using MGF1 with SHA-256 succeeds.

    A private key is loaded from the ``VALUES`` fixture and its public key is
    used for a trial encryption. ``UnsupportedAlgorithm`` from that attempt
    means "not supported" and yields False.
    """
    private_key = serialization.load_pem_private_key(
        data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE],
        password=None,
        backend=default_backend()
    )
    try:
        public_key = private_key.public_key()
        oaep_sha256 = padding.OAEP(
            mgf=padding.MGF1(hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        # the ciphertext itself is irrelevant; only success/failure matters
        public_key.encrypt(
            plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg',
            padding=oaep_sha256,
        )
    except cryptography.exceptions.UnsupportedAlgorithm:
        return False
    else:
        return True
def _sign_image(self, image_file):
    """Sign the contents of ``image_file`` and return the base64 signature.

    The file is streamed through a PSS signer (MGF1/SHA-256, maximum salt
    length, SHA-256) created from ``self.private_key``.

    :param image_file: path of the file to sign
    :returns: the base64-encoded signature
    """
    LOG.debug("Creating signature for image data")
    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    signer = self.private_key.signer(pss, hashes.SHA256())

    chunk_bytes = 8192
    with open(image_file, 'rb') as f:
        # read fixed-size chunks until read() returns an empty bytes object
        for chunk in iter(lambda: f.read(chunk_bytes), b''):
            signer.update(chunk)

    return base64.b64encode(signer.finalize())
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
    """
    Check an RSA-PSS (MGF1/SHA-256, maximum salt length) signature.

    - sig_value: base64 text of the signature to check
    - formatted: the data the signature is verified against
    - public_key_jsonld: object from which the "publicKeyPem" value is
      read via _get_value

    Returns True when the signature verifies, False on InvalidSignature.
    """
    # TODO: Support other formats than just PEM
    pem_bytes = _get_value(public_key_jsonld, "publicKeyPem").encode("utf-8")
    public_key = serialization.load_pem_public_key(
        pem_bytes, backend=default_backend())

    # In the future, we'll be doing a lot more work based on what suite is
    # selected.
    try:
        raw_signature = base64.b64decode(sig_value.encode("utf-8"))
        pss = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        public_key.verify(raw_signature, formatted, pss, hashes.SHA256())
    except InvalidSignature:
        return False
    return True
# Source file: test_signature_utils.py
# Project: Trusted-Platform-Module-nova
# Author: BU-NU-CLOUD-SP16
# Project source
# File source
# Views: 22
# Favorites: 0
# Likes: 0
# Comments: 0
def test_verify_signature_PSS(self, mock_get_pub_key):
    """For each entry in signature_utils.HASH_METHODS, produce a PSS
    signature of a fixed payload and confirm the verifier obtained from
    signature_utils.get_verifier accepts it.
    """
    message = b'224626ae19824466f2a7f39ab7b80f7f'
    certificate_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
    # verification side sees the public half of the key used for signing
    mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()

    for method_name, method in signature_utils.HASH_METHODS.items():
        pss_padding = padding.PSS(
            mgf=padding.MGF1(method),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        signer = TEST_RSA_PRIVATE_KEY.signer(pss_padding, method)
        signer.update(message)
        b64_signature = base64.b64encode(signer.finalize())

        verifier = signature_utils.get_verifier(
            None,
            certificate_uuid,
            method_name,
            b64_signature,
            signature_utils.RSA_PSS,
        )
        verifier.update(message)
        # an invalid signature is expected to surface as an exception here
        verifier.verify()
# Source file: signature_utils.py
# Project: Trusted-Platform-Module-nova
# Author: BU-NU-CLOUD-SP16
# Project source
# File source
# Views: 22
# Favorites: 0
# Likes: 0
# Comments: 0
def create_verifier_for_pss(signature, hash_method, public_key):
    """Create the verifier to use when the key type is RSA-PSS.

    The padding always uses MGF1 over ``hash_method`` and the maximum salt
    length (``padding.PSS.MAX_LENGTH``).

    :param signature: the decoded signature to use
    :param hash_method: the hash method to use, as a cryptography object
    :param public_key: the public key to use, as a cryptography object
    :returns: the verifier to use to verify the signature for RSA-PSS
    """
    mask_generator = padding.MGF1(hash_method)
    pss = padding.PSS(mgf=mask_generator, salt_length=padding.PSS.MAX_LENGTH)
    verifier = public_key.verifier(signature, pss, hash_method)
    return verifier
def encrypt_pk(pub_key, plaintext):
    """
    Encrypt plaintext with the RSA public key pub_key, using CryptoHash()
    as the OAEP/MGF1 padding hash.

    plaintext is limited to the size of the RSA key, minus the padding, or a
    few hundred bytes.

    Returns a b64encoded ciphertext string.

    Encryption failures result in an exception being raised. An
    UnsupportedAlgorithm failure is logged first and then re-raised unchanged;
    any other exception propagates without being logged here.
    """
    try:
        ciphertext = pub_key.encrypt(
            plaintext,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=CryptoHash()),
                algorithm=CryptoHash(),
                label=None
            )
        )
    except UnsupportedAlgorithm as e:
        # a failure to encrypt our own data is a fatal error
        # the most likely cause of this error is an old cryptography library
        # although some newer binary cryptography libraries are linked with
        # old OpenSSL versions, to fix, check 'openssl version' >= 1.0.2, then:
        # pip install -I --no-binary cryptography cryptography
        logging.error(
            "Fatal error: encryption hash %s unsupported, try upgrading to "
            "cryptography >= 1.4 compiled with OpenSSL >= 1.0.2. "
            "Exception: %s",
            CryptoHash, e)
        # bare raise hands the same exception to the caller with its
        # original traceback intact
        raise
    return b64encode(ciphertext)
def encrypt_rsa(text, key):
    """Encrypt ``text`` with the public half of the key loaded from ``key``.

    ``key`` is passed to ``load_key``; the public key derived from the result
    encrypts ``text`` with OAEP padding (SHA-1 for MGF1 and OAEP, no label).

    :returns: the ciphertext returned by ``public_key.encrypt``
    """
    public_key = load_key(key).public_key()
    oaep_sha1 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA1()),
        algorithm=hashes.SHA1(),
        label=None,
    )
    return public_key.encrypt(text, oaep_sha1)
def decrypt_rsa(text, key):
    """Decrypt ``text`` with the private key loaded from ``key``.

    ``key`` is passed to ``load_key``; the result decrypts ``text`` with OAEP
    padding (SHA-1 for MGF1 and OAEP, no label).

    :returns: the plaintext returned by ``private_key.decrypt``
    """
    private_key = load_key(key)
    oaep_sha1 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA1()),
        algorithm=hashes.SHA1(),
        label=None,
    )
    return private_key.decrypt(text, oaep_sha1)
def __init__(self, key_pair):
    """Keep ``key_pair`` and prebuild the OAEP padding object.

    The padding uses SHA-1 for both MGF1 and the OAEP hash, with no label.
    """
    self.key_pair = key_pair
    sha1_mgf = asym_padding.MGF1(algorithm=hashes.SHA1())
    self._padding = asym_padding.OAEP(
        mgf=sha1_mgf,
        algorithm=hashes.SHA1(),
        label=None,
    )
def decrypt(cls, ciphertext, private_key, algorithm='sha1'):
    """Private key decryption.

    Decrypts ``ciphertext`` with OAEP padding; the hash looked up in
    ``cls.ALGORITHM_DICT`` is used for both MGF1 and the OAEP algorithm, with
    no label.

    :param ciphertext: data to decrypt
    :param private_key: key object exposing ``decrypt(data, padding)``
    :param algorithm: lookup key into ``cls.ALGORITHM_DICT``
    :returns: the decrypted bytes decoded with ``.decode()``
    """
    hash_alg = cls.ALGORITHM_DICT.get(algorithm)
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hash_alg),
        algorithm=hash_alg,
        label=None,
    )
    raw = private_key.decrypt(ciphertext, oaep)
    return raw.decode()
def verify(self, signature: bytes, message: bytes):
    """Verify ``signature`` over ``message`` with the wrapped public key.

    Uses PSS padding (MGF1/SHA-256, maximum salt length) and SHA-256, and
    returns whatever ``self._hazmat_public_key.verify`` returns; exceptions
    from that call propagate.
    """
    pss_sha256 = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    return self._hazmat_public_key.verify(
        signature, message, pss_sha256, hashes.SHA256())
def encrypt(self, message: bytes):
    """Hybrid-encrypt ``message`` for the wrapped public key.

    A fresh symmetric key from ``generate_sym_key()`` encrypts the message;
    that key's ``.key`` is then encrypted with OAEP (SHA-256 for MGF1 and
    OAEP, no label).

    :returns: a 4-byte big-endian length of the encrypted symmetric key,
        followed by the encrypted symmetric key, followed by the encrypted
        message
    """
    symkey = generate_sym_key()
    body = symkey.encrypt(message)

    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    wrapped_key = self._hazmat_public_key.encrypt(symkey.key, oaep_sha256)

    length_prefix = struct.pack(">I", len(wrapped_key))
    return length_prefix + wrapped_key + body
def sign(self, message: bytes):
    """Sign ``message`` with the wrapped private key.

    Uses PSS padding (MGF1/SHA-256, maximum salt length) and SHA-256.

    :returns: the signature returned by ``self._hazmat_private_key.sign``
    """
    pss_sha256 = padding.PSS(
        mgf=padding.MGF1(hashes.SHA256()),
        salt_length=padding.PSS.MAX_LENGTH,
    )
    return self._hazmat_private_key.sign(message, pss_sha256, hashes.SHA256())
def decrypt(self, ciphertext: bytes):
    """Decrypt a length-prefixed hybrid ciphertext.

    The first 4 bytes are read as a big-endian unsigned length; that many
    following bytes are the encrypted symmetric key and the remainder is the
    encrypted message. The symmetric key is recovered with OAEP (SHA-256 for
    MGF1 and OAEP, no label), loaded via ``load_sym_key`` and used to decrypt
    the remainder.

    :returns: the result of the symmetric key's ``decrypt``
    """
    (wrapped_len,) = struct.unpack(">I", ciphertext[:4])
    wrapped_end = 4 + wrapped_len
    wrapped_key = ciphertext[4:wrapped_end]
    body = ciphertext[wrapped_end:]

    oaep_sha256 = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA256()),
        algorithm=hashes.SHA256(),
        label=None,
    )
    raw_key = self._hazmat_private_key.decrypt(wrapped_key, oaep_sha256)
    symkey = load_sym_key(raw_key)
    return symkey.decrypt(body)
def _generate_sign(self, pri_key, data):
    """
    Sign ``data`` with ``pri_key``, choosing the scheme from the key type.

    :param pri_key: private key; elliptic-curve keys are signed with
        ECDSA/SHA-256, RSA keys with PSS (MGF1/SHA-256, maximum salt length)
        and SHA-256
    :param data: the data to sign
    :return: the signature, or None when the key type is not recognised
    """
    # Dispatch on the private key type (EC vs. RSA).
    if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization):
        # ECDSA signature
        logging.debug("Sign ECDSA")
        signer = pri_key.signer(ec.ECDSA(hashes.SHA256()))
        signer.update(data)
        return signer.finalize()

    if isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization):
        # RSA signature
        logging.debug("Sign RSA")
        pss_sha256 = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )
        return pri_key.sign(data, pss_sha256, hashes.SHA256())

    # Neither EC nor RSA: only a debug message is logged, nothing is signed.
    logging.debug("Unknown PrivateKey Type : %s", type(pri_key))
    return None
def __init__(self, name, hash_):
    """Initialise the base class with ``name`` and set up PSS parameters.

    ``hash_`` is a callable producing a hash object. One instance feeds MGF1
    for ``self.padding`` (maximum salt length) and a second, separate
    instance is stored as ``self.hash``.
    """
    super(_JWAPS, self).__init__(name)
    mask_generator = padding.MGF1(hash_())
    self.padding = padding.PSS(
        mgf=mask_generator,
        salt_length=padding.PSS.MAX_LENGTH,
    )
    self.hash = hash_()
def sign(self, msg, key):
    """Sign ``msg`` with ``key`` using PSS padding.

    The hash comes from ``self.hash_alg``; the salt length is that hash's
    ``digest_size``.

    :returns: the signature returned by ``signer.finalize()``
    """
    pss = padding.PSS(
        mgf=padding.MGF1(self.hash_alg()),
        salt_length=self.hash_alg.digest_size,
    )
    msg_signer = key.signer(pss, self.hash_alg())
    msg_signer.update(msg)
    return msg_signer.finalize()