def test_unsupported_oaep_label(self):
private_key = RSA_KEY_512.private_key(backend)
with pytest.raises(ValueError):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=b"label"
)
)
python类MGF1的实例源码
def test_unsupported_mgf1_hash_algorithm(self):
private_key = RSA_KEY_512.private_key(backend)
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA1(),
label=None
)
)
def test_unsupported_oaep_hash_algorithm(self):
private_key = RSA_KEY_512.private_key(backend)
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
private_key.decrypt(
b"0" * 64,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA256(),
label=None
)
)
def decrypt(privkey, ciphertext):
plaintext = privkey.decrypt(
ciphertext,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return plaintext
def encrypt(pubkey, plaintext):
ciphertext= pubkey.encrypt(plaintext=plaintext,
padding=padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return ciphertext
def sign_message(privkey, message):
plaintext = b64decode(message.encode('utf-8'))
signature = privkey.sign(
plaintext,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return b64encode(signature).decode('utf-8')
def wrap_rsa_key(public_key, private_key_bytes):
# Use the Google public key to encrypt the customer private key.
# This means that only the Google private key is capable of decrypting
# the customer private key.
wrapped_key = public_key.encrypt(
private_key_bytes,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))
encoded_wrapped_key = base64.b64encode(wrapped_key)
return encoded_wrapped_key
def __encrypt_with_rsa(self, content, recipient_pk):
""" Encrypt content with RSAES-OAEP scheme
@developer: vsmysle
This method handles an encryption of a *single* RSA block with a
specified above scheme. It does not handle splitting of a header into
several blocks. It has to be done by other method that would use this
one only for single block encryption purpose.
TODO: what is a maximum size of a content that can be padded and
encrypted given a particular size of RSA key?
:param content: bytes content to encrypt (probably a part of
ASN.1 DER-encoded MPHeader block)
:param recipient_pk: instance of cryptography.hazmat.primitives.rsa
.RSAPublicKey to use for a content encryption
:return: string encryption of an input content
"""
# TODO: add exceptions
self.logger.debug("rsa encryption")
ciphertext = recipient_pk.encrypt(
content, asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=SHA1()),
algorithm=SHA1(),
label=None
)
)
self.logger.info("encrypted")
return ciphertext
def __decrypt_with_rsa(self, content, user_sk):
""" Decrypt RSAES-OAEP encrypted content (single block)
@developer: vsmysle
This method decrypts a single RSA ciphertext block only
:param content: bytes content to decrypt
:param user_sk: instance of cryptography.hazmat.primitives.rsa
.RSAPrivateKey to use for a decryption
:return: string decryption of an input content
"""
# TODO: add exceptions
self.logger.debug("rsa decryption")
try:
plaintext = user_sk.decrypt(
content, asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=SHA1()),
algorithm=SHA1(),
label=None
)
)
except InvalidKey:
self.logger.warning("Invalid key!")
return
return plaintext
def __sign_content(self, content, user_sk):
""" Produce a signature of an input content using RSASSA-PSS scheme
@developer: vsmysle
:param content: bytes content to sign
:param user_sk: instance of cryptography.hazmat.primitives.rsa.
RSAPrivateKey
:return: bytes of signature of the input content
"""
# TODO: add exceptions
self.logger.debug("generating a signature of an input content")
# creating signer that will sign our content
try:
signer = user_sk.signer(
# we use RSASSA-PSS padding for the signature scheme
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
salt_length=asym_padding.PSS.MAX_LENGTH
),
SHA1()
)
except InvalidKey:
self.logger.warning("Invalid key!")
return
signer.update(content)
signature = signer.finalize()
self.logger.info("signature generation finished")
return signature
def __verify_signature(self, signature, signer_pk, content):
""" Verify RSASSA-PSS signature
@developer: vsmysle
:param signature: signature bytes to verify
:param signer_pk: instance of cryptography.hazmat.primitives.
rsa.RSAPublicKey that is a public key of a signer
:param content: content to verify a signature of
:return: bool verification result
"""
self.logger.debug("starting signature verification routine")
try:
signer_pk.verify(
signature,
content,
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
salt_length=asym_padding.PSS.MAX_LENGTH
),
SHA1()
)
except InvalidSignature:
self.logger.warn("signature verification failed")
return False
self.logger.info("signature OK")
return True
def sign(self, msg, key):
return key.sign(
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
def verify(self, msg, key, sig):
try:
key.verify(
sig,
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
return True
except InvalidSignature:
return False
def sign(self, private_key):
return private_key.sign(
self.string_to_sign.encode("utf-8"),
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=32),
hashes.SHA256())
def sign(self, message, private_key):
"""Sign the message.
This method will take the provided message and create a
signature using the provided RSA private key. The resulting
signature is stored in the fulfillment.
The key should be provided as a PEM encoded private key string.
The message is padded using RSA-PSS with SHA256.
Args:
message (bytes): Message to sign.
private_key (bytes): RSA private key.
"""
private_key_obj = serialization.load_pem_private_key(
private_key,
password=None,
backend=default_backend(),
)
if self.modulus is None:
m_int = private_key_obj.public_key().public_numbers().n
m_bytes = m_int.to_bytes(
(m_int.bit_length() + 7) // 8, 'big')
self._set_public_modulus(m_bytes)
signer = private_key_obj.signer(
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=SALT_LENGTH,
),
hashes.SHA256(),
)
signer.update(message)
self.signature = signer.finalize()
def validate(self, message):
"""Verify the signature of self RSA fulfillment.
The signature of self RSA fulfillment is verified against the
provided message and the condition's public modulus.
Args:
message (bytes): Message to verify.
Returns:
bool: Whether self fulfillment is valid.
"""
if not isinstance(message, bytes):
raise Exception(
'Message must be provided as bytes, was: ' + message)
public_numbers = RSAPublicNumbers(
PUBLIC_EXPONENT,
int.from_bytes(self.modulus, byteorder='big'),
)
public_key = public_numbers.public_key(default_backend())
verifier = public_key.verifier(
self.signature,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=SALT_LENGTH,
),
hashes.SHA256()
)
verifier.update(message)
try:
verifier.verify()
except InvalidSignature as exc:
raise ValidationError('Invalid RSA signature') from exc
return True
def encrypt(self, message):
if self._value is None:
raise ValueError("Can't Encrypt with empty key.")
try:
return self._value.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))
except ValueError as e:
raise CipherError(e)
def get_signer(self):
"""Gets an incremental verifier.
Must call .update() on the verifier and then .finalize() to check.
"""
hash_algorithm = hashes.SHA256()
padding_algorithm = padding.PSS(mgf=padding.MGF1(hash_algorithm),
salt_length=padding.PSS.MAX_LENGTH)
padding_algorithm = padding.PKCS1v15()
return self._value.signer(padding_algorithm, hash_algorithm)
def decrypt(self, message):
if self._value is None:
raise ValueError("Can't Decrypt with empty key.")
try:
return self._value.decrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None))
except ValueError as e:
raise CipherError(e)
def sign(self, msg, key):
signer = key.signer(
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()