def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload,
                           enc_digest):
    """Check enc_digest as a signature over payload with signing_cert's key.

    signing_cert is DER-encoded with der_encoder and re-parsed with
    x509.load_der_x509_certificate to obtain its public key. RSA keys are
    verified with PKCS#1 v1.5 padding; every other key type is handed the
    hash algorithm directly.

    NOTE(review): digest_alg is accepted but never used; the hash is taken
    from cert.signature_hash_algorithm -- confirm this is intended.

    Returns True when verifier.verify() succeeds and False when it raises.
    Errors raised while loading the certificate or building the verifier
    are not caught and propagate to the caller.
    """
    cert = x509.load_der_x509_certificate(
        der_encoder.encode(signing_cert), default_backend())
    pubkey = cert.public_key()
    if isinstance(pubkey, RSAPublicKey):
        verifier = pubkey.verifier(
            enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm)
    else:
        verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm)
    verifier.update(payload)
    try:
        verifier.verify()
    except Exception:
        # Any verification error means "not valid". Unlike a bare except,
        # KeyboardInterrupt and SystemExit are no longer swallowed here.
        return False
    return True
# python类RSAPublicKey()的实例源码 (example source code using the Python class RSAPublicKey())
def test_convert_public_pkey_to_cryptography_key(self):
    """
    A PKey loaded from a PEM public key converts, via
    to_cryptography_key, into an rsa.RSAPublicKey with the same bit size.
    """
    openssl_key = load_publickey(FILETYPE_PEM, cleartextPublicKeyPEM)
    converted = openssl_key.to_cryptography_key()

    assert isinstance(converted, rsa.RSAPublicKey)
    assert openssl_key.bits() == converted.key_size
def display_public_key(pubkey, indent=""):
    """Hand pubkey to the display helper that matches its key type.

    RSA, DSA and elliptic-curve public keys are forwarded, together with
    indent, to display_rsa_public_key, display_dsa_public_key and
    display_ec_public_key respectively.

    Raises ValueError for any other key type.
    """
    if isinstance(pubkey, rsa.RSAPublicKey):
        display_rsa_public_key(pubkey, indent=indent)
        return
    if isinstance(pubkey, dsa.DSAPublicKey):
        display_dsa_public_key(pubkey, indent=indent)
        return
    if isinstance(pubkey, ec.EllipticCurvePublicKey):
        display_ec_public_key(pubkey, indent=indent)
        return
    raise ValueError("Unknown public key type")
def get_public_key_label(pubkey):
    """Return a human-readable label for pubkey's key type.

    Gives "RSA Public Key", "DSA Public Key" or "Elliptic Curve Public Key"
    for the matching key classes, and None for anything else.
    """
    label = None
    if isinstance(pubkey, rsa.RSAPublicKey):
        label = "RSA Public Key"
    elif isinstance(pubkey, dsa.DSAPublicKey):
        label = "DSA Public Key"
    elif isinstance(pubkey, ec.EllipticCurvePublicKey):
        label = "Elliptic Curve Public Key"
    return label
# From http://matthiaseisen.com/articles/graphviz/
def public_key(self, key):
    """
    Sets the requestor's public key (as found in the signing request).

    Raises TypeError unless key is a DSA, RSA or elliptic-curve public key,
    and ValueError when this builder already has a public key. Returns a
    new CertificateBuilder carrying key plus this builder's other fields.
    """
    accepted_types = (
        dsa.DSAPublicKey,
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
    )
    if not isinstance(key, accepted_types):
        raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
                        ' or EllipticCurvePublicKey.')

    already_set = self._public_key is not None
    if already_set:
        raise ValueError('The public key may only be set once.')

    return CertificateBuilder(
        self._issuer_name,
        self._subject_name,
        key,
        self._serial_number,
        self._not_valid_before,
        self._not_valid_after,
        self._extensions
    )
def public_key(self, key):
    """
    Sets the requestor's public key (as found in the signing request).

    Raises TypeError unless key is a DSA, RSA or elliptic-curve public key,
    and ValueError when this builder already has a public key. Returns a
    new CertificateBuilder carrying key plus this builder's other fields.
    """
    accepted_types = (
        dsa.DSAPublicKey,
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
    )
    if not isinstance(key, accepted_types):
        raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
                        ' or EllipticCurvePublicKey.')

    already_set = self._public_key is not None
    if already_set:
        raise ValueError('The public key may only be set once.')

    return CertificateBuilder(
        self._issuer_name,
        self._subject_name,
        key,
        self._serial_number,
        self._not_valid_before,
        self._not_valid_after,
        self._extensions
    )
def public_key(self, key):
    """
    Sets the requestor's public key (as found in the signing request).

    Raises TypeError unless key is a DSA, RSA or elliptic-curve public key,
    and ValueError when this builder already has a public key. Returns a
    new CertificateBuilder carrying key plus this builder's other fields.
    """
    accepted_types = (
        dsa.DSAPublicKey,
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
    )
    if not isinstance(key, accepted_types):
        raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
                        ' or EllipticCurvePublicKey.')

    already_set = self._public_key is not None
    if already_set:
        raise ValueError('The public key may only be set once.')

    return CertificateBuilder(
        self._issuer_name,
        self._subject_name,
        key,
        self._serial_number,
        self._not_valid_before,
        self._not_valid_after,
        self._extensions
    )
def verifyCert(cert, signCert):
    """
    Checks whether the signature on one certificate was produced by the key
    of another. Only the cryptographic signature is checked, and this is
    probably wrong and dangerous: do not use it to verify certificates.
    Only ECDSA and RSA with PKCS#1 v1.5 padding are handled; any other
    signer key type yields False.
    :param cert: The certificate whose signature is checked, as a
    cryptography certificate object.
    :param signCert: The certificate whose public key is expected to have
    produced the signature, as a cryptography certificate object.
    :return: True if the signature verifies, False if it is invalid or the
    signer key type is unsupported.
    """
    # FIXME: This is very likely wrong and we should find a better way to verify certs.
    hash_alg = cert.signature_hash_algorithm
    signature = cert.signature
    signed_bytes = cert.tbs_certificate_bytes
    signer_key = signCert.public_key()

    # We only support ECDSA and RSA+PKCS1
    if isinstance(signer_key, ec.EllipticCurvePublicKey):
        verifier = signer_key.verifier(signature, ec.ECDSA(hash_alg))
    elif isinstance(signer_key, rsa.RSAPublicKey):
        verifier = signer_key.verifier(
            signature, padding.PKCS1v15(), hash_alg)
    else:
        return False

    verifier.update(signed_bytes)
    try:
        verifier.verify()
    except InvalidSignature:
        return False
    return True
def __encrypt_with_rsa(self, content, recipient_pk):
    """ Encrypt one block of content with the RSAES-OAEP scheme
    @developer: vsmysle
    Only a *single* RSA block is encrypted here. Splitting a header into
    several blocks is left to another method, which would call this one
    once per block.
    TODO: what is a maximum size of a content that can be padded and
    encrypted given a particular size of RSA key?
    :param content: bytes content to encrypt (probably a part of
    ASN.1 DER-encoded MPHeader block)
    :param recipient_pk: instance of cryptography.hazmat.primitives.rsa
    .RSAPublicKey to use for a content encryption
    :return: the ciphertext returned by recipient_pk.encrypt
    """
    # TODO: add exceptions
    self.logger.debug("rsa encryption")
    # OAEP with SHA-1 for both the MGF1 mask function and the main hash.
    oaep_padding = asym_padding.OAEP(
        mgf=asym_padding.MGF1(algorithm=SHA1()),
        algorithm=SHA1(),
        label=None
    )
    encrypted = recipient_pk.encrypt(content, oaep_padding)
    self.logger.info("encrypted")
    return encrypted
def __verify_signature(self, signature, signer_pk, content):
    """ Verify RSASSA-PSS signature
    @developer: vsmysle
    The check uses PSS padding with MGF1 over SHA-1, the maximum salt
    length, and SHA-1 as the message hash.
    :param signature: signature bytes to verify
    :param signer_pk: instance of cryptography.hazmat.primitives.
    rsa.RSAPublicKey that is a public key of a signer
    :param content: content to verify a signature of
    :return: bool verification result (False when InvalidSignature is
    raised, True otherwise)
    """
    self.logger.debug("starting signature verification routine")
    try:
        signer_pk.verify(
            signature,
            content,
            asym_padding.PSS(
                mgf=asym_padding.MGF1(SHA1()),
                salt_length=asym_padding.PSS.MAX_LENGTH
            ),
            SHA1()
        )
    except InvalidSignature:
        # Logger.warn is a deprecated alias; warning() is the supported
        # spelling. Assumes self.logger is a logging.Logger-like object.
        self.logger.warning("signature verification failed")
        return False
    self.logger.info("signature OK")
    return True
def _openssh_public_key_bytes(self, key):
    """Build the SSH public key line b"<key type> <base64 blob>" for key.

    RSA keys produce an "ssh-rsa" line, DSA keys an "ssh-dss" line, and
    elliptic-curve keys an "ecdsa-sha2-<curve>" line. Any key that is
    neither RSA nor DSA is asserted to be an elliptic-curve public key.

    Raises ValueError for an elliptic-curve key whose curve is not
    SECP256R1, SECP384R1 or SECP521R1.
    """
    if isinstance(key, rsa.RSAPublicKey):
        rsa_numbers = key.public_numbers()
        blob = (
            serialization._ssh_write_string(b"ssh-rsa") +
            serialization._ssh_write_mpint(rsa_numbers.e) +
            serialization._ssh_write_mpint(rsa_numbers.n)
        )
        return b"ssh-rsa " + base64.b64encode(blob)

    if isinstance(key, dsa.DSAPublicKey):
        dsa_numbers = key.public_numbers()
        dsa_params = dsa_numbers.parameter_numbers
        blob = (
            serialization._ssh_write_string(b"ssh-dss") +
            serialization._ssh_write_mpint(dsa_params.p) +
            serialization._ssh_write_mpint(dsa_params.q) +
            serialization._ssh_write_mpint(dsa_params.g) +
            serialization._ssh_write_mpint(dsa_numbers.y)
        )
        return b"ssh-dss " + base64.b64encode(blob)

    assert isinstance(key, ec.EllipticCurvePublicKey)
    ec_numbers = key.public_numbers()
    # Map the curve class to the curve name used inside the key line.
    ssh_curve_names = {
        ec.SECP256R1: b"nistp256",
        ec.SECP384R1: b"nistp384",
        ec.SECP521R1: b"nistp521",
    }
    try:
        curve_name = ssh_curve_names[type(ec_numbers.curve)]
    except KeyError:
        raise ValueError(
            "Only SECP256R1, SECP384R1, and SECP521R1 curves are "
            "supported by the SSH public key format"
        )
    blob = (
        serialization._ssh_write_string(b"ecdsa-sha2-" + curve_name) +
        serialization._ssh_write_string(curve_name) +
        serialization._ssh_write_string(ec_numbers.encode_point())
    )
    return b"ecdsa-sha2-" + curve_name + b" " + base64.b64encode(blob)
def _key_identifier_from_public_key(public_key):
    """Return the SHA-1 digest of public_key's key material.

    RSA keys are hashed over their DER PKCS1 encoding and elliptic-curve
    keys over their encoded point. Every other key type is serialized as a
    DER SubjectPublicKeyInfo, decoded again, and hashed over the contents
    of its subjectPublicKey bit string.
    """
    if isinstance(public_key, RSAPublicKey):
        key_material = public_key.public_bytes(
            serialization.Encoding.DER,
            serialization.PublicFormat.PKCS1,
        )
    elif isinstance(public_key, EllipticCurvePublicKey):
        key_material = public_key.public_numbers().encode_point()
    else:
        # This is a very slow way to do this.
        der_spki = public_key.public_bytes(
            serialization.Encoding.DER,
            serialization.PublicFormat.SubjectPublicKeyInfo
        )
        parsed_spki, leftover = decoder.decode(
            der_spki, asn1Spec=_SubjectPublicKeyInfo()
        )
        assert not leftover
        # the univ.BitString object is a tuple of bits. We need bytes and
        # pyasn1 really doesn't want to give them to us. To get it we'll
        # build an integer and convert that to bytes.
        as_integer = 0
        for bit in parsed_spki.getComponentByName("subjectPublicKey"):
            as_integer = (as_integer << 1) | bit
        key_material = utils.int_to_bytes(as_integer)

    digest = hashlib.sha1(key_material)
    return digest.digest()
def public_key(self, key):
    """
    Sets the requestor's public key (as found in the signing request).

    Raises TypeError unless key is a DSA, RSA or elliptic-curve public key,
    and ValueError when this builder already has a public key. Returns a
    new CertificateBuilder carrying key plus this builder's other fields.
    """
    accepted_types = (
        dsa.DSAPublicKey,
        rsa.RSAPublicKey,
        ec.EllipticCurvePublicKey,
    )
    if not isinstance(key, accepted_types):
        raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
                        ' or EllipticCurvePublicKey.')

    already_set = self._public_key is not None
    if already_set:
        raise ValueError('The public key may only be set once.')

    return CertificateBuilder(
        self._issuer_name,
        self._subject_name,
        key,
        self._serial_number,
        self._not_valid_before,
        self._not_valid_after,
        self._extensions
    )
def serialize_public_key(public_key):
    """
    Encodes the given public key as DER in the SubjectPublicKeyInfo format
    and returns that encoding as base64 text.
    :param public_key: the public key object
    :type public_key: :class:`~rsa.RSAPublicKey`
    :return: the base64-encoded key, decoded as UTF-8
    :rtype: str
    """
    der_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    b64_bytes = base64.b64encode(der_bytes)
    return b64_bytes.decode(encoding='utf-8')
def deserialize_public_key(b64_encoded_public_key):
    """
    Loads a public key object from base64-encoded DER text.
    :param str b64_encoded_public_key: the key as base64 encoded string
    :return: the public key object returned by
        serialization.load_der_public_key
    :rtype: :class:`~rsa.RSAPublicKey` (presumably; the loader itself is
        not restricted to RSA)
    """
    b64_bytes = b64_encoded_public_key.encode('utf-8')
    der_bytes = base64.b64decode(b64_bytes)
    return serialization.load_der_public_key(
        data=der_bytes,
        backend=default_backend())
def test_generate_private_key():
    """crypto.generate_private_key gives an RSA private/public key pair."""
    generated = crypto.generate_private_key()
    assert isinstance(generated, rsa.RSAPrivateKey)

    derived_public = generated.public_key()
    assert isinstance(derived_public, rsa.RSAPublicKey)
def key2bytes():
    """Return a converter function that serializes RSA keys.

    The returned callable gives, for an RSA private key, its unencrypted
    PEM PKCS8 encoding; for an RSA public key, the base64 text of its DER
    SubjectPublicKeyInfo encoding; and None for anything else.
    """
    def convert(key):
        if isinstance(key, rsa.RSAPrivateKey):
            return key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption())
        if isinstance(key, rsa.RSAPublicKey):
            der_bytes = key.public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo)
            b64_bytes = base64.b64encode(der_bytes)
            return b64_bytes.decode(encoding='utf-8')
        return None

    return convert
def isPublic(self):
    """
    Tell whether the wrapped key object is a public key.
    @return: C{True} if C{self._keyObject} is an RSA, DSA or elliptic-curve
        public key.
    """
    public_key_types = (
        rsa.RSAPublicKey,
        dsa.DSAPublicKey,
        ec.EllipticCurvePublicKey,
    )
    return isinstance(self._keyObject, public_key_types)
def _openssh_public_key_bytes(self, key):
    """Build the SSH public key line b"<key type> <base64 blob>" for key.

    RSA keys produce an "ssh-rsa" line, DSA keys an "ssh-dss" line, and
    elliptic-curve keys an "ecdsa-sha2-<curve>" line. Any key that is
    neither RSA nor DSA is asserted to be an elliptic-curve public key.

    Raises ValueError for an elliptic-curve key whose curve is not
    SECP256R1, SECP384R1 or SECP521R1.
    """
    if isinstance(key, rsa.RSAPublicKey):
        rsa_numbers = key.public_numbers()
        blob = (
            serialization._ssh_write_string(b"ssh-rsa") +
            serialization._ssh_write_mpint(rsa_numbers.e) +
            serialization._ssh_write_mpint(rsa_numbers.n)
        )
        return b"ssh-rsa " + base64.b64encode(blob)

    if isinstance(key, dsa.DSAPublicKey):
        dsa_numbers = key.public_numbers()
        dsa_params = dsa_numbers.parameter_numbers
        blob = (
            serialization._ssh_write_string(b"ssh-dss") +
            serialization._ssh_write_mpint(dsa_params.p) +
            serialization._ssh_write_mpint(dsa_params.q) +
            serialization._ssh_write_mpint(dsa_params.g) +
            serialization._ssh_write_mpint(dsa_numbers.y)
        )
        return b"ssh-dss " + base64.b64encode(blob)

    assert isinstance(key, ec.EllipticCurvePublicKey)
    ec_numbers = key.public_numbers()
    # Map the curve class to the curve name used inside the key line.
    ssh_curve_names = {
        ec.SECP256R1: b"nistp256",
        ec.SECP384R1: b"nistp384",
        ec.SECP521R1: b"nistp521",
    }
    try:
        curve_name = ssh_curve_names[type(ec_numbers.curve)]
    except KeyError:
        raise ValueError(
            "Only SECP256R1, SECP384R1, and SECP521R1 curves are "
            "supported by the SSH public key format"
        )
    blob = (
        serialization._ssh_write_string(b"ecdsa-sha2-" + curve_name) +
        serialization._ssh_write_string(curve_name) +
        serialization._ssh_write_string(ec_numbers.encode_point())
    )
    return b"ecdsa-sha2-" + curve_name + b" " + base64.b64encode(blob)
def calculate_max_pss_salt_length(key, hash_algorithm):
    """Return the largest PSS salt length usable with key and hash_algorithm.

    The result is the byte length of (key.key_size - 1) bits, rounded up,
    minus hash_algorithm.digest_size, minus 2.

    Raises TypeError when key is neither an RSA private nor an RSA public
    key. The result is asserted to be non-negative.
    """
    rsa_key_types = (rsa.RSAPrivateKey, rsa.RSAPublicKey)
    if not isinstance(key, rsa_key_types):
        raise TypeError("key must be an RSA public or private key")

    # bit length - 1 per RFC 3447
    encoded_bits = key.key_size - 1
    emlen = int(math.ceil(encoded_bits / 8.0))
    max_salt = emlen - hash_algorithm.digest_size - 2
    assert max_salt >= 0
    return max_salt