def create_ca():
    """
    Build a self-signed CA certificate together with a fresh RSA key.

    The organisation name, the validity offsets and the signature digest
    are taken from the module-level names ``ca_vendor``, ``ca_time_b``,
    ``ca_time_a`` and ``ca_digest``.

    Returns a ``(pkey, ca)`` tuple holding the generated 2048-bit RSA key
    and the signed certificate.
    """
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    ca = crypto.X509()
    ca.set_version(2)
    ca.set_serial_number(0)

    # Fill in the subject name field by field, in a fixed order.
    name = ca.get_subject()
    name_fields = (
        ('countryName', 'CN'),
        ('stateOrProvinceName', 'Internet'),
        ('localityName', 'Cernet'),
        ('organizationName', ca_vendor),
        ('organizationalUnitName', '%s Root' % ca_vendor),
        ('commonName', '%s CA' % ca_vendor),
    )
    for field, text in name_fields:
        setattr(name, field, text)

    # Validity window, expressed as offsets handed to the gmtime_adj_* calls.
    ca.gmtime_adj_notBefore(ca_time_b)
    ca.gmtime_adj_notAfter(ca_time_a)

    # Self-signed: the issuer is the certificate's own subject.
    ca.set_issuer(name)
    ca.set_pubkey(key)

    plain_extensions = (
        (b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
        (b'extendedKeyUsage', True,
         b'serverAuth,emailProtection,timeStamping'),
        (b'keyUsage', False, b'keyCertSign, cRLSign'),
    )
    extensions = [
        crypto.X509Extension(ext_name, critical, ext_value)
        for ext_name, critical, ext_value in plain_extensions
    ]
    # The key identifier is derived from the certificate itself, so it is
    # created last, once the public key has been attached.
    extensions.append(
        crypto.X509Extension(
            b'subjectKeyIdentifier', False, b'hash', subject=ca))
    ca.add_extensions(extensions)

    ca.sign(key, ca_digest)
    return key, ca
python类X509Extension()的实例源码
def test_construction(self):
    """
    Calling L{X509Extension} with an extension type name, a critical flag
    and an extension value produces an L{X509ExtensionType} instance.
    """
    cases = [
        ('basicConstraints', True, 'CA:true'),
        ('nsComment', False, 'pyOpenSSL unit test'),
    ]
    for type_name, critical, value in cases:
        extension = X509Extension(type_name, critical, value)
        self.assertTrue(
            isinstance(extension, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                extension, type(extension), X509ExtensionType))
def test_invalid_extension(self):
    """
    L{X509Extension} raises L{Error} when it is given an extension name or
    an extension value it cannot handle.
    """
    rejected = [
        ('thisIsMadeUp', False, 'hi'),
        ('basicConstraints', False, 'blah blah'),
        # Exercise a weird one (an extension which uses the r2i method).
        # This exercises the codepath that requires a non-NULL ctx to be
        # passed to X509V3_EXT_nconf.  It can't work now because we provide
        # no configuration database.  It might be made to work in the
        # future.
        ('proxyCertInfo', True,
         'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
    ]
    for extension_args in rejected:
        self.assertRaises(Error, X509Extension, *extension_args)
def test_construction(self):
    """
    Calling L{X509Extension} with an extension type name, a critical flag
    and an extension value produces an L{X509ExtensionType} instance.
    """
    cases = [
        ('basicConstraints', True, 'CA:true'),
        ('nsComment', False, 'pyOpenSSL unit test'),
    ]
    for type_name, critical, value in cases:
        extension = X509Extension(type_name, critical, value)
        self.assertTrue(
            isinstance(extension, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                extension, type(extension), X509ExtensionType))
def test_invalid_extension(self):
    """
    L{X509Extension} raises L{Error} when it is given an extension name or
    an extension value it cannot handle.
    """
    rejected = [
        ('thisIsMadeUp', False, 'hi'),
        ('basicConstraints', False, 'blah blah'),
        # Exercise a weird one (an extension which uses the r2i method).
        # This exercises the codepath that requires a non-NULL ctx to be
        # passed to X509V3_EXT_nconf.  It can't work now because we provide
        # no configuration database.  It might be made to work in the
        # future.
        ('proxyCertInfo', True,
         'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
    ]
    for extension_args in rejected:
        self.assertRaises(Error, X509Extension, *extension_args)
man_cert_setup.py 文件源码
项目:aws-greengrass-mini-fulfillment
作者: awslabs
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def create_group_cert(cli):
    """
    Generate a self-signed server certificate for a group and write it out.

    A new 2048-bit RSA key pair is generated, a certificate carrying the
    group's IP address as a subjectAltName is self-signed with SHA-256, and
    three PEM files are written next to each other:

    * ``<out_dir>/<group_name>-server.crt``
    * ``<out_dir>/<group_name>-server-private.key``
    * ``<out_dir>/<group_name>-server-public.key``

    :param cli: object exposing the ``ip_address``, ``out_dir`` and
        ``group_name`` attributes read below.
    """
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)  # generate RSA key-pair

    cert = crypto.X509()
    # The version field is zero-based: 2 selects X.509 v3, the only version
    # that is allowed to carry the extensions added below.
    cert.set_version(2)
    subject = cert.get_subject()
    subject.countryName = "US"
    subject.stateOrProvinceName = "CA"
    subject.organizationName = "mini-fulfillment"
    subject.organizationalUnitName = "demo"
    subject.commonName = "mini-fulfillment"
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)  # 5 year expiry date
    cert.set_issuer(cert.get_subject())  # self-sign this certificate
    cert.set_pubkey(k)

    san_list = ["IP:{0}".format(cli.ip_address)]
    # Extension values are byte strings, like the type names; encode the
    # joined SAN list instead of passing text.
    san_value = ", ".join(san_list).encode("ascii")
    extension_list = [
        crypto.X509Extension(type_name=b"basicConstraints",
                             critical=False, value=b"CA:false"),
        crypto.X509Extension(type_name=b"subjectAltName",
                             critical=True, value=san_value),
        # crypto.X509Extension(type_name=b"subjectKeyIdentifier",
        #                      critical=True, value=b"hash")
    ]
    cert.add_extensions(extension_list)
    cert.sign(k, 'sha256')

    prefix = str(cli.out_dir) + '/' + cli.group_name
    outputs = (
        ("{0}-server.crt",
         crypto.dump_certificate(crypto.FILETYPE_PEM, cert)),
        ("{0}-server-private.key",
         crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey=k)),
        ("{0}-server-public.key",
         crypto.dump_publickey(crypto.FILETYPE_PEM, pkey=k)),
    )
    # The dump_* helpers return bytes, so the files are opened in binary
    # mode; ``with`` guarantees every handle is flushed and closed.
    for name_template, pem_data in outputs:
        with open(name_template.format(prefix), 'wb') as out_file:
            out_file.write(pem_data)
def test_type(self):
    """
    L{X509Extension} and L{X509ExtensionType} are one and the same type
    object, and that type can be called to create instances.
    """
    self.assertIdentical(X509Extension, X509ExtensionType)
    constructor_args = ('basicConstraints', True, 'CA:true')
    self.assertConsistentType(
        X509Extension, 'X509Extension', *constructor_args)
def test_get_critical(self):
    """
    L{X509ExtensionType.get_critical} reports the critical flag the
    extension was created with.
    """
    expectations = (
        (True, self.assertTrue),
        (False, self.assertFalse),
    )
    for critical, check in expectations:
        extension = X509Extension('basicConstraints', critical, 'CA:true')
        check(extension.get_critical())
def test_get_short_name(self):
    """
    L{X509ExtensionType.get_short_name} returns the short type name the
    extension was created with.
    """
    cases = (
        ('basicConstraints', 'CA:true'),
        ('nsComment', 'foo bar'),
    )
    for short_name, value in cases:
        extension = X509Extension(short_name, True, value)
        self.assertEqual(extension.get_short_name(), short_name)
def test_subject(self):
    """
    The C{subject} argument of L{X509Extension} supplies the certificate
    for an extension that needs one; the resulting subject key identifier
    shows up in the text dump of the signed certificate.
    """
    certificate = self.x509
    key_identifier = X509Extension(
        'subjectKeyIdentifier', False, 'hash', subject=certificate)
    certificate.add_extensions([key_identifier])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    self.assertTrue('X509v3 Subject Key Identifier:' in dumped)
def test_missing_subject(self):
    """
    Creating an extension which requires a subject without passing the
    C{subject} parameter raises L{Error}.
    """
    extension_args = ('subjectKeyIdentifier', False, 'hash')
    self.assertRaises(Error, X509Extension, *extension_args)
def test_invalid_subject(self):
    """
    L{TypeError} is raised when the C{subject} parameter is given anything
    other than an L{X509} instance.
    """
    bad_subjects = [True, object(), "hello", [], self]
    for candidate in bad_subjects:
        self.assertRaises(
            TypeError, X509Extension,
            'basicConstraints', False, 'CA:TRUE', subject=candidate)
def test_unused_issuer(self):
    """
    Supplying C{issuer} to L{X509Extension} for an extension that has no
    use for it is accepted, and the extension is still added to the
    certificate as usual.
    """
    certificate = self.x509
    constraints = X509Extension(
        'basicConstraints', False, 'CA:TRUE', issuer=certificate)
    certificate.add_extensions([constraints])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    for expected in ('X509v3 Basic Constraints:', 'CA:TRUE'):
        self.assertTrue(expected in dumped)
def test_issuer(self):
    """
    The C{issuer} argument of L{X509Extension} supplies the issuer
    certificate for an extension that needs one; the resulting authority
    key identifier shows up in the text dump of the signed certificate.
    """
    certificate = self.x509
    authority_identifier = X509Extension(
        'authorityKeyIdentifier', False, 'issuer:always',
        issuer=certificate)
    certificate.add_extensions([authority_identifier])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    for expected in ('X509v3 Authority Key Identifier:',
                     'DirName:/CN=Yoda root CA'):
        self.assertTrue(expected in dumped)
def test_invalid_issuer(self):
    """
    L{TypeError} is raised when the C{issuer} parameter is given anything
    other than an L{X509} instance.
    """
    bad_issuers = [True, object(), "hello", [], self]
    for candidate in bad_issuers:
        self.assertRaises(
            TypeError, X509Extension,
            'authorityKeyIdentifier', False, 'keyid:always,issuer:always',
            issuer=candidate)
def generate(self, module):
    '''Generate the certificate signing request.

    When ``self.check(module, perms_required=False)`` fails, or when
    ``self.force`` is set, a new request is built from ``self.version``,
    ``self.subject`` and the optional ``self.subjectAltName``,
    ``self.keyUsage`` and ``self.extendedKeyUsage`` lists, signed with
    ``self.privatekey`` using ``self.digest``, stored on ``self.request``
    and written to ``self.path`` in PEM format.

    Afterwards the common file arguments are applied to the file in either
    case.  ``self.changed`` is set to True whenever the request was
    rewritten or the file attributes had to be adjusted.

    Raises CertificateSigningRequestError if the file cannot be written.
    '''
    if not self.check(module, perms_required=False) or self.force:
        req = crypto.X509Req()
        req.set_version(self.version)
        subject = req.get_subject()
        for (key, value) in self.subject.items():
            if value is not None:
                setattr(subject, key, value)

        extensions = []
        # Only emit a subjectAltName extension when names were supplied;
        # joining a missing list or building an extension from an empty
        # value would fail.
        if self.subjectAltName:
            altnames = ', '.join(self.subjectAltName)
            extensions.append(crypto.X509Extension(
                b"subjectAltName", False, altnames.encode('ascii')))

        if self.keyUsage:
            usages = ', '.join(self.keyUsage)
            extensions.append(crypto.X509Extension(
                b"keyUsage", False, usages.encode('ascii')))

        if self.extendedKeyUsage:
            usages = ', '.join(self.extendedKeyUsage)
            extensions.append(crypto.X509Extension(
                b"extendedKeyUsage", False, usages.encode('ascii')))

        req.add_extensions(extensions)

        req.set_pubkey(self.privatekey)
        req.sign(self.privatekey, self.digest)
        self.request = req

        try:
            # ``with`` closes the handle even if the write itself fails.
            with open(self.path, 'wb') as csr_file:
                csr_file.write(crypto.dump_certificate_request(
                    crypto.FILETYPE_PEM, self.request))
        except (IOError, OSError) as exc:
            raise CertificateSigningRequestError(exc)

        self.changed = True

    file_args = module.load_file_common_arguments(module.params)
    if module.set_fs_attributes_if_different(file_args, False):
        self.changed = True
def test_subject_key_identifier(self):
    """
    The extension at index 2 of a newly created CA certificate is a
    non-critical subjectKeyIdentifier whose data equals a ``hash``
    identifier built from that same certificate.
    """
    ca = self._create_ca()
    found = ca.x509.get_extension(2)
    self.assertEqual(found.get_short_name().decode(),
                     'subjectKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    rebuilt = crypto.X509Extension(
        b'subjectKeyIdentifier', False, b'hash', subject=ca.x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def test_authority_key_identifier(self):
    """
    The extension at index 3 of a newly created CA certificate is a
    non-critical authorityKeyIdentifier whose data equals a
    ``keyid:always,issuer:always`` identifier built with that same
    certificate as issuer.
    """
    ca = self._create_ca()
    found = ca.x509.get_extension(3)
    self.assertEqual(found.get_short_name().decode(),
                     'authorityKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    rebuilt = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=ca.x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def test_subject_key_identifier(self):
    """
    The extension at index 2 of a newly created certificate is a
    non-critical subjectKeyIdentifier whose data equals a ``hash``
    identifier built from that same certificate.
    """
    cert = self._create_cert()
    found = cert.x509.get_extension(2)
    self.assertEqual(found.get_short_name().decode(),
                     'subjectKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    rebuilt = crypto.X509Extension(
        b'subjectKeyIdentifier', False, b'hash', subject=cert.x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def test_authority_key_identifier(self):
    """
    The extension at index 3 of a newly created certificate is a
    non-critical authorityKeyIdentifier whose data equals a
    ``keyid:always,issuer:always`` identifier built with the certificate's
    CA as issuer.
    """
    cert = self._create_cert()
    found = cert.x509.get_extension(3)
    self.assertEqual(found.get_short_name().decode(),
                     'authorityKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    rebuilt = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=cert.ca.x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def _add_extensions(self, cert, is_server=False):
    """
    (internal use only)
    attaches the x509 extensions to ``cert`` and returns it

    basicConstraints, keyUsage, an optional ``serverAuth``
    extendedKeyUsage (only when ``is_server`` is true) and
    subjectKeyIdentifier are added in one call; authorityKeyIdentifier,
    which refers to ``self.ca_cert``, is added in a second call.
    """
    extensions = [
        crypto.X509Extension(b'basicConstraints', True, b'CA:FALSE'),
        crypto.X509Extension(
            b'keyUsage',
            CERT_KEYUSAGE_CRITICAL,
            bytes_compat(CERT_KEYUSAGE_VALUE)),
    ]
    if is_server:
        extensions.append(
            crypto.X509Extension(b'extendedKeyUsage', False, b'serverAuth'))
    signing_cert = self.ca_cert
    extensions.append(
        crypto.X509Extension(
            b'subjectKeyIdentifier', False, b'hash', subject=cert))
    cert.add_extensions(extensions)

    authority_identifier = crypto.X509Extension(
        b'authorityKeyIdentifier',
        False,
        b'keyid:always,issuer:always',
        issuer=signing_cert)
    cert.add_extensions([authority_identifier])
    return cert
def self_signed_cert_gen(
        key_type=crypto.TYPE_RSA,
        key_bits=4096,
        country="US",
        state_province="California",
        locality="San Francisco",
        org="Your Company",
        org_unit="Team",
        common_name="www.domain.com",
        subject_alt_names=None,  # alternative dns names as list
        # ^ must look like: ["DNS:*.domain.com", "DNS:domain.ym"]
        validity_days=10 * 365):
    """
    Generate a key pair and a matching self-signed certificate.

    :param key_type: key algorithm constant passed to ``PKey.generate_key``.
    :param key_bits: key size in bits.
    :param country: subject ``C`` field.
    :param state_province: subject ``ST`` field.
    :param locality: subject ``L`` field.
    :param org: subject ``O`` field.
    :param org_unit: subject ``OU`` field.
    :param common_name: subject ``CN`` field.
    :param subject_alt_names: optional iterable of subjectAltName entries
        such as ``["DNS:*.domain.com", "DNS:domain.ym"]``; ``None`` or an
        empty sequence omits the extension.
    :param validity_days: validity period in days, counted from now.
    :return: tuple ``(private_key_pem, certificate_pem)`` as produced by
        ``dump_privatekey`` and ``dump_certificate``.
    """
    # Create a key pair
    k = crypto.PKey()
    k.generate_key(key_type, key_bits)

    # Create a self-signed cert
    cert = crypto.X509()
    # The version field is zero-based: 2 selects X.509 v3, which is required
    # for a certificate that may carry extensions.
    cert.set_version(2)
    subject = cert.get_subject()
    subject.C = country
    subject.ST = state_province
    subject.L = locality
    subject.O = org
    subject.OU = org_unit
    subject.CN = common_name

    if subject_alt_names:
        san_value = ", ".join(subject_alt_names).encode("utf-8")
        cert.add_extensions([
            crypto.X509Extension(b"subjectAltName", False, san_value)
        ])

    cert.set_serial_number(random.getrandbits(64))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(validity_days * 24 * 60 * 60)
    cert.set_issuer(cert.get_subject())  # self-signer
    cert.set_pubkey(k)
    # SHA-1 signatures are no longer accepted by current TLS clients, so
    # sign with SHA-256.
    cert.sign(k, 'sha256')

    # return a tuple of the private key and the self-signed cert
    return (crypto.dump_privatekey(crypto.FILETYPE_PEM, k),
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
def test_type(self):
    """
    L{X509Extension} and L{X509ExtensionType} are one and the same type
    object, and that type can be called to create instances.
    """
    self.assertIdentical(X509Extension, X509ExtensionType)
    constructor_args = ('basicConstraints', True, 'CA:true')
    self.assertConsistentType(
        X509Extension, 'X509Extension', *constructor_args)
def test_get_critical(self):
    """
    L{X509ExtensionType.get_critical} reports the critical flag the
    extension was created with.
    """
    expectations = (
        (True, self.assertTrue),
        (False, self.assertFalse),
    )
    for critical, check in expectations:
        extension = X509Extension('basicConstraints', critical, 'CA:true')
        check(extension.get_critical())
def test_get_short_name(self):
    """
    L{X509ExtensionType.get_short_name} returns the short type name the
    extension was created with.
    """
    cases = (
        ('basicConstraints', 'CA:true'),
        ('nsComment', 'foo bar'),
    )
    for short_name, value in cases:
        extension = X509Extension(short_name, True, value)
        self.assertEqual(extension.get_short_name(), short_name)
def test_subject(self):
    """
    The C{subject} argument of L{X509Extension} supplies the certificate
    for an extension that needs one; the resulting subject key identifier
    shows up in the text dump of the signed certificate.
    """
    certificate = self.x509
    key_identifier = X509Extension(
        'subjectKeyIdentifier', False, 'hash', subject=certificate)
    certificate.add_extensions([key_identifier])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    self.assertTrue('X509v3 Subject Key Identifier:' in dumped)
def test_missing_subject(self):
    """
    Creating an extension which requires a subject without passing the
    C{subject} parameter raises L{Error}.
    """
    extension_args = ('subjectKeyIdentifier', False, 'hash')
    self.assertRaises(Error, X509Extension, *extension_args)
def test_invalid_subject(self):
    """
    L{TypeError} is raised when the C{subject} parameter is given anything
    other than an L{X509} instance.
    """
    bad_subjects = [True, object(), "hello", [], self]
    for candidate in bad_subjects:
        self.assertRaises(
            TypeError, X509Extension,
            'basicConstraints', False, 'CA:TRUE', subject=candidate)
def test_unused_issuer(self):
    """
    Supplying C{issuer} to L{X509Extension} for an extension that has no
    use for it is accepted, and the extension is still added to the
    certificate as usual.
    """
    certificate = self.x509
    constraints = X509Extension(
        'basicConstraints', False, 'CA:TRUE', issuer=certificate)
    certificate.add_extensions([constraints])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    for expected in ('X509v3 Basic Constraints:', 'CA:TRUE'):
        self.assertTrue(expected in dumped)
def test_issuer(self):
    """
    The C{issuer} argument of L{X509Extension} supplies the issuer
    certificate for an extension that needs one; the resulting authority
    key identifier shows up in the text dump of the signed certificate.
    """
    certificate = self.x509
    authority_identifier = X509Extension(
        'authorityKeyIdentifier', False, 'issuer:always',
        issuer=certificate)
    certificate.add_extensions([authority_identifier])
    certificate.sign(self.pkey, 'sha1')
    dumped = dump_certificate(FILETYPE_TEXT, certificate)
    for expected in ('X509v3 Authority Key Identifier:',
                     'DirName:/CN=Yoda root CA'):
        self.assertTrue(expected in dumped)