def generateCertificateObjects(organization, organizationalUnit):
    """
    Create a self-signed certificate for given C{organization} and
    C{organizationalUnit}.

    A new RSA key is generated, a certificate request carrying the two
    subject fields is signed with it, and a certificate whose issuer and
    subject are both the request's subject is signed with the same key.

    @param organization: value assigned to the C{O} field of the subject.
    @param organizationalUnit: value assigned to the C{OU} field of the
        subject.

    @return: a tuple of (key, request, certificate) objects.
    """
    # 512-bit RSA keys and MD5 signatures are weak and are commonly refused
    # by current OpenSSL security policies, so use a 2048-bit key and SHA-256.
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    req = crypto.X509Req()
    subject = req.get_subject()
    subject.O = organization
    subject.OU = organizationalUnit
    req.set_pubkey(pkey)
    req.sign(pkey, "sha256")

    # Here comes the actual certificate
    cert = crypto.X509()
    cert.set_serial_number(1)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60)  # Testing certificates need not be long lived
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(pkey, "sha256")

    return pkey, req, cert
# Example source code using the Python class X509Req()
def otherMakeCertificate(**kw):
    """
    Create a new RSA key pair and a self-signed certificate for it.

    @param kw: subject fields; every keyword is set as an attribute of the
        same name on the subject of the certificate request.

    @return: a tuple of (key pair, certificate) objects.
    """
    # 1024-bit RSA keys and MD5 signatures are weak and are commonly refused
    # by current OpenSSL security policies, so use a 2048-bit key and SHA-256.
    keypair = PKey()
    keypair.generate_key(TYPE_RSA, 2048)

    req = X509Req()
    subj = req.get_subject()
    for k, v in kw.items():
        setattr(subj, k, v)
    req.set_pubkey(keypair)
    req.sign(keypair, "sha256")

    cert = X509()
    cert.set_serial_number(counter())
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)  # One year
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(keypair, "sha256")

    return keypair, cert
def otherMakeCertificate(**kw):
    """
    Create a new RSA key pair and a self-signed certificate for it.

    @param kw: subject fields; every keyword is set as an attribute of the
        same name on the subject of the certificate request.

    @return: a tuple of (key pair, certificate) objects.
    """
    # 1024-bit RSA keys and MD5 signatures are weak and are commonly refused
    # by current OpenSSL security policies, so use a 2048-bit key and SHA-256.
    keypair = PKey()
    keypair.generate_key(TYPE_RSA, 2048)

    req = X509Req()
    subj = req.get_subject()
    for k, v in kw.items():
        setattr(subj, k, v)
    req.set_pubkey(keypair)
    req.sign(keypair, "sha256")

    cert = X509()
    cert.set_serial_number(counter())
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)  # One year
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(keypair, "sha256")

    return keypair, cert
def __init__(self, osslpkey):
    """
    Wrap an OpenSSL key object.

    The key is stored as C{self.original}. A certificate request carrying
    nothing but this key as its public key is serialised with
    C{FILETYPE_ASN1} and stored as C{self._emptyReq}.

    @param osslpkey: the key object to wrap.
    """
    self.original = osslpkey
    blankRequest = crypto.X509Req()
    blankRequest.set_pubkey(osslpkey)
    self._emptyReq = crypto.dump_certificate_request(
        crypto.FILETYPE_ASN1, blankRequest
    )
def requestObject(self, distinguishedName, digestAlgorithm='md5'):
    """
    Build a certificate request for the wrapped key.

    @param distinguishedName: object whose C{_copyInto} method fills in the
        subject of the new request.
    @param digestAlgorithm: name of the digest used to sign the request.

    @return: a L{CertificateRequest} wrapping the signed request.
    """
    request = crypto.X509Req()
    key = self.original
    request.set_pubkey(key)
    subject = request.get_subject()
    distinguishedName._copyInto(subject)
    # The request is signed with the very key it carries.
    request.sign(key, digestAlgorithm)
    return CertificateRequest(request)
def generateCertificateObjects(organization, organizationalUnit):
    """
    Create a self-signed certificate for given C{organization} and
    C{organizationalUnit}.

    A new RSA key is generated, a certificate request carrying the two
    subject fields is signed with it, and a certificate whose issuer and
    subject are both the request's subject is signed with the same key.

    @param organization: value assigned to the C{O} field of the subject.
    @param organizationalUnit: value assigned to the C{OU} field of the
        subject.

    @return: a tuple of (key, request, certificate) objects.
    """
    # 512-bit RSA keys and MD5 signatures are weak and are commonly refused
    # by current OpenSSL security policies, so use a 2048-bit key and SHA-256.
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    req = crypto.X509Req()
    subject = req.get_subject()
    subject.O = organization
    subject.OU = organizationalUnit
    req.set_pubkey(pkey)
    req.sign(pkey, "sha256")

    # Here comes the actual certificate
    cert = crypto.X509()
    cert.set_serial_number(1)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60)  # Testing certificates need not be long lived
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(pkey, "sha256")

    return pkey, req, cert
def create_certificate_request(self, pkey, digest='sha256', **name):
    """
    Build and sign a certificate request.

    Arguments: pkey   - The key placed in the request and used to sign it
               digest - Digest method used for signing, default is sha256
               **name - Subject fields; each keyword is set as an
                        attribute of the request's subject. Possible
                        arguments are:
                          C            - Country name
                          ST           - State or province name
                          L            - Locality name
                          O            - Organization name
                          OU           - Organizational unit name
                          CN           - Common name
                          emailAddress - E-mail address
    Returns:   The signed certificate request as an X509Req object
    """
    csr = crypto.X509Req()

    # Copy every supplied field onto the subject of the request
    subject_name = csr.get_subject()
    for field in name:
        setattr(subject_name, field, name[field])

    # Attach the key, then sign the request with that same key
    csr.set_pubkey(pkey)
    csr.sign(pkey, digest)
    return csr
def create_certificate_request(self, pkey, digest='sha256', **name):
    """
    Build and sign a certificate request.

    Arguments: pkey   - The key placed in the request and used to sign it
               digest - Digest method used for signing, default is sha256
               **name - Subject fields; each keyword is set as an
                        attribute of the request's subject. Possible
                        arguments are:
                          C            - Country name
                          ST           - State or province name
                          L            - Locality name
                          O            - Organization name
                          OU           - Organizational unit name
                          CN           - Common name
                          emailAddress - E-mail address
    Returns:   The signed certificate request as an X509Req object
    """
    csr = crypto.X509Req()

    # Copy every supplied field onto the subject of the request
    subject_name = csr.get_subject()
    for field in name:
        setattr(subject_name, field, name[field])

    # Attach the key, then sign the request with that same key
    csr.set_pubkey(pkey)
    csr.sign(pkey, digest)
    return csr
def generate_csr_and_key(user='TestUser'):
    """
    Generate a new RSA key and a certificate signing request signed with it.

    TestUser is the user proposed by the documentation,
    which will be ignored

    :param user: value stored in the CN field of the request subject.
    :return: a ``(key, req)`` tuple with the key and the signed request.
    """
    # 1024-bit RSA and SHA-1 signatures are weak; use a 2048-bit key and
    # SHA-256 instead.
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    req = crypto.X509Req()
    req.get_subject().CN = user
    req.set_pubkey(key)
    req.sign(key, "sha256")
    return key, req
def generate(self, module):
    '''Generate the certificate signing request.

    When ``self.check(module, perms_required=False)`` is false or
    ``self.force`` is set, a new request is built from ``self.subject``,
    ``self.subjectAltName``, ``self.keyUsage`` and
    ``self.extendedKeyUsage``, signed with ``self.privatekey`` and written
    in PEM form to ``self.path``; ``self.changed`` is then set to True.
    In every case the common file arguments from ``module.params`` are
    applied afterwards, which may also set ``self.changed``.

    Raises CertificateSigningRequestError if writing the file fails with
    IOError or OSError.
    '''
    if not self.check(module, perms_required=False) or self.force:
        req = crypto.X509Req()
        # NOTE(review): the value of self.version is not visible here;
        # confirm it is a version number the request format accepts.
        req.set_version(self.version)

        subject = req.get_subject()
        for (key, value) in self.subject.items():
            # Unset subject fields are skipped.
            if value is not None:
                setattr(subject, key, value)

        altnames = ', '.join(self.subjectAltName)
        extensions = [crypto.X509Extension(b"subjectAltName", False, altnames.encode('ascii'))]

        if self.keyUsage:
            usages = ', '.join(self.keyUsage)
            extensions.append(crypto.X509Extension(b"keyUsage", False, usages.encode('ascii')))

        if self.extendedKeyUsage:
            usages = ', '.join(self.extendedKeyUsage)
            extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, usages.encode('ascii')))

        req.add_extensions(extensions)

        req.set_pubkey(self.privatekey)
        req.sign(self.privatekey, self.digest)
        self.request = req

        # Serialise before opening the target so a failure while dumping
        # cannot leave a truncated file behind.
        csr_content = crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
        try:
            # The context manager closes the file even when the write fails.
            with open(self.path, 'wb') as csr_file:
                csr_file.write(csr_content)
        except (IOError, OSError) as exc:
            raise CertificateSigningRequestError(exc)

        self.changed = True

    file_args = module.load_file_common_arguments(module.params)
    if module.set_fs_attributes_if_different(file_args, False):
        self.changed = True
def createCertRequest(pkey, digest="sha256", **name):
    """
    Build and sign a certificate request.

    Arguments: pkey   - The key placed in the request and used to sign it
               digest - Digest method used for signing, default is sha256
               **name - Subject fields; each keyword is set as an
                        attribute of the request's subject. Possible
                        arguments are:
                          C            - Country name
                          ST           - State or province name
                          L            - Locality name
                          O            - Organization name
                          OU           - Organizational unit name
                          CN           - Common name
                          emailAddress - E-mail address
    Returns:   The signed certificate request as an X509Req object
    """
    request = crypto.X509Req()

    # Copy every supplied field onto the subject of the request
    subject = request.get_subject()
    for field in name:
        setattr(subject, field, name[field])

    # Attach the key, then sign the request with that same key
    request.set_pubkey(pkey)
    request.sign(pkey, digest)
    return request
def __init__(self, osslpkey):
    """
    Wrap an OpenSSL key object.

    The key is stored as C{self.original}. A certificate request carrying
    nothing but this key as its public key is serialised with
    C{FILETYPE_ASN1} and stored as C{self._emptyReq}.

    @param osslpkey: the key object to wrap.
    """
    self.original = osslpkey
    blankRequest = crypto.X509Req()
    blankRequest.set_pubkey(osslpkey)
    self._emptyReq = crypto.dump_certificate_request(
        crypto.FILETYPE_ASN1, blankRequest
    )
def requestObject(self, distinguishedName, digestAlgorithm='md5'):
    """
    Build a certificate request for the wrapped key.

    @param distinguishedName: object whose C{_copyInto} method fills in the
        subject of the new request.
    @param digestAlgorithm: name of the digest used to sign the request.

    @return: a L{CertificateRequest} wrapping the signed request.
    """
    request = crypto.X509Req()
    key = self.original
    request.set_pubkey(key)
    subject = request.get_subject()
    distinguishedName._copyInto(subject)
    # The request is signed with the very key it carries.
    request.sign(key, digestAlgorithm)
    return CertificateRequest(request)
def createCertRequest(pkey, extensions=None, digest="sha256", **name):
    """
    Create a certificate request.

    Arguments: pkey       - The key to associate with the request; it is
                            also used to sign the request
               extensions - Optional list of extension objects passed to
                            the request's add_extensions(); None (the
                            default) means no extensions
               digest     - Digestion method to use for signing, default
                            is sha256
               **name     - The name of the subject of the request,
                            possible arguments are:
                              C            - Country name
                              ST           - State or province name
                              L            - Locality name
                              O            - Organization name
                              OU           - Organizational unit name
                              CN           - Common name
                              emailAddress - E-mail address
    Returns:   The certificate request in an X509Req object
    """
    # Use None as the default instead of a shared mutable list.
    if extensions is None:
        extensions = []

    req = crypto.X509Req()
    subj = req.get_subject()
    for key, value in name.items():
        setattr(subj, key, value)

    req.add_extensions(extensions)
    req.set_pubkey(pkey)
    req.sign(pkey, digest)
    return req
def generate(self, module):
    '''Generate the certificate signing request.

    When ``self.path`` does not exist or ``self.force`` is set, a new
    request is built from ``self.subject`` and ``self.subjectAltName``,
    signed with the private key loaded from ``self.privatekey_path`` and
    written in PEM form to ``self.path``. Otherwise ``self.changed`` is set
    to False. In every case the common file arguments from
    ``module.params`` are applied afterwards, which may set
    ``self.changed`` to True.

    Raises CertificateSigningRequestError if writing the file fails with
    IOError or OSError.
    '''
    if not os.path.exists(self.path) or self.force:
        req = crypto.X509Req()
        # NOTE(review): the value of self.version is not visible here;
        # confirm it is a version number the request format accepts.
        req.set_version(self.version)

        subject = req.get_subject()
        for (key, value) in self.subject.items():
            # Unset subject fields are skipped.
            if value is not None:
                setattr(subject, key, value)

        if self.subjectAltName is not None:
            req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))])

        # Read the key through a context manager so the handle is closed
        # promptly instead of being left to the garbage collector.
        with open(self.privatekey_path) as privatekey_file:
            privatekey_content = privatekey_file.read()
        self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content)

        req.set_pubkey(self.privatekey)
        req.sign(self.privatekey, self.digest)
        self.request = req

        # Serialise before opening the target so a failure while dumping
        # cannot leave a truncated file behind.
        csr_content = crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
        try:
            # The context manager closes the file even when the write fails.
            with open(self.path, 'wb') as csr_file:
                csr_file.write(csr_content)
        except (IOError, OSError) as exc:
            raise CertificateSigningRequestError(exc)
    else:
        self.changed = False

    file_args = module.load_file_common_arguments(module.params)
    if module.set_fs_attributes_if_different(file_args, False):
        self.changed = True
def _generate_csr(key, key_digest, domains):
    """Build a signed CSR listing ``domains`` as DNS subject alternative names.

    :param key: key placed in the request and used to sign it.
    :param key_digest: digest name; converted with ``str()`` before signing.
    :param domains: iterable of domain names; each becomes a ``DNS:`` entry
        of the subjectAltName extension.
    :return: the signed request wrapped in ``ComparableX509``.
    """
    csr = crypto.X509Req()
    # RFC 2986 defines a single request version, v1, encoded as 0. The
    # previous value of 2 is the encoding used for X.509 v3 certificates,
    # not for certificate requests.
    csr.set_version(0)
    csr.set_pubkey(key)

    sans = ', '.join('DNS:{}'.format(d) for d in domains)
    exts = [crypto.X509Extension(b'subjectAltName', False, b(sans))]
    csr.add_extensions(exts)

    csr.sign(key, str(key_digest))
    return ComparableX509(csr)
def requestObject(self, distinguishedName, digestAlgorithm='sha256'):
    """
    Build a certificate request for the wrapped key.

    @param distinguishedName: object whose C{_copyInto} method fills in the
        subject of the new request.
    @param digestAlgorithm: name of the digest used to sign the request.

    @return: a L{CertificateRequest} wrapping the signed request.
    """
    request = crypto.X509Req()
    key = self.original
    request.set_pubkey(key)
    subject = request.get_subject()
    distinguishedName._copyInto(subject)
    # The request is signed with the very key it carries.
    request.sign(key, digestAlgorithm)
    return CertificateRequest(request)
def generateCertificateObjects(organization, organizationalUnit):
    """
    Create a self-signed certificate for given C{organization} and
    C{organizationalUnit}.

    A new RSA key is generated, a certificate request carrying the two
    subject fields is signed with it, and a certificate whose issuer and
    subject are both the request's subject is signed with the same key.

    @param organization: value assigned to the C{O} field of the subject.
    @param organizationalUnit: value assigned to the C{OU} field of the
        subject.

    @return: a tuple of (key, request, certificate) objects.
    """
    # 512-bit RSA keys and MD5 signatures are weak and are commonly refused
    # by current OpenSSL security policies, so use a 2048-bit key and SHA-256.
    pkey = crypto.PKey()
    pkey.generate_key(crypto.TYPE_RSA, 2048)

    req = crypto.X509Req()
    subject = req.get_subject()
    subject.O = organization
    subject.OU = organizationalUnit
    req.set_pubkey(pkey)
    req.sign(pkey, "sha256")

    # Here comes the actual certificate
    cert = crypto.X509()
    cert.set_serial_number(1)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(60)  # Testing certificates need not be long lived
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(req.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.sign(pkey, "sha256")

    return pkey, req, cert
def generate_certificate(config_cert, ca, cakey):
    """Generate a key, a certificate request and a certificate signed by a CA.

    Keys read from ``config_cert``: ``keyfilesize``, ``hashalgorithm`` and
    ``serial`` are required; the subject fields ``commonName``,
    ``stateOrProvinceName``, ``localityName``, ``organizationName``,
    ``organizationalUnitName``, ``emailAddress`` and ``countryName`` as well
    as ``validfrom`` and ``validto`` are only applied when present.

    :param config_cert: mapping holding the settings listed above.
    :param ca: CA certificate; supplies the issuer name and is passed as the
        issuer of the authorityKeyIdentifier extension.
    :param cakey: key used to sign the new certificate.
    :return: a ``(req, cert, key)`` tuple.
    """
    # Generate the private key
    key = openssl_generate_privatekey(config_cert['keyfilesize'])

    # Generate the certificate request
    req = crypto.X509Req()
    req_subj = req.get_subject()
    # The fields are applied in this fixed order, the same order in which
    # they were previously assigned one by one.
    subject_fields = (
        'commonName',
        'stateOrProvinceName',
        'localityName',
        'organizationName',
        'organizationalUnitName',
        'emailAddress',
        'countryName',
    )
    for field in subject_fields:
        if field in config_cert:
            setattr(req_subj, field, config_cert[field])

    req.set_pubkey(key)
    req.sign(key, config_cert['hashalgorithm'])

    # Now generate the certificate itself
    cert = crypto.X509()
    cert.set_version(2)
    cert.set_serial_number(config_cert['serial'])
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.set_issuer(ca.get_subject())

    if 'validfrom' in config_cert:
        cert.set_notBefore(config_cert['validfrom'])
    if 'validto' in config_cert:
        cert.set_notAfter(config_cert['validto'])

    # Extension names and values are byte strings: text strings are rejected
    # on Python 3, while bytes literals are plain str on Python 2.
    cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:FALSE"),
        crypto.X509Extension(b"keyUsage", False, b"digitalSignature"),
        crypto.X509Extension(b"extendedKeyUsage", False, b"codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=cert),
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=ca),
    ])

    cert.sign(cakey, config_cert['hashalgorithm'])
    return req, cert, key
def generate_certificate_signing_request(conf, domain):
    """Create a certificate signing request for ``domain``.

    The key comes from ``load_private_key(conf, domain)``. The subject's
    countryName and CN are taken from ``domain['countryName']`` and
    ``domain['name']``.

    :param conf: configuration passed through to ``load_private_key``.
    :param domain: mapping with at least ``countryName`` and ``name``.
    :return: a ``(csr, key)`` tuple.
    """
    key = load_private_key(conf, domain)
    LOG.info("Creating Certificate Signing Request.")

    csr = crypto.X509Req()
    subject = csr.get_subject()
    subject.countryName = domain['countryName']
    subject.CN = domain['name']
    csr.set_pubkey(key)
    # SHA-1 signatures are weak; sign the request with SHA-256 instead.
    csr.sign(key, "sha256")
    return (csr, key)
def _create_csr(self):
    """Build, sign and write the certificate signing request.

    The subject comes from ``self.subject`` (None values are skipped) and
    the subjectAltName extension from ``self.subjectAltName``. The request
    is signed with the key read from ``<path>/private/<name>.key`` using
    ``self.digest`` and written in PEM form to ``<path>/csr/<name>.csr``.
    """
    LOG.info('[%s] Generating CSR' % self.name)
    req = crypto.X509Req()

    LOG.debug('[%s] Attaching Certificate Version to CSR: %s' %
              (self.name, self.version))
    # NOTE(review): the value of self.version is not visible here; confirm
    # it is a version number the request format accepts.
    req.set_version(self.version)

    subject = req.get_subject()
    for (key, value) in self.subject.items():
        if value is not None:
            LOG.debug('[%s] Attaching %s to CSR: %s' %
                      (self.name, key, value))
            setattr(subject, key, value)

    LOG.info('[%s] Attaching SAN extention: %s' %
             (self.name, self.subjectAltName))
    try:
        req.add_extensions([crypto.X509Extension(
            bytes('subjectAltName', 'utf-8'), False,
            bytes(self.subjectAltName, 'utf-8')
        )])
    except TypeError:
        # bytes(text, encoding) raises TypeError where bytes is plain str
        # (presumably Python 2); fall back to passing the strings as-is.
        req.add_extensions([crypto.X509Extension('subjectAltName', False,
                                                 self.subjectAltName)])

    LOG.debug('[%s] Loading private key: %s/private/%s.key' %
              (self.name, self.path, self.name))
    key_path = '%s/private/%s.key' % (self.path, self.name)
    # Context managers close both files even if reading or writing fails.
    with open(key_path) as key_file:
        privatekey_content = key_file.read()
    privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM,
                                        privatekey_content)

    LOG.info('[%s] Signing CSR' % self.name)
    req.set_pubkey(privatekey)
    req.sign(privatekey, self.digest)

    LOG.debug('[%s] Writting CSR: %s/csr/%s.csr' %
              (self.name, self.path, self.name))
    # Serialise before opening the target so a failure while dumping cannot
    # leave a truncated file behind.
    csr_text = crypto.dump_certificate_request(crypto.FILETYPE_PEM,
                                               req).decode('utf-8')
    csr_path = '%s/csr/%s.csr' % (self.path, self.name)
    with open(csr_path, 'w') as csr_file:
        csr_file.write(csr_text)
def _request_new_cert(self, domain, keyfile, altname=None):
    """ Generate a new certificate for a domain using an ACME authority

    A CSR for ``domain`` (plus the optional alternative names) is built and
    signed with the key read from ``keyfile``. A challenge is requested for
    every name and processed when the authorization is not yet valid, then
    the certificate and its chain are requested.

    :param domain: name used as the CN of the request subject.
    :param keyfile: path of the PEM private key, read with ``readfile``.
    :param altname: optional list of alternative DNS names; ``None`` (the
        default) or an empty list means none.
    :return: a ``(crt_pem, chain_pem)`` tuple of PEM-encoded bytes.
    """
    self._init_client()

    # altname defaults to None, which cannot be concatenated or joined
    # below; treat it as an empty list.
    altnames = altname or []

    # Specific logger
    logger = logging.getLogger('certproxy.acmeproxy.acme')

    # Load the private key
    key = crypto.load_privatekey(crypto.FILETYPE_PEM, readfile(keyfile, binary=True))

    # Create a CSR
    req = crypto.X509Req()
    req.get_subject().CN = domain
    # Collect every extension first and add them with a single call, so the
    # subjectAltName is emitted together with the other extensions.
    extensions = [
        crypto.X509Extension(b"keyUsage", False, b"Digital Signature, Non Repudiation, Key Encipherment"),
        crypto.X509Extension(b"basicConstraints", False, b"CA:FALSE"),
    ]
    if altnames:
        san = ', '.join("DNS:{}".format(name) for name in altnames)
        extensions.append(crypto.X509Extension(b"subjectAltName", False, san.encode()))
    req.add_extensions(extensions)
    req.set_pubkey(key)
    req.sign(key, "sha256")

    # Validate every domain
    auths = []
    for dom in set(altnames + [domain]):
        logger.debug("Requesting challenges for domain %s.", dom)
        auth = self.client.request_domain_challenges(dom)
        if auth.body.status != acme.messages.STATUS_VALID:
            logger.debug("Domain %s not yet authorized. Processing authorization.", dom)
            self._process_auth(auth)
        else:
            logger.debug("Domain %s already authorized, valid till %s.", dom, auth.body.expires)
        auths.append(auth)

    # Request certificate and chain
    logger.debug("Requesting certificate issuance for domain %s (altname: %s).", domain, ','.join(altnames))
    (crt, auths) = self.client.poll_and_request_issuance(acme.jose.util.ComparableX509(req), auths)
    logger.debug("Requesting certificate chain for domain %s.", domain)
    chain = self.client.fetch_chain(crt)

    # Dump to PEM
    crt_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, crt.body)
    chain_pem = '\n'.join([crypto.dump_certificate(crypto.FILETYPE_PEM, link).decode() for link in chain]).encode()

    return (crt_pem, chain_pem)