def verify(self, public_key, message, signature):
"""ECDSA verify signature.
:param public_key: Signing public key
:param message: Origin message
:param signature: Signature of message
:Returns: verify result boolean, True means valid
"""
if not (self._check_malleability(signature)):
return False
verifier = public_key.verifier(signature,
ec.ECDSA(self.sign_hash_algorithm))
verifier.update(message)
try:
verifier.verify()
except InvalidSignature:
return False
except Exception as e:
raise e
return True
python类InvalidSignature()的实例源码
def verify_hmac(expected_result, secret_key, unique_prefix, data):
'''
Perform a HMAC using the secret key, unique hash prefix, and data, and
verify that the result of:
HMAC-SHA256(secret_key, unique_prefix | data)
matches the bytes in expected_result.
The key must be kept secret. The prefix ensures hash uniqueness.
Returns True if the signature matches, and False if it does not.
'''
# If the secret key is shorter than the digest size, security is reduced
assert secret_key
assert len(secret_key) >= CryptoHash.digest_size
h = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
h.update(bytes(unique_prefix))
h.update(bytes(data))
try:
h.verify(bytes(expected_result))
return True
except InvalidSignature:
return False
def handshake(self):
if self.id:
raise HandshakeError('Handshake already done.')
challenge = _generate_challenge()
query = {'handshake': 'challenge', 'challenge': challenge}
yield Effect(EHandshakeSend(ejson_dumps(query)))
raw_resp = yield Effect(EHandshakeRecv())
try:
resp = ejson_loads(raw_resp)
except (TypeError, json.JSONDecodeError):
error = HandshakeError('Invalid challenge response format')
yield Effect(EHandshakeSend(error.to_raw()))
raise error
resp = HandshakeAnswerSchema().load(resp)
claimed_identity = resp['identity']
try:
pubkey = yield Effect(EPubKeyGet(claimed_identity))
pubkey.verify(resp['answer'], challenge.encode())
yield Effect(EHandshakeSend('{"status": "ok", "handshake": "done"}'))
self.id = claimed_identity
except (TypeError, PubKeyNotFound, InvalidSignature):
error = HandshakeError('Invalid signature, challenge or identity')
yield Effect(EHandshakeSend(error.to_raw()))
raise error
def sign_test(self):
"""???/??? ?? ? ?? ???
:return: ??? ??(True/False)
"""
if self.is_secure is False:
logging.debug("CA is not secure_mode")
return False
data = b"test"
signature = self.__ca_pri.sign(
data,
ec.ECDSA(hashes.SHA256())
)
try:
pub_key = self.__ca_cert.public_key()
return pub_key.verify(
signature,
data,
ec.ECDSA(hashes.SHA256())
)
except InvalidSignature:
logging.debug("cert test fail!!!")
return False
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def verify_ssh_sig(self, data, msg):
if msg.get_text() != self.ecdsa_curve.key_format_identifier:
return False
sig = msg.get_binary()
sigR, sigS = self._sigdecode(sig)
signature = encode_dss_signature(sigR, sigS)
verifier = self.verifying_key.verifier(
signature, ec.ECDSA(self.ecdsa_curve.hash_object())
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def verify_ssh_sig(self, data, msg):
if msg.get_text() != 'ssh-rsa':
return False
key = self.key
if isinstance(key, rsa.RSAPrivateKey):
key = key.public_key()
verifier = key.verifier(
signature=msg.get_binary(),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA1(),
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def rsa(public_key, signature, message):
"""Verifies an RSA signature.
Args:
public_key (str): Public key with BEGIN and END sections.
signature (str): Hex value of the signature with its leading 0x stripped.
message (str): Message that was signed, unhashed.
"""
try:
public_rsa = load_pem_public_key(bytes(public_key), backend=default_backend())
hashed = util.sha256(message)
public_rsa.verify(
binascii.unhexlify(signature),
hashed,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
except InvalidSignature:
raise Exception('Invalid signature')
def verify_ssh_sig(self, data, msg):
if msg.get_text() != self.ecdsa_curve.key_format_identifier:
return False
sig = msg.get_binary()
sigR, sigS = self._sigdecode(sig)
signature = encode_dss_signature(sigR, sigS)
verifier = self.verifying_key.verifier(
signature, ec.ECDSA(self.ecdsa_curve.hash_object())
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def verify_ssh_sig(self, data, msg):
if msg.get_text() != 'ssh-rsa':
return False
key = self.key
if isinstance(key, rsa.RSAPrivateKey):
key = key.public_key()
verifier = key.verifier(
signature=msg.get_binary(),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA1(),
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def _decrypt_cryptography(cls, b_ciphertext, b_crypted_hmac, b_key1, b_key2, b_iv):
# b_key1, b_key2, b_iv = self._gen_key_initctr(b_password, b_salt)
# EXIT EARLY IF DIGEST DOESN'T MATCH
hmac = HMAC(b_key2, hashes.SHA256(), CRYPTOGRAPHY_BACKEND)
hmac.update(b_ciphertext)
try:
hmac.verify(unhexlify(b_crypted_hmac))
except InvalidSignature as e:
raise AnsibleVaultError('HMAC verification failed: %s' % e)
cipher = C_Cipher(algorithms.AES(b_key1), modes.CTR(b_iv), CRYPTOGRAPHY_BACKEND)
decryptor = cipher.decryptor()
unpadder = padding.PKCS7(128).unpadder()
b_plaintext = unpadder.update(
decryptor.update(b_ciphertext) + decryptor.finalize()
) + unpadder.finalize()
return b_plaintext
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PSS(
mgf=padding.MGF1(self.hash_alg()),
salt_length=self.hash_alg.digest_size
),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def verify_ssh_sig(self, data, msg):
if msg.get_text() != self.ecdsa_curve.key_format_identifier:
return False
sig = msg.get_binary()
sigR, sigS = self._sigdecode(sig)
signature = encode_dss_signature(sigR, sigS)
verifier = self.verifying_key.verifier(
signature, ec.ECDSA(self.ecdsa_curve.hash_object())
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def verify_ssh_sig(self, data, msg):
if msg.get_text() != 'ssh-rsa':
return False
key = self.key
if isinstance(key, rsa.RSAPrivateKey):
key = key.public_key()
verifier = key.verifier(
signature=msg.get_binary(),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA1(),
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def _rsa_verify_sig(sig_value, formatted, public_key_jsonld):
"""
- sig_value: data to be verified
- public_key: creator of this document's public_key
- tbv: to be verified
"""
# TODO: Support other formats than just PEM
public_key = serialization.load_pem_public_key(
_get_value(public_key_jsonld, "publicKeyPem").encode("utf-8"),
backend=default_backend())
try:
public_key.verify(
base64.b64decode(sig_value.encode("utf-8")), formatted,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
return True
except InvalidSignature:
return False
# In the future, we'll be doing a lot more work based on what suite is
# selected.
def verify_ssh_sig(self, data, msg):
if msg.get_text() != self.ecdsa_curve.key_format_identifier:
return False
sig = msg.get_binary()
sigR, sigS = self._sigdecode(sig)
signature = encode_dss_signature(sigR, sigS)
verifier = self.verifying_key.verifier(
signature, ec.ECDSA(self.ecdsa_curve.hash_object())
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def verify_ssh_sig(self, data, msg):
if msg.get_text() != 'ssh-rsa':
return False
key = self.key
if isinstance(key, rsa.RSAPrivateKey):
key = key.public_key()
verifier = key.verifier(
signature=msg.get_binary(),
padding=padding.PKCS1v15(),
algorithm=hashes.SHA1(),
)
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
return False
else:
return True
def verify(self, res):
"""Verify response from server
Taken from https://github.com/madeddie/python-bunq - Thanks!
:param res: request to be verified
:type res: requests.models.Response
"""
if not self.server_pubkey:
print('No server public key defined, skipping verification')
return
serv_headers = [
'X-Bunq-Client-Request-Id',
'X-Bunq-Client-Response-Id'
]
msg = '%s\n%s\n\n%s' % (
res.status_code,
'\n'.join(
['%s: %s' % (k, v) for k, v in sorted(
res.headers.items()
) if k in serv_headers]
),
res.text
)
signature = base64.b64decode(res.headers['X-Bunq-Server-Signature'])
try:
self.server_pubkey_pem.verify(
signature,
msg.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
except InvalidSignature:
print('Message failed verification, data might be tampered with')
return False
else:
return True
def is_issuer(issuing_certificate, issued_certificate):
"""Determine if the issuing cert is the parent of the issued cert.
Determine if the issuing certificate is the parent of the issued
certificate by:
* conducting subject and issuer name matching, and
* verifying the signature of the issued certificate with the issuing
certificate's public key
:param issuing_certificate: the cryptography certificate object that
is the potential parent of the issued certificate
:param issued_certificate: the cryptography certificate object that
is the potential child of the issuing certificate
:return: True if the issuing certificate is the parent of the issued
certificate, False otherwise.
"""
if (issuing_certificate is None) or (issued_certificate is None):
return False
elif issuing_certificate.subject != issued_certificate.issuer:
return False
else:
try:
verify_certificate_signature(
issuing_certificate,
issued_certificate
)
except cryptography_exceptions.InvalidSignature:
# If verification fails, an exception is expected.
return False
return True
def verify_certificate_signature(signing_certificate, certificate):
"""Verify that the certificate was signed correctly.
:param signing_certificate: the cryptography certificate object used to
sign the certificate
:param certificate: the cryptography certificate object that was signed
by the signing certificate
:raises: cryptography.exceptions.InvalidSignature if certificate signature
verification fails.
"""
signature_hash_algorithm = certificate.signature_hash_algorithm
signature_bytes = certificate.signature
signer_public_key = signing_certificate.public_key()
if isinstance(signer_public_key, rsa.RSAPublicKey):
verifier = signer_public_key.verifier(
signature_bytes, padding.PKCS1v15(), signature_hash_algorithm
)
elif isinstance(signer_public_key, ec.EllipticCurvePublicKey):
verifier = signer_public_key.verifier(
signature_bytes, ec.ECDSA(signature_hash_algorithm)
)
else:
verifier = signer_public_key.verifier(
signature_bytes, signature_hash_algorithm
)
verifier.update(certificate.tbs_certificate_bytes)
verifier.verify()
def test_verify_signature_bad_signature(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
'SHA-256', 'BLAH',
signature_utils.RSA_PSS)
verifier.update(data)
self.assertRaises(crypto_exceptions.InvalidSignature,
verifier.verify)
def unsign(self, signed_value, ttl=None):
"""
Retrieve original value and check it wasn't signed more
than max_age seconds ago.
:type signed_value: bytes
:type ttl: int | datetime.timedelta
"""
h_size, d_size = struct.calcsize('>cQ'), self.digest.digest_size
fmt = '>cQ%ds%ds' % (len(signed_value) - h_size - d_size, d_size)
try:
version, timestamp, value, sig = struct.unpack(fmt, signed_value)
except struct.error:
raise BadSignature('Signature is not valid')
if version != self.version:
raise BadSignature('Signature version not supported')
if ttl is not None:
if isinstance(ttl, datetime.timedelta):
ttl = ttl.total_seconds()
# Check timestamp is not older than ttl
age = abs(time.time() - timestamp)
if age > ttl + _MAX_CLOCK_SKEW:
raise SignatureExpired('Signature age %s > %s seconds' % (age,
ttl))
try:
self.signature(signed_value[:-d_size]).verify(sig)
except InvalidSignature:
raise BadSignature(
'Signature "%s" does not match' % binascii.b2a_base64(sig))
return value
def _verify(self, message, signature):
try:
self._public_key.verify(signature, message, crypto_padding.PKCS1v15(), crypto_hashes.SHA256())
return True
except crypto_exceptions.InvalidSignature:
return False
def __load_cert(self, cert_dir):
"""???/??? ?? ? ?? ???
:param cert_dir: ???/??? ?? ??
:return: X509 ???
"""
logging.debug("Cert/Key loading...")
cert_file = join(cert_dir, "cert.pem")
pri_file = join(cert_dir, "key.pem")
f = open(cert_file, "rb")
cert_bytes = f.read()
f.close()
cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
f = open(pri_file, "rb")
pri_bytes = f.read()
f.close()
try:
pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
except ValueError:
logging.debug("Invalid Password(%s)", cert_dir)
return None
data = b"test"
signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))
try:
pub_key = cert.public_key()
result = pub_key.verify(signature, data, ec.ECDSA(hashes.SHA256()))
except InvalidSignature:
logging.debug("sign test fail")
result = False
if result:
return cert
else:
logging.error("result is False ")
return None
def verify_certificate(self, peer_cert):
"""
??? ??
:param peer_cert: ?????
:return: ????
"""
# ??? ???? ??
not_after = peer_cert.not_valid_after
now = datetime.datetime.now()
if not_after < now:
logging.error("Certificate is Expired")
return False
# ??? ?? ??
ca_pub = self.__ca_cert.public_key()
signature = peer_cert.signature
data = peer_cert.tbs_certificate_bytes
validation_result = False
try:
ca_pub.verify(
signature=signature,
data=data,
signature_algorithm=ec.ECDSA(hashes.SHA256())
)
validation_result = True
except InvalidSignature:
logging.debug("InvalidSignatureException")
return validation_result
def verify_data_with_publickey(public_key, data: bytes, signature: bytes, is_hash: bool=False) -> bool:
"""??? DATA ??
:param public_key: ??? ???
:param data: ?? ?? ??
:param signature: ?? ???
:param is_hash: ?? hashed ??(True/False
:return: ?? ?? ??(True/False)
"""
hash_algorithm = hashes.SHA256()
if is_hash:
hash_algorithm = utils.Prehashed(hash_algorithm)
if isinstance(public_key, ec.EllipticCurvePublicKeyWithSerialization):
try:
public_key.verify(
signature=signature,
data=data,
signature_algorithm=ec.ECDSA(hash_algorithm)
)
return True
except InvalidSignature:
logging.debug("InvalidSignatureException_ECDSA")
elif isinstance(public_key, rsa.RSAPublicKeyWithSerialization):
try:
public_key.verify(
signature,
data,
padding.PKCS1v15(),
hash_algorithm
)
return True
except InvalidSignature:
logging.debug("InvalidSignatureException_RSA")
else:
logging.debug("Unknown PublicKey Type : %s", type(public_key))
return False
def verify(self, res):
"""Verify response from server
Taken from https://github.com/madeddie/python-bunq - Thanks!
:param res: request to be verified
:type res: requests.models.Response
"""
if not self.server_pubkey:
print('No server public key defined, skipping verification')
return True
serv_headers = [
'X-Bunq-Client-Request-Id',
'X-Bunq-Client-Response-Id'
]
msg = '%s\n%s\n\n%s' % (
res.status_code,
'\n'.join(
['%s: %s' % (k, v) for k, v in sorted(
res.headers.items()
) if k in serv_headers]
),
res.text
)
signature = base64.b64decode(res.headers['X-Bunq-Server-Signature'])
try:
self.server_pubkey_pem.verify(
signature,
msg.encode(),
padding.PKCS1v15(),
hashes.SHA256()
)
except InvalidSignature:
print('Message failed verification, data might be tampered with')
return False
else:
return True
def verify(self, msg, key, sig):
verifier = key.verifier(
sig,
padding.PKCS1v15(),
self.hash_alg()
)
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def verify(self, msg, key, sig):
try:
der_sig = raw_to_der_signature(sig, key.curve)
except ValueError:
return False
verifier = key.verifier(der_sig, ec.ECDSA(self.hash_alg()))
verifier.update(msg)
try:
verifier.verify()
return True
except InvalidSignature:
return False
def verify(self, signature, data, hash_context):
if not isinstance(hash_context, hashes.HashContext):
raise TypeError("hash_context must be an instance of hashes.HashContext.")
size = self.public_numbers.parameter_numbers.q.bit_length() // 8
r, s = (bytes_to_long(value) for value in read_content(signature, '{0}s{0}s'.format(size)))
# r, s = (bytes_to_long(value) for value in read_content(signature, '20s20s'))
verifier = self._key.verifier(encode_dss_signature(r, s), hashes.SHA256())
verifier._hash_ctx = hash_context
verifier.update(data)
try:
verifier.verify()
except InvalidSignature:
raise ValueError("invalid signature")