def test_invalid_issuer(self):
"""
If the C{issuer} parameter is given a value which is not an L{X509}
instance, L{TypeError} is raised.
"""
for badObj in [True, object(), "hello", [], self]:
self.assertRaises(
TypeError,
X509Extension,
'authorityKeyIdentifier', False, 'keyid:always,issuer:always',
issuer=badObj)
python类X509Extension()的实例源码
def _get_cert_extensions_ip_sans(self, user_name, node_name):
extensions = []
if 'signer' in user_name.lower():
if ('peer' in node_name.lower() or 'orderer' in node_name.lower()):
san_list = ["DNS:{0}".format(node_name)]
extensions.append(crypto.X509Extension(b"subjectAltName", False, ", ".join(san_list)))
return extensions
def __init__(self):
# CA key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
# CA cert
cert = crypto.X509()
cert.set_serial_number(self.__next_serial)
cert.set_version(2)
cert.set_pubkey(key)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(10*365*24*60*60)
cacert_subject = cert.get_subject()
cacert_subject.O = 'kOVHernetes'
cacert_subject.OU = 'kOVHernetes Certificate Authority'
cacert_subject.CN = 'kOVHernetes Root CA'
cert.set_issuer(cacert_subject)
cert.add_extensions((crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),))
cacert_ext = []
cacert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', True, b'keyid:always,issuer', issuer=cert))
cacert_ext.append(crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE'))
cacert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, cRLSign, keyCertSign'))
cert.add_extensions(cacert_ext)
# sign CA cert with CA key
cert.sign(key, 'sha256')
type(self).__next_serial += 1
self.cert = cert
self.key = key
def create_client_cert(self, key, o, cn):
"""Issue a X.509 client certificate"""
cert = crypto.X509()
cert.set_serial_number(self.__next_serial)
cert.set_version(2)
cert.set_pubkey(key)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365*24*60*60)
cert_subject = cert.get_subject()
cert_subject.O = o
cert_subject.OU = 'kOVHernetes'
cert_subject.CN = cn
cert.set_issuer(self.cert.get_issuer())
cert_ext = []
cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer', issuer=cert))
cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'nonRepudiation, digitalSignature, keyEncipherment'))
cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'))
cert.add_extensions(cert_ext)
# sign cert with CA key
cert.sign(self.key, 'sha256')
type(self).__next_serial += 1
return cert
def create_server_cert(self, key, o, cn, san=[]):
"""Issue a X.509 server certificate"""
cert = crypto.X509()
cert.set_serial_number(self.__next_serial)
cert.set_version(2)
cert.set_pubkey(key)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365*24*60*60)
cert_subject = cert.get_subject()
cert_subject.O = o
cert_subject.OU = 'kOVHernetes'
cert_subject.CN = cn
cert.set_issuer(self.cert.get_issuer())
cert_ext = []
cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer:always', issuer=cert))
cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'))
cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'))
if san:
cert_ext.append(crypto.X509Extension(b'subjectAltName', False, ','.join(san).encode()))
cert.add_extensions(cert_ext)
# sign cert with CA key
cert.sign(self.key, 'sha256')
type(self).__next_serial += 1
return cert
def create_client_pair(self, o, cn):
"""Issue a X.509 client key/certificate pair"""
# key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
# cert
cert = crypto.X509()
cert.set_serial_number(self.__next_serial)
cert.set_version(2)
cert.set_pubkey(key)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365*24*60*60)
cert_subject = cert.get_subject()
cert_subject.O = o
cert_subject.OU = 'kOVHernetes'
cert_subject.CN = cn
cert.set_issuer(self.cert.get_issuer())
cert_ext = []
cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer', issuer=cert))
cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'nonRepudiation, digitalSignature, keyEncipherment'))
cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'))
cert.add_extensions(cert_ext)
# sign cert with CA key
cert.sign(self.key, 'sha256')
type(self).__next_serial += 1
return key, cert
def create_server_pair(self, o, cn, san=[]):
"""Issue a X.509 server key/certificate pair"""
# key
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
# cert
cert = crypto.X509()
cert.set_serial_number(self.__next_serial)
cert.set_version(2)
cert.set_pubkey(key)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(365*24*60*60)
cert_subject = cert.get_subject()
cert_subject.O = o
cert_subject.OU = 'kOVHernetes'
cert_subject.CN = cn
cert.set_issuer(self.cert.get_issuer())
cert_ext = []
cert_ext.append(crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert))
cert_ext.append(crypto.X509Extension(b'authorityKeyIdentifier', False, b'keyid,issuer:always', issuer=cert))
cert_ext.append(crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'))
cert_ext.append(crypto.X509Extension(b'keyUsage', True, b'digitalSignature, keyEncipherment'))
cert_ext.append(crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'))
if san: cert_ext.append(crypto.X509Extension(b'subjectAltName', False, ','.join(san).encode()))
cert.add_extensions(cert_ext)
# sign cert with CA key
cert.sign(self.key, 'sha256')
type(self).__next_serial += 1
return key, cert
def generate(self, module):
'''Generate the certificate signing request.'''
if not os.path.exists(self.path) or self.force:
req = crypto.X509Req()
req.set_version(self.version)
subject = req.get_subject()
for (key, value) in self.subject.items():
if value is not None:
setattr(subject, key, value)
if self.subjectAltName is not None:
req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))])
privatekey_content = open(self.privatekey_path).read()
self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content)
req.set_pubkey(self.privatekey)
req.sign(self.privatekey, self.digest)
self.request = req
try:
csr_file = open(self.path, 'wb')
csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request))
csr_file.close()
except (IOError, OSError) as exc:
raise CertificateSigningRequestError(exc)
else:
self.changed = False
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
self.changed = True
def _create_test_cert(cert_file, key_file, subject, valid_days, serial_number):
# create a key pair
k = crypto.PKey()
k.generate_key(crypto.TYPE_RSA, 2046)
# create a self-signed cert with some basic constraints
cert = crypto.X509()
cert.get_subject().CN = subject
cert.gmtime_adj_notBefore(-1 * 24 * 60 * 60)
cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
cert.set_version(2)
cert.set_serial_number(serial_number)
cert.add_extensions([
crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"),
crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
subject=cert),
])
cert.add_extensions([
crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
issuer=cert)
])
cert.set_issuer(cert.get_subject())
cert.set_pubkey(k)
cert.sign(k, 'sha256')
cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('ascii')
key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('ascii')
open(cert_file, 'w').write(cert_str)
open(key_file, 'w').write(key_str)
def _create_verification_cert(cert_file, key_file, verification_file, nonce, valid_days, serial_number):
if exists(cert_file) and exists(key_file):
# create a key pair
public_key = crypto.PKey()
public_key.generate_key(crypto.TYPE_RSA, 2046)
# open the root cert and key
signing_cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read())
k = crypto.load_privatekey(crypto.FILETYPE_PEM, open(key_file).read())
# create a cert signed by the root
verification_cert = crypto.X509()
verification_cert.get_subject().CN = nonce
verification_cert.gmtime_adj_notBefore(0)
verification_cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
verification_cert.set_version(2)
verification_cert.set_serial_number(serial_number)
verification_cert.set_pubkey(public_key)
verification_cert.set_issuer(signing_cert.get_subject())
verification_cert.add_extensions([
crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
issuer=signing_cert)
])
verification_cert.sign(k, 'sha256')
verification_cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, verification_cert).decode('ascii')
open(verification_file, 'w').write(verification_cert_str)
def _generate_csr(key, key_digest, domains):
csr = crypto.X509Req()
csr.set_version(2)
csr.set_pubkey(key)
sans = ', '.join('DNS:{}'.format(d) for d in domains)
exts = [crypto.X509Extension(b'subjectAltName', False, b(sans))]
csr.add_extensions(exts)
csr.sign(key, str(key_digest))
return ComparableX509(csr)
def generate_ca(config_ca):
ca = crypto.X509()
ca.set_version(2)
ca.set_serial_number(config_ca['serial'])
ca_subj = ca.get_subject()
if 'commonName' in config_ca:
ca_subj.commonName = config_ca['commonName']
if 'stateOrProvinceName' in config_ca:
ca_subj.stateOrProvinceName = config_ca['stateOrProvinceName']
if 'localityName' in config_ca:
ca_subj.localityName = config_ca['localityName']
if 'organizationName' in config_ca:
ca_subj.organizationName = config_ca['organizationName']
if 'organizationalUnitName' in config_ca:
ca_subj.organizationalUnitName = config_ca['organizationalUnitName']
if 'emailAddress' in config_ca:
ca_subj.emailAddress = config_ca['emailAddress']
if 'countryName' in config_ca:
ca_subj.countryName = config_ca['countryName']
if 'validfrom' in config_ca:
ca.set_notBefore(config_ca['validfrom'])
if 'validto' in config_ca:
ca.set_notAfter(config_ca['validto'])
key = openssl_generate_privatekey(config_ca['keyfilesize'])
ca.add_extensions([
crypto.X509Extension("basicConstraints", True, "CA:TRUE, pathlen:1"),
crypto.X509Extension("keyUsage", False, "keyCertSign, cRLSign"),
crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=ca),
])
ca.add_extensions([
crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always",issuer=ca)
])
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
ca.sign(key, config_ca['hashalgorithm'])
return ca, key
def generate_certificate(config_cert, ca, cakey):
# Generate the private key
key = openssl_generate_privatekey(config_cert['keyfilesize'])
# Generate the certificate request
req = crypto.X509Req()
req_subj = req.get_subject()
if 'commonName' in config_cert:
req_subj.commonName = config_cert['commonName']
if 'stateOrProvinceName' in config_cert:
req_subj.stateOrProvinceName = config_cert['stateOrProvinceName']
if 'localityName' in config_cert:
req_subj.localityName = config_cert['localityName']
if 'organizationName' in config_cert:
req_subj.organizationName = config_cert['organizationName']
if 'organizationalUnitName' in config_cert:
req_subj.organizationalUnitName = config_cert['organizationalUnitName']
if 'emailAddress' in config_cert:
req_subj.emailAddress = config_cert['emailAddress']
if 'countryName' in config_cert:
req_subj.countryName = config_cert['countryName']
req.set_pubkey(key)
req.sign(key, config_cert['hashalgorithm'])
# Now generate the certificate itself
cert = crypto.X509()
cert.set_version(2)
cert.set_serial_number(config_cert['serial'])
cert.set_subject(req.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.set_issuer(ca.get_subject())
if 'validfrom' in config_cert:
cert.set_notBefore(config_cert['validfrom'])
if 'validto' in config_cert:
cert.set_notAfter(config_cert['validto'])
cert.add_extensions([
crypto.X509Extension("basicConstraints", True, "CA:FALSE"),
crypto.X509Extension("keyUsage", False, "digitalSignature"),
crypto.X509Extension("extendedKeyUsage", False, "codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"),
crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=cert),
crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=ca)
])
cert.sign(cakey, config_cert['hashalgorithm'])
return req, cert, key
def create_subcert(certfile, commonname, ip=False, sans=None):
sans = set(sans) if sans else set()
cert = crypto.X509()
cert.set_version(2)
cert.set_serial_number(int((int(time() - sub_serial) + random.random()) * 100)) #setting the only number
subject = cert.get_subject()
subject.countryName = 'CN'
subject.stateOrProvinceName = 'Internet'
subject.localityName = 'Cernet'
subject.organizationalUnitName = '%s Branch' % ca_vendor
subject.commonName = commonname
subject.organizationName = commonname
#????????????????????
cert.gmtime_adj_notBefore(sub_time_b)
cert.gmtime_adj_notAfter(sub_time_a)
cert.set_issuer(ca_subject)
cert.set_pubkey(sub_publickey)
sans.add(commonname)
if not ip:
sans.add('*.' + commonname)
sans = ', '.join('DNS: %s' % x for x in sans)
cert.add_extensions([crypto.X509Extension(b'subjectAltName', True, sans.encode())])
cert.sign(ca_privatekey, ca_digest)
with open(certfile, 'wb') as fp:
fp.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
def create_certificate(self, request, issuer_cert, issuer_key, serial, days=3650, digest='sha256', extensions=[], subject_alt_names='', version=2):
"""
Generate a certificate given the certificate request.
Arguments: request - Certificate request to sign
issuer_cert - The certificate of the issuer
issuer_key - The private key of the issuer
extensions - x509 extensions provided as a dictionary :name, :critical, :value
subject_alt_names - subject alt names e.g. IP:192.168.7.1 or DNS:my.domain
serial - The serial number to assign to the certificate
days - The number of days of validity (starting from now)
digest - The digest method for signing (by default sha256)
"""
certificate = crypto.X509()
# Handle x509 extensions
for extension in extensions:
# handle issuer and subjects that need to be self-referential (root certificate)
if 'subject' in extension.keys() and extension['subject'] == 'self':
extension['subject'] = certificate
if 'issuer' in extension.keys() and extension['issuer'] == 'self':
extension['issuer'] = certificate
elif 'issuer' in extension.keys() and extension['issuer'] != 'self':
extension['issuer'] = issuer_cert
# have to explicitly set 'critical' extension to a bool.
if 'critical' in extension.keys():
extension['critical'] = extension['critical'].lower() in ("yes", "true", "t", "1")
# add the extensions to the request
certificate.add_extensions([crypto.X509Extension(**extension)])
# Handle the subject alternative names (these are just X509 extensions)
if len(subject_alt_names) != 0:
certificate.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(subject_alt_names))])
certificate.set_serial_number(serial)
certificate.set_version(version)
certificate.gmtime_adj_notBefore(0)
certificate.gmtime_adj_notAfter(days*86400)
certificate.set_subject(request.get_subject())
certificate.set_issuer(issuer_cert.get_subject())
certificate.set_pubkey(request.get_pubkey())
certificate.sign(issuer_key, digest)
return certificate
def create_certificate(self, request, issuer_cert, issuer_key, serial, days=3650, digest='sha256', extensions=[], subject_alt_names='', version=2):
"""
Generate a certificate given the certificate request.
Arguments: request - Certificate request to sign
issuer_cert - The certificate of the issuer
issuer_key - The private key of the issuer
extensions - x509 extensions provided as a dictionary :name, :critical, :value
subject_alt_names - subject alt names e.g. IP:192.168.7.1 or DNS:my.domain
serial - The serial number to assign to the certificate
days - The number of days of validity (starting from now)
digest - The digest method for signing (by default sha256)
"""
certificate = crypto.X509()
# Handle x509 extensions
for extension in extensions:
# handle issuer and subjects that need to be self-referential (root certificate)
if 'subject' in extension.keys() and extension['subject'] == 'self':
extension['subject'] = certificate
if 'issuer' in extension.keys() and extension['issuer'] == 'self':
extension['issuer'] = certificate
elif 'issuer' in extension.keys() and extension['issuer'] != 'self':
extension['issuer'] = issuer_cert
# have to explicitly set 'critical' extension to a bool.
if 'critical' in extension.keys():
extension['critical'] = extension['critical'].lower() in ("yes", "true", "t", "1")
# add the extensions to the request
certificate.add_extensions([crypto.X509Extension(**extension)])
# Handle the subject alternative names (these are just X509 extensions)
if len(subject_alt_names) != 0:
certificate.add_extensions([crypto.X509Extension("subjectAltName", False, ", ".join(subject_alt_names))])
certificate.set_serial_number(serial)
certificate.set_version(version)
certificate.gmtime_adj_notBefore(0)
certificate.gmtime_adj_notAfter(days*86400)
certificate.set_subject(request.get_subject())
certificate.set_issuer(issuer_cert.get_subject())
certificate.set_pubkey(request.get_pubkey())
certificate.sign(issuer_key, digest)
return certificate
def _create_certificate_chain():
"""
Construct and return a chain of certificates.
1. A new self-signed certificate authority certificate (cacert)
2. A new intermediate certificate signed by cacert (icert)
3. A new server certificate signed by icert (scert)
"""
caext = X509Extension(b'basicConstraints', False, b'CA:true')
# Step 1
cakey = PKey()
cakey.generate_key(TYPE_RSA, 1024)
cacert = X509()
cacert.get_subject().commonName = "Authority Certificate"
cacert.set_issuer(cacert.get_subject())
cacert.set_pubkey(cakey)
cacert.set_notBefore(b"20000101000000Z")
cacert.set_notAfter(b"20200101000000Z")
cacert.add_extensions([caext])
cacert.set_serial_number(0)
cacert.sign(cakey, "sha1")
# Step 2
ikey = PKey()
ikey.generate_key(TYPE_RSA, 1024)
icert = X509()
icert.get_subject().commonName = "Intermediate Certificate"
icert.set_issuer(cacert.get_subject())
icert.set_pubkey(ikey)
icert.set_notBefore(b"20000101000000Z")
icert.set_notAfter(b"20200101000000Z")
icert.add_extensions([caext])
icert.set_serial_number(0)
icert.sign(cakey, "sha1")
# Step 3
skey = PKey()
skey.generate_key(TYPE_RSA, 1024)
scert = X509()
scert.get_subject().commonName = "Server Certificate"
scert.set_issuer(icert.get_subject())
scert.set_pubkey(skey)
scert.set_notBefore(b"20000101000000Z")
scert.set_notAfter(b"20200101000000Z")
scert.add_extensions([
X509Extension(b'basicConstraints', True, b'CA:false')])
scert.set_serial_number(0)
scert.sign(ikey, "sha1")
return [(cakey, cacert), (ikey, icert), (skey, scert)]
def _add_extensions(self, cert):
"""
(internal use only)
adds x509 extensions to ``cert``
"""
ext = []
# prepare extensions for CA
if not hasattr(self, 'ca'):
pathlen = app_settings.CA_BASIC_CONSTRAINTS_PATHLEN
ext_value = 'CA:TRUE'
if pathlen is not None:
ext_value = '{0}, pathlen:{1}'.format(ext_value, pathlen)
ext.append(crypto.X509Extension(b'basicConstraints',
app_settings.CA_BASIC_CONSTRAINTS_CRITICAL,
bytes_compat(ext_value)))
ext.append(crypto.X509Extension(b'keyUsage',
app_settings.CA_KEYUSAGE_CRITICAL,
bytes_compat(app_settings.CA_KEYUSAGE_VALUE)))
issuer_cert = cert
# prepare extensions for end-entity certs
else:
ext.append(crypto.X509Extension(b'basicConstraints',
False,
b'CA:FALSE'))
ext.append(crypto.X509Extension(b'keyUsage',
app_settings.CERT_KEYUSAGE_CRITICAL,
bytes_compat(app_settings.CERT_KEYUSAGE_VALUE)))
issuer_cert = self.ca.x509
ext.append(crypto.X509Extension(b'subjectKeyIdentifier',
False,
b'hash',
subject=cert))
cert.add_extensions(ext)
# authorityKeyIdentifier must be added after
# the other extensions have been already added
cert.add_extensions([
crypto.X509Extension(b'authorityKeyIdentifier',
False,
b'keyid:always,issuer:always',
issuer=issuer_cert)
])
for ext in self.extensions:
cert.add_extensions([
crypto.X509Extension(bytes_compat(ext['name']),
bool(ext['critical']),
bytes_compat(ext['value']))
])
return cert
def createCertificate(req, issuerCertKey, serial, validityPeriod, digest="sha256", isCA=False, extensions=[]):
"""
Generate a certificate given a certificate request.
Arguments: req - Certificate request to use
issuerCert - The certificate of the issuer
issuerKey - The private key of the issuer
serial - Serial number for the certificate
notBefore - Timestamp (relative to now) when the certificate
starts being valid
notAfter - Timestamp (relative to now) when the certificate
stops being valid
digest - Digest method to use for signing, default is sha256
Returns: The signed certificate in an X509 object
"""
issuerCert, issuerKey = issuerCertKey
notBefore, notAfter = validityPeriod
cert = crypto.X509()
cert.set_version(3)
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(notBefore)
cert.gmtime_adj_notAfter(notAfter)
cert.set_issuer(issuerCert.get_subject())
cert.set_subject(req.get_subject())
cert.set_pubkey(req.get_pubkey())
if isCA:
cert.add_extensions([crypto.X509Extension("basicConstraints", True,
"CA:TRUE, pathlen:0"),
crypto.X509Extension("subjectKeyIdentifier", False, "hash",
subject=cert)])
#TODO: This only is appropriate for root self signed!!!!
cert.add_extensions([crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=cert)])
else:
cert.add_extensions([crypto.X509Extension("basicConstraints", True,
"CA:FALSE"),
crypto.X509Extension("subjectKeyIdentifier", False, "hash",
subject=cert)])
cert.add_extensions([crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=issuerCert)])
cert.add_extensions(extensions)
cert.sign(issuerKey, digest)
return cert
# SUBJECT_DEFAULT = {countryName : "US", stateOrProvinceName : "NC", localityName : "RTP", organizationName : "IBM", organizationalUnitName : "Blockchain"}
def _create_certificate_chain():
"""
Construct and return a chain of certificates.
1. A new self-signed certificate authority certificate (cacert)
2. A new intermediate certificate signed by cacert (icert)
3. A new server certificate signed by icert (scert)
"""
caext = X509Extension(b('basicConstraints'), False, b('CA:true'))
# Step 1
cakey = PKey()
cakey.generate_key(TYPE_RSA, 512)
cacert = X509()
cacert.get_subject().commonName = "Authority Certificate"
cacert.set_issuer(cacert.get_subject())
cacert.set_pubkey(cakey)
cacert.set_notBefore(b("20000101000000Z"))
cacert.set_notAfter(b("20200101000000Z"))
cacert.add_extensions([caext])
cacert.set_serial_number(0)
cacert.sign(cakey, "sha1")
# Step 2
ikey = PKey()
ikey.generate_key(TYPE_RSA, 512)
icert = X509()
icert.get_subject().commonName = "Intermediate Certificate"
icert.set_issuer(cacert.get_subject())
icert.set_pubkey(ikey)
icert.set_notBefore(b("20000101000000Z"))
icert.set_notAfter(b("20200101000000Z"))
icert.add_extensions([caext])
icert.set_serial_number(0)
icert.sign(cakey, "sha1")
# Step 3
skey = PKey()
skey.generate_key(TYPE_RSA, 512)
scert = X509()
scert.get_subject().commonName = "Server Certificate"
scert.set_issuer(icert.get_subject())
scert.set_pubkey(skey)
scert.set_notBefore(b("20000101000000Z"))
scert.set_notAfter(b("20200101000000Z"))
scert.add_extensions([
X509Extension(b('basicConstraints'), True, b('CA:false'))])
scert.set_serial_number(0)
scert.sign(ikey, "sha1")
return [(cakey, cacert), (ikey, icert), (skey, scert)]