def verify_cert_chain(chain_pem, trusted_certs):
    """Verify a PEM certificate against a set of trusted PEM certificates.

    :param chain_pem: PEM data of the certificate to verify; must provide
        ``.decode('utf-8')`` (i.e. bytes).
    :param trusted_certs: iterable of PEM-encoded trusted certificates, each
        of which must also provide ``.decode('utf-8')``.
    :return: ``True`` if pyOpenSSL validates the certificate against the
        trusted store, ``False`` if it raises ``X509StoreContextError``.

    Errors raised while decoding or parsing the PEM inputs are not handled
    here and propagate to the caller.

    NOTE(review): despite the parameter name, a single certificate object is
    loaded from ``chain_pem`` and no intermediates are passed to the store
    context -- confirm whether callers expect a full chain to be honoured.
    """
    cert = crypto.load_certificate(crypto.FILETYPE_PEM, chain_pem.decode('utf-8'))

    # Build store of trusted certificates
    store = crypto.X509Store()
    for trusted_pem in trusted_certs:
        trusted = crypto.load_certificate(
            crypto.FILETYPE_PEM, trusted_pem.decode('utf-8'))
        store.add_cert(trusted)

    # Prepare context
    ctx = crypto.X509StoreContext(store, cert)

    # Start validation
    try:
        ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        # Pass the exception as a logging argument so the message is only
        # formatted when the record is actually emitted.
        logging.error("Certificate validation failed: %s", e)
        return False
    return True
# Example source code for the Python class X509Store()
def _verify_ca(self):
    """
    (internal use only)
    Verify that the current x509 certificate is signed
    by the associated CA.

    A pyOpenSSL ``X509Store`` holding only ``self.ca.x509`` is built and
    ``self.x509`` is verified against it.

    :raises ValidationError: if pyOpenSSL raises ``X509StoreContextError``;
        the message embeds the error text reported by pyOpenSSL.
    """
    store = crypto.X509Store()
    store.add_cert(self.ca.x509)
    store_ctx = crypto.X509StoreContext(store, self.x509)
    try:
        store_ctx.verify_certificate()
    except crypto.X509StoreContextError as e:
        detail = e.args[0]
        # Depending on the pyOpenSSL release, args[0] is either the
        # [code, depth, message] list or already the message string;
        # indexing a string with [2] would yield a single character.
        if not isinstance(detail, str):
            detail = detail[2]
        raise ValidationError(_("CA doesn't match, got the "
                                "following error from pyOpenSSL: \"%s\"") % detail)
def registerOrdererAdminTuple(self, userName, ordererName, organizationName):
    """Register ``userName`` as an admin of orderer ``ordererName``.

    A certificate for the user is requested and issued by the named
    organization, verified against the organization's ``signedCert``, and
    stored in ``self.ordererAdminTuples`` under the new tuple.

    Asserts that the tuple is not registered yet and that the organization
    is present in ``self.organizations``. ``verify_certificate`` is not
    wrapped in a handler, so a verification failure propagates to the caller.

    Returns the ``NodeAdminTuple`` used as the registration key.
    """
    adminKey = NodeAdminTuple(user=userName, nodeName=ordererName, organization=organizationName)
    assert adminKey not in self.ordererAdminTuples, \
        "Orderer admin tuple already registered {0}".format(adminKey)
    assert organizationName in self.organizations, \
        "Orderer Organization not defined {0}".format(organizationName)

    adminUser = self.getUser(userName, shouldCreate=True)
    # Extensions come from the IP-SANs helper; presumably this adds a
    # subjectAlternativeName for signer entities on peer/orderer nodes.
    sanExtensions = self._get_cert_extensions_ip_sans(userName, ordererName)
    request = adminUser.createCertRequest(adminKey.nodeName, extensions=sanExtensions)
    issuedCert = self.getOrganization(organizationName).createCertificate(
        request, extensions=sanExtensions)

    # Verify the freshly issued certificate; the organization's signed
    # certificate is the only trusted entry in the store.
    trustStore = crypto.X509Store()
    trustStore.add_cert(self.getOrganization(organizationName).signedCert)
    crypto.X509StoreContext(trustStore, issuedCert).verify_certificate()

    self.ordererAdminTuples[adminKey] = issuedCert
    return adminKey
def test_get_cert_store(self):
    """
    The object returned by `Context.get_cert_store` is an `X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    assert isinstance(cert_store, X509Store)
def test_new(self):
    """Check a newly created certificate.

    Covers the PEM fields, serial number, subject and issuer names,
    verification against the CA's x509, the version and the first extension.
    """
    # (X509 name attribute, model attribute) pairs compared for both the
    # subject (against the cert) and the issuer (against its CA).
    name_fields = (
        ('countryName', 'country_code'),
        ('stateOrProvinceName', 'state'),
        ('localityName', 'city'),
        ('organizationName', 'organization_name'),
        ('emailAddress', 'email'),
        ('commonName', 'common_name'),
    )

    cert = self._create_cert()
    for pem_attr in ('certificate', 'private_key'):
        self.assertNotEqual(getattr(cert, pem_attr), '')

    parsed = cert.x509
    self.assertEqual(parsed.get_serial_number(), cert.serial_number)

    # subject must mirror the certificate's own fields
    subject = parsed.get_subject()
    for x509_attr, model_attr in name_fields:
        self.assertEqual(getattr(subject, x509_attr), getattr(cert, model_attr))

    # issuer must mirror the CA's fields
    issuer = parsed.get_issuer()
    ca = cert.ca
    for x509_attr, model_attr in name_fields:
        self.assertEqual(getattr(issuer, x509_attr), getattr(ca, model_attr))

    # signature: verification against the CA must not raise
    trust_store = crypto.X509Store()
    trust_store.add_cert(ca.x509)
    crypto.X509StoreContext(trust_store, cert.x509).verify_certificate()

    # version 3 is reported as 2 (zero-based counting)
    self.assertEqual(parsed.get_version(), 2)

    # basic constraints is expected as the first extension
    constraints = cert.x509.get_extension(0)
    self.assertEqual(constraints.get_critical(), 0)
    self.assertEqual(constraints.get_short_name().decode(), 'basicConstraints')
    self.assertEqual(constraints.get_data(), b'0\x00')
def verify_certificate_chain(ca_pem_data, cert_pem_data):
    """Verify a PEM certificate against a single PEM CA certificate.

    Returns ``None`` when verification succeeds.

    Raises ``InvalidCertificate('Broken certificate')`` when pyOpenSSL raises
    ``crypto.Error`` at any step, and ``InvalidCertificate('Invalid
    certificate chain: ...')`` when it raises ``X509StoreContextError``.
    The original exception is chained as the cause in both cases.
    """
    try:
        issuer = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem_data)
        leaf = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem_data)

        # The CA is the only trusted entry in the store.
        trust_store = crypto.X509Store()
        trust_store.add_cert(issuer)

        crypto.X509StoreContext(trust_store, leaf).verify_certificate()
    except crypto.Error as exc:
        raise InvalidCertificate('Broken certificate') from exc
    except crypto.X509StoreContextError as exc:
        raise InvalidCertificate('Invalid certificate chain: ' + str(exc)) from exc
def test_get_cert_store(self):
    """
    The object returned by :py:obj:`Context.get_cert_store` is a
    :py:obj:`X509Store`.
    """
    cert_store = Context(TLSv1_METHOD).get_cert_store()
    self.assertIsInstance(cert_store, X509Store)
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
    """Initialize JWTUtils

    Load the local node certificate, the node private key and the CA
    certificate via ``self._get_crypto_material`` and keep them on the
    instance, ready for both signing and validation of key-value pairs.

    Signing uses the local private key; the PEM form of the local
    certificate is kept so the public half can be attached to signed
    objects. Validation uses a pyOpenSSL store holding the CA certificate.

    :param local_cert: file containing public half of the local key
    :param priv_key: file containing private half of the local key
    :param ca_cert: file containing CA root certificate
    :param controller_name_re: stored unchanged on the instance; presumably
        a pattern for controller names -- confirm against its users
    raise: presumably IOError from ``_get_crypto_material`` if the files
        cannot be read (that helper is not shown here).
    """
    # Private key (loaded without a password).
    key_pem = self._get_crypto_material(priv_key)
    self.private_key = serialization.load_pem_private_key(
        key_pem, password=None, backend=default_backend())

    # Local node certificate: raw material, parsed object and PEM bytes.
    self.node_certificate = self._get_crypto_material(local_cert)
    self.node_cert_obj = load_pem_x509_certificate(
        self.node_certificate, default_backend())
    pem_encoding = serialization.Encoding.PEM
    self.node_cert_pem = self.node_cert_obj.public_bytes(pem_encoding)

    # CA root certificate goes into a pyOpenSSL store.
    ca_pem = self._get_crypto_material(ca_cert)
    trust_anchor = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
    self.store = crypto.X509Store()
    self.store.add_cert(trust_anchor)

    self.controller_name_re = controller_name_re
def test_get_cert_store(self):
    """
    Calling :py:obj:`Context.get_cert_store` yields a :py:obj:`X509Store`
    instance.
    """
    tls_context = Context(TLSv1_METHOD)
    self.assertIsInstance(tls_context.get_cert_store(), X509Store)
def verify_certificate(ca, cert):
    """Return whether ``cert`` verifies against the single trusted ``ca``.

    :param ca: certificate object added to a fresh ``crypto.X509Store`` as
        the only trusted entry.
    :param cert: certificate object to verify against that store.
    :return: ``True`` if ``verify_certificate()`` completes without raising,
        ``False`` if building the store context or verifying raises.

    Errors from creating the store or adding ``ca`` are not handled here and
    propagate to the caller.
    """
    store = crypto.X509Store()
    store.add_cert(ca)
    try:
        crypto.X509StoreContext(store, cert).verify_certificate()
    except Exception:
        # Best-effort check: any failure counts as "not verified". Using
        # Exception instead of a bare except lets KeyboardInterrupt and
        # SystemExit propagate rather than being reported as False.
        return False
    return True