def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_RSA_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
hash_name, signature,
signature_utils.RSA_PSS)
verifier.update(data)
verifier.verify()
python类PSS的实例源码
def create_verifier_for_pss(signature, hash_method, public_key):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:raises: SignatureVerificationError if the RSA-PSS specific properties
are invalid
:returns: the verifier to use to verify the signature for RSA-PSS
"""
# default to MGF1
mgf = padding.MGF1(hash_method)
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
def signing(cls, message, private_key, algorithm='sha1'):
"""signing."""
if not isinstance(message, bytes):
message = message.encode()
algorithm = cls.ALGORITHM_DICT.get(algorithm)
signer = private_key.signer(
padding.PSS(
mgf=padding.MGF1(algorithm),
salt_length=padding.PSS.MAX_LENGTH
),
algorithm
)
signer.update(message)
return signer.finalize()
def verification(cls, message, signature, public_key, algorithm='sha1'):
"""Verification."""
if not isinstance(message, bytes):
message = message.encode()
algorithm = cls.ALGORITHM_DICT.get(algorithm)
verifier = public_key().verifier(
signature,
padding.PSS(
mgf=padding.MGF1(algorithm),
salt_length=padding.PSS.MAX_LENGTH
),
algorithm
)
verifier.update(message)
return verifier.verify()
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def rsa(public_key, signature, message):
"""Verifies an RSA signature.
Args:
public_key (str): Public key with BEGIN and END sections.
signature (str): Hex value of the signature with its leading 0x stripped.
message (str): Message that was signed, unhashed.
"""
try:
public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend())
hashed = util.sha256(message)
public_rsa.verify(
binascii.unhexlify(signature),
hashed,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
except InvalidSignature:
raise Exception('Invalid signature')
def rsa_sign(private_key, message):
"""Signs a message with the provided Rsa private key.
Args:
private_key (str): Rsa private key with BEGIN and END sections.
message (str): Message to be hashed and signed.
Returns:
str: Hex signature of the message, with its leading 0x stripped.
"""
private_rsa = load_pem_private_key(bytes(private_key), password=None, backend=default_backend())
hashed = sha256(message)
signature = private_rsa.sign(
hashed,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return binascii.hexlify(bytearray(signature))
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def _sign_image(self, image_file):
LOG.debug("Creating signature for image data")
signer = self.private_key.signer(
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
chunk_bytes = 8192
with open(image_file, 'rb') as f:
chunk = f.read(chunk_bytes)
while len(chunk) > 0:
signer.update(chunk)
chunk = f.read(chunk_bytes)
signature = signer.finalize()
signature_b64 = base64.b64encode(signature)
return signature_b64
def sign_and_upload_image(self):
img_signature = self._sign_image(self.img_file)
img_properties = {
'img_signature': img_signature,
'img_signature_certificate_uuid': self.signing_cert_uuid,
'img_signature_key_type': 'RSA-PSS',
'img_signature_hash_method': 'SHA-256',
}
LOG.debug("Uploading image with signature metadata properties")
img_uuid = self._image_create(
'signed_img',
CONF.scenario.img_container_format,
self.img_file,
disk_format=CONF.scenario.img_disk_format,
properties=img_properties
)
LOG.debug("Uploaded image %s", img_uuid)
return img_uuid
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
"""
- sig_value: data to be verified
- public_key: creator of this document's public_key
- tbv: to be verified
"""
# TODO: Support other formats than just PEM
public_key = serialization.load_pem_public_key(
_get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"),
backend=default_backend())
try:
public_key.verify(
base64.b64decode(sig_value.encode("utf-8")), formatted,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
return True
except InvalidSignature:
return False
# In the future, we'll be doing a lot more work based on what suite is
# selected.
test_signature_utils.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_RSA_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
hash_name, signature,
signature_utils.RSA_PSS)
verifier.update(data)
verifier.verify()
signature_utils.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def create_verifier_for_pss(signature, hash_method, public_key):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:raises: SignatureVerificationError if the RSA-PSS specific properties
are invalid
:returns: the verifier to use to verify the signature for RSA-PSS
"""
# default to MGF1
mgf = padding.MGF1(hash_method)
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
def test_signature_key_type_lookup_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.SignatureKeyType.lookup,
'RSB-PSS')
def verify(self, signature: bytes, message: bytes):
return self._hazmat_public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
def sign(self, message: bytes):
return self._hazmat_private_key.sign(
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
def _generate_sign(self, pri_key, data):
"""
?? ??? ??
:param pri_key: ??? ???
:param data: ?? ?? ???
:return: ??? ?? ???
"""
_signature = None
# ???? Type(RSA, ECC)? ?? ?? ?? ??
if isinstance(pri_key, ec.EllipticCurvePrivateKeyWithSerialization):
# ECDSA ??
logging.debug("Sign ECDSA")
signer = pri_key.signer(ec.ECDSA(hashes.SHA256()))
signer.update(data)
_signature = signer.finalize()
elif isinstance(pri_key, rsa.RSAPrivateKeyWithSerialization):
# RSA ??
logging.debug("Sign RSA")
_signature = pri_key.sign(
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
else:
logging.debug("Unknown PrivateKey Type : %s", type(pri_key))
return _signature
def __init__(self, name, hash_):
super(_JWAPS, self).__init__(name)
self.padding = padding.PSS(
mgf=padding.MGF1(hash_()),
salt_length=padding.PSS.MAX_LENGTH)
self.hash = hash_()
def sign(self, msg, key):
signer = key.signer(
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()
def sign_message(privkey, message):
plaintext = b64decode(message.encode('utf-8'))
signature = privkey.sign(
plaintext,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return b64encode(signature).decode('utf-8')
def __sign_content(self, content, user_sk):
""" Produce a signature of an input content using RSASSA-PSS scheme
@developer: vsmysle
:param content: bytes content to sign
:param user_sk: instance of cryptography.hazmat.primitives.rsa.
RSAPrivateKey
:return: bytes of signature of the input content
"""
# TODO: add exceptions
self.logger.debug("generating a signature of an input content")
# creating signer that will sign our content
try:
signer = user_sk.signer(
# we use RSASSA-PSS padding for the signature scheme
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
salt_length=asym_padding.PSS.MAX_LENGTH
),
SHA1()
)
except InvalidKey:
self.logger.warning("Invalid key!")
return
signer.update(content)
signature = signer.finalize()
self.logger.info("signature generation finished")
return signature
def __verify_signature(self, signature, signer_pk, content):
""" Verify RSASSA-PSS signature
@developer: vsmysle
:param signature: signature bytes to verify
:param signer_pk: instance of cryptography.hazmat.primitives.
rsa.RSAPublicKey that is a public key of a signer
:param content: content to verify a signature of
:return: bool verification result
"""
self.logger.debug("starting signature verification routine")
try:
signer_pk.verify(
signature,
content,
asym_padding.PSS(
mgf=asym_padding.MGF1(SHA1()),
salt_length=asym_padding.PSS.MAX_LENGTH
),
SHA1()
)
except InvalidSignature:
self.logger.warn("signature verification failed")
return False
self.logger.info("signature OK")
return True
def sign(self, msg, key):
return key.sign(
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
def verify(self, msg, key, sig):
try:
key.verify(
sig,
msg,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
return True
except InvalidSignature:
return False
def sign(self, private_key):
return private_key.sign(
self.string_to_sign.encode("utf-8"),
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=32),
hashes.SHA256())
def _set_signature(self, signature):
"""Set the signature manually.
The signature must be a valid RSA-PSS siganture.
Args:
signature (bytes): RSA signature.
"""
if not isinstance(signature, bytes):
raise TypeError('Signature must be bytes, was: ' + signature)
self.signature = signature
def sign(self, message, private_key):
"""Sign the message.
This method will take the provided message and create a
signature using the provided RSA private key. The resulting
signature is stored in the fulfillment.
The key should be provided as a PEM encoded private key string.
The message is padded using RSA-PSS with SHA256.
Args:
message (bytes): Message to sign.
private_key (bytes): RSA private key.
"""
private_key_obj = serialization.load_pem_private_key(
private_key,
password=None,
backend=default_backend(),
)
if self.modulus is None:
m_int = private_key_obj.public_key().public_numbers().n
m_bytes = m_int.to_bytes(
(m_int.bit_length() + 7) // 8, 'big')
self._set_public_modulus(m_bytes)
signer = private_key_obj.signer(
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=SALT_LENGTH,
),
hashes.SHA256(),
)
signer.update(message)
self.signature = signer.finalize()
def validate(self, message):
"""Verify the signature of self RSA fulfillment.
The signature of self RSA fulfillment is verified against the
provided message and the condition's public modulus.
Args:
message (bytes): Message to verify.
Returns:
bool: Whether self fulfillment is valid.
"""
if not isinstance(message, bytes):
raise Exception(
'Message must be provided as bytes, was: ' + message)
public_numbers = RSAPublicNumbers(
PUBLIC_EXPONENT,
int.from_bytes(self.modulus, byteorder='big'),
)
public_key = public_numbers.public_key(default_backend())
verifier = public_key.verifier(
self.signature,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=SALT_LENGTH,
),
hashes.SHA256()
)
verifier.update(message)
try:
verifier.verify()
except InvalidSignature as exc:
raise ValidationError('Invalid RSA signature') from exc
return True
def get_signer(self):
"""Gets an incremental verifier.
Must call .update() on the verifier and then .finalize() to check.
"""
hash_algorithm = hashes.SHA256()
padding_algorithm = padding.PSS(mgf=padding.MGF1(hash_algorithm),
salt_length=padding.PSS.MAX_LENGTH)
padding_algorithm = padding.PKCS1v15()
return self._value.signer(padding_algorithm, hash_algorithm)
def sign(self, msg, key):
signer = key.signer(
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
signer.update(msg)
return signer.finalize()