def verify_renewable_cert_sig(renewable_cert):
""" Verifies the signature of a `.storage.RenewableCert` object.
:param `.storage.RenewableCert` renewable_cert: cert to verify
:raises errors.Error: If signature verification fails.
"""
try:
with open(renewable_cert.chain, 'rb') as chain:
chain, _ = pyopenssl_load_certificate(chain.read())
with open(renewable_cert.cert, 'rb') as cert:
cert = x509.load_pem_x509_certificate(cert.read(), default_backend())
hash_name = cert.signature_hash_algorithm.name
OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
except (IOError, ValueError, OpenSSL.crypto.Error) as e:
error_str = "verifying the signature of the cert located at {0} has failed. \
Details: {1}".format(renewable_cert.cert, e)
logger.exception(error_str)
raise errors.Error(error_str)
python类load_pem_x509_certificate()的实例源码
def load_certificate(self, cert_name):
# Load the raw certificate file data.
path = os.path.join(self.cert_path, cert_name)
with open(path, 'rb') as cert_file:
data = cert_file.read()
# Convert the raw certificate data into a certificate object, first
# as a PEM-encoded certificate and, if that fails, then as a
# DER-encoded certificate. If both fail, the certificate cannot be
# loaded.
try:
return x509.load_pem_x509_certificate(data, default_backend())
except Exception:
try:
return x509.load_der_x509_certificate(data, default_backend())
except Exception:
raise exception.SignatureVerificationError(
"Failed to load certificate: %s" % path
)
def validate_ca_cert(self, ignored):
expected = self._get_expected_ca_cert_fingerprint()
algo, expectedfp = expected.split(':')
expectedfp = expectedfp.replace(' ', '')
backend = default_backend()
with open(self._get_ca_cert_path(), 'r') as f:
certstr = f.read()
cert = load_pem_x509_certificate(certstr, backend)
hasher = getattr(hashes, algo)()
fpbytes = cert.fingerprint(hasher)
fp = binascii.hexlify(fpbytes)
if fp != expectedfp:
os.unlink(self._get_ca_cert_path())
self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
% (fp, expectedfp))
raise NetworkError("The provider's CA fingerprint doesn't match")
def read_file(self, subject_name, pw=None):
"""
???? ???
:param subject_name: ??? ??? ??? ??
:return: ??? ???
"""
pem_path = os.path.join(os.path.dirname(__file__),
"../../resources/unittest/" + subject_name + "/cert.pem")
f = open(pem_path, "rb")
cert_pem = f.read()
f.close()
cert = x509.load_pem_x509_certificate(cert_pem, default_backend())
key_path = os.path.join(os.path.dirname(__file__),
"../../resources/unittest/" + subject_name + "/key.pem")
f = open(key_path, "rb")
cert_key = f.read()
f.close()
private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())
return {'cert': cert, 'private_key': private_key}
def load_pki(self, cert_path: str, cert_pass=None):
"""
??? ??
:param cert_path: ??? ??
:param cert_pass: ??? ????
"""
ca_cert_file = join(cert_path, self.CERT_NAME)
ca_pri_file = join(cert_path, self.PRI_NAME)
# ???/??? ??
with open(ca_cert_file, "rb") as der:
cert_bytes = der.read()
self.__ca_cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
with open(ca_pri_file, "rb") as der:
private_bytes = der.read()
try:
self.__ca_pri = serialization.load_pem_private_key(private_bytes, cert_pass, default_backend())
except ValueError:
logging.debug("Invalid Password")
# ??? ? ? ??
sign = self.sign_data(b'TEST')
if self.verify_data(b'TEST', sign) is False:
logging.debug("Invalid Signature(Root Certificate load test)")
def validate_ca_certificate_constraints(cert_pem_data):
cert = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
try:
constraints = cert.extensions.get_extension_for_oid(ExtensionOID.BASIC_CONSTRAINTS)
constraints = constraints.value
except x509.extensions.ExtensionNotFound:
return
if not constraints.ca:
raise InvalidCertificate("Not a CA certificate")
if constraints.path_length != 0:
raise InvalidCertificate("Invalid pathlen")
# based on ssl.match_hostname code
# https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/ssl.py#L279
def load_certificate(cert_file=None, cert_bytes=None):
if cert_file:
with open(cert_file, 'rb') as f:
data = f.read()
else:
data = cert_bytes
if '-----BEGIN'.encode() in data:
cert = x509.load_pem_x509_certificate(
data=data,
backend=default_backend()
)
else:
cert = x509.load_der_x509_certificate(
data=data,
backend=default_backend()
)
return cert
def _user_cert_validation(cert_str):
"""Prompt user for validation of certification from cluster
:param cert_str: cluster certificate bundle
:type cert_str: str
:returns whether or not user validated cert
:rtype: bool
"""
cert = x509.load_pem_x509_certificate(
cert_str.encode('utf-8'), default_backend())
fingerprint = cert.fingerprint(hashes.SHA256())
pp_fingerprint = ":".join("{:02x}".format(c) for c in fingerprint).upper()
msg = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(
pp_fingerprint)
return confirm(msg, False)
def get_certificate(self, kid):
# retrieve keys from jwks_url
resp = self.request(self.jwks_url(), method='GET')
resp.raise_for_status()
# find the proper key for the kid
for key in resp.json()['keys']:
if key['kid'] == kid:
x5c = key['x5c'][0]
break
else:
raise DecodeError('Cannot find kid={}'.format(kid))
certificate = '-----BEGIN CERTIFICATE-----\n' \
'{}\n' \
'-----END CERTIFICATE-----'.format(x5c)
return load_pem_x509_certificate(certificate.encode(),
default_backend())
def test_crl_0(self):
"""Test that the X509 CRL revocation works correctly."""
with open(os.path.join(self.crl_dir, "ch1_ta4_crl.pem"),
"rb") as f:
crl = x509.load_pem_x509_crl(
f.read(), default_backend())
with open(os.path.join(self.cs_dir,
"cs1_ch1_ta4_cert.pem"), "rb") as f:
cert = x509.load_pem_x509_certificate(
f.read(), default_backend())
self.assertTrue(crl.issuer == cert.issuer)
for rev in crl:
if rev.serial_number == cert.serial_number:
break
else:
self.assertTrue(False, "Can not find revoked "
"certificate in CRL!")
def requireRenewal(self, path, lifetime):
with open(path, 'rb') as fh:
crt = fh.read()
data = x509.load_pem_x509_certificate(crt, default_backend())
date = data.not_valid_after - datetime.timedelta(days = lifetime)
if datetime.datetime.now() <= date:
return False
else:
return True
def certificate_expiry_date(self):
if self.certificate:
cert = x509.load_pem_x509_certificate(str(self.certificate), default_backend())
return cert.not_valid_after
def certificate_common_name(self):
if self.certificate:
cert = x509.load_pem_x509_certificate(str(self.certificate), default_backend())
return cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
def load_pem_certificate(data):
"""
Loads a PEM X.509 certificate.
"""
return x509.load_pem_x509_certificate(data, default_backend())
def test_convert_from_cryptography(self):
crypto_cert = x509.load_pem_x509_certificate(
intermediate_cert_pem, backend
)
cert = X509.from_cryptography(crypto_cert)
assert isinstance(cert, X509)
assert cert.get_version() == crypto_cert.version.value
def decode_token(token):
"""
Get the organisation ID from a token
:param token: a JSON Web Token
:returns: str, organisation ID
:raises:
jwt.DecodeError: Invalid token or not signed with our key
jwt.ExpiredSignatureError: Token has expired
jwt.InvalidAudienceError: Invalid "aud" claim
jwt.InvalidIssuerError: Invalid "iss" claim
jwt.MissingRequiredClaimError: Missing a required claim
"""
cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
with open(cert_file) as f:
cert = load_pem_x509_certificate(f.read(), default_backend())
public_key = cert.public_key()
payload = jwt.decode(token,
public_key,
audience=audience(),
issuer=issuer(),
algorithms=[ALGORITHM],
verify=True)
if not payload.get('sub'):
raise jwt.MissingRequiredClaimError('"sub" claim is required')
payload['scope'] = Scope(payload['scope'])
return payload
def parse_issuer_cred(issuer_cred):
"""
Given an X509 PEM file in the form of a string, parses it into sections
by the PEM delimiters of: -----BEGIN <label>----- and -----END <label>----
Confirms the sections can be decoded in the proxy credential order of:
issuer cert, issuer private key, proxy chain of 0 or more certs .
Returns the issuer cert and private key as loaded cryptography objects
and the proxy chain as a potentially empty string.
"""
# get each section of the PEM file
sections = re.findall(
"-----BEGIN.*?-----.*?-----END.*?-----", issuer_cred, flags=re.DOTALL)
try:
issuer_cert = sections[0]
issuer_private_key = sections[1]
issuer_chain_certs = sections[2:]
except IndexError:
raise ValueError("Unable to parse PEM data in credentials, "
"make sure the X.509 file is in PEM format and "
"consists of the issuer cert, issuer private key, "
"and proxy chain (if any) in that order.")
# then validate that each section of data can be decoded as expected
try:
loaded_cert = x509.load_pem_x509_certificate(
six.b(issuer_cert), default_backend())
loaded_private_key = serialization.load_pem_private_key(
six.b(issuer_private_key),
password=None, backend=default_backend())
for chain_cert in issuer_chain_certs:
x509.load_pem_x509_certificate(
six.b(chain_cert), default_backend())
issuer_chain = "".join(issuer_chain_certs)
except ValueError:
raise ValueError("Failed to decode PEM data in credentials. Make sure "
"the X.509 file consists of the issuer cert, "
"issuer private key, and proxy chain (if any) "
"in that order.")
# return loaded cryptography objects and the issuer chain
return loaded_cert, loaded_private_key, issuer_chain
def __load_cert(self, cert_dir):
"""???/??? ?? ? ?? ???
:param cert_dir: ???/??? ?? ??
:return: X509 ???
"""
logging.debug("Cert/Key loading...")
cert_file = join(cert_dir, "cert.pem")
pri_file = join(cert_dir, "key.pem")
f = open(cert_file, "rb")
cert_bytes = f.read()
f.close()
cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())
f = open(pri_file, "rb")
pri_bytes = f.read()
f.close()
try:
pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
except ValueError:
logging.debug("Invalid Password(%s)", cert_dir)
return None
data = b"test"
signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))
try:
pub_key = cert.public_key()
result = pub_key.verify(signature, data, ec.ECDSA(hashes.SHA256()))
except InvalidSignature:
logging.debug("sign test fail")
result = False
if result:
return cert
else:
logging.error("result is False ")
return None
def convert_x509cert_from_pem(self, cert_pem):
"""PEM ???? x509 ???? ??
:param cert_pem: PEM ???
:return: x509 ???
"""
return x509.load_pem_x509_certificate(cert_pem, default_backend())
def validate_certificate_common_name(cert_pem_data, subject_name):
cert = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
_match_subject_name(cert, subject_name, alt_names=False)
def validate_certificate_hosts(cert_pem_data, host_names):
cert = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
for host_name in host_names:
_match_subject_name(cert, host_name, compare_func=ssl._dnsname_match)
def validate_certificate_host_ips(cert_pem_data, host_ips):
cert = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
for host_ip in host_ips:
_match_subject_ip(cert, host_ip)
def validate_certificate_key_usage(cert_pem_data, is_web_server, is_web_client):
cert = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
try:
key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.KEY_USAGE)
key_usage = key_usage.value
except x509.extensions.ExtensionNotFound:
raise InvalidCertificate("Key usage not specified")
if not key_usage.digital_signature:
raise InvalidCertificate("Not intented for Digital Signature")
if not key_usage.key_encipherment:
raise InvalidCertificate("Not intented for Key Encipherment")
if is_web_server or is_web_client:
try:
exteneded_key_usage = cert.extensions.get_extension_for_oid(ExtensionOID.EXTENDED_KEY_USAGE)
exteneded_key_usage = exteneded_key_usage.value
except x509.extensions.ExtensionNotFound:
raise InvalidCertificate("Extended key usage not specified")
if is_web_server:
if ExtendedKeyUsageOID.SERVER_AUTH not in exteneded_key_usage:
raise InvalidCertificate("Not intented for TLS Web Server Authentication")
if is_web_client:
if ExtendedKeyUsageOID.CLIENT_AUTH not in exteneded_key_usage:
raise InvalidCertificate("Not intented for TLS Web Client Authentication")
def list_hosts(self):
hosts = {}
for csr_file in os.listdir(self.csr_path):
with open(os.path.join(self.csr_path, csr_file), 'rb') as f:
csr = x509.load_pem_x509_csr(f.read(), default_backend())
hosts[csr.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value] = {
'key_fingerprint': rsa_key_fingerprint(csr.public_key()),
'cert_fingerprint': None,
'status': 'pending',
}
for crt_file in os.listdir(self.crt_path):
with open(os.path.join(self.crt_path, crt_file), 'rb') as f:
crt = x509.load_pem_x509_certificate(f.read(), default_backend())
revoked = revoked_cert(crt, self.crl)
if revoked:
status = 'revoked'
else:
status = 'authorized'
hosts[crt.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value] = {
'key_fingerprint': rsa_key_fingerprint(crt.public_key()),
'cert_fingerprint': x509_cert_fingerprint(crt),
'status': status,
}
return hosts
def loadCert(pem):
"""
Creates a cryptography certificate object from the given PEM certificate.
:param pem: A certificate as a PEM string.
:return: A cryptography certificate object.
"""
return x509.load_pem_x509_certificate(pem.encode("utf-8"), default_backend())
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
"""Initialize JWTUtils
Load the local node certificate, the node private
key and the CA certificate from files; prepare for both
signing and validation of key-value pairs.
Signing will take place with the local certificate, and the
public half will be added to signed objects.
Validation will take place with the CA certificate, along with
other checks that the signing matches the payload.
:param local_cert: file containing public half of the local key
:param priv_key: file containing private half of the local key
:param ca_cert: file containing CA root certificate
raise: IOError if the files cannot be read.
"""
priv_key_pem = self._get_crypto_material(priv_key)
self.private_key = serialization.load_pem_private_key(
priv_key_pem,
password=None,
backend=default_backend())
self.node_certificate = self._get_crypto_material(local_cert)
self.node_cert_obj = load_pem_x509_certificate(
self.node_certificate,
default_backend())
self.node_cert_pem = self.node_cert_obj.public_bytes(
serialization.Encoding.PEM)
ca_certificate = self._get_crypto_material(ca_cert)
# pyopenssl
root_ca = crypto.load_certificate(crypto.FILETYPE_PEM,
ca_certificate)
self.store = crypto.X509Store()
self.store.add_cert(root_ca)
self.controller_name_re = controller_name_re
def generate_x509_fingerprint(pem_key):
try:
if isinstance(pem_key, six.text_type):
pem_key = pem_key.encode('utf-8')
cert = x509.load_pem_x509_certificate(
pem_key, backends.default_backend())
raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1()))
if six.PY3:
raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except (ValueError, TypeError, binascii.Error) as ex:
raise exception.InvalidKeypair(
reason=_('failed to generate X509 fingerprint. '
'Error message: %s') % ex)
def get_google_public_cert_key():
r = requests.get(GOOGLE_PUBLIC_CERT_URL)
r.raise_for_status()
# Load the certificate.
certificate = x509.load_pem_x509_certificate(
r.text.encode('utf-8'), default_backend())
# Get the certicate's public key.
public_key = certificate.public_key()
return public_key
def deserialize(self, data):
cert = x509.load_pem_x509_certificate(data, default_backend())
key_material = cert.public_key().public_numbers()
self._e = key_material.e
self._n = key_material.n
def _verify(self):
'''
Check, if message is wellsigned
'''
try:
# canonize soap body a make sha256 digest
body_c14n = etree.tostring(self.body, method='c14n', exclusive=True, with_comments=False)
sha256 = hashlib.sha256(body_c14n)
digest = b64encode(sha256.digest())
# load cert options
cert = self.root.find('.//wsse:BinarySecurityToken', namespaces=NSMAP)
sig_info = self.root.find('.//ds:SignedInfo', namespaces=NSMAP)
sig_value = self.root.find('.//ds:SignatureValue', namespaces=NSMAP)
# check, if there is all nesesery data
assert cert is not None
assert sig_info is not None
assert sig_value is not None
# canonize signature info
sig_info_c14n = etree.tostring(sig_info, method='c14n', exclusive=True, with_comments=False)
# transform and load cert
cert = '\n'.join(['-----BEGIN CERTIFICATE-----'] + textwrap.wrap(cert.text, 64) + ['-----END CERTIFICATE-----\n'])
cert = load_pem_x509_certificate(cert.encode('utf-8'), default_backend())
key = cert.public_key()
# verify digest
verifier = key.verifier(b64decode(sig_value.text), padding.PKCS1v15(), hashes.SHA256())
verifier.update(sig_info_c14n)
# if verify fail, raise exception
verifier.verify()
return True
except Exception as e:
logger.exception(e)
# probably error, return false
return False