def _request_new_cert(self, domain, keyfile, altname=None):
""" Generate a new certificate for a domain using an ACME authority """
self._init_client()
# Specific logger
logger = logging.getLogger('certproxy.acmeproxy.acme')
# Load the private key
key = crypto.load_privatekey(crypto.FILETYPE_PEM, readfile(keyfile, binary=True))
# Create a CSR
req = crypto.X509Req()
req.get_subject().CN = domain
req.add_extensions([
crypto.X509Extension("keyUsage".encode(), False, "Digital Signature, Non Repudiation, Key Encipherment".encode()),
crypto.X509Extension("basicConstraints".encode(), False, "CA:FALSE".encode()),
])
if altname:
req.add_extensions([
crypto.X509Extension("subjectAltName".encode(), False, ', '.join(["DNS:{}".format(domain) for domain in altname]).encode())
])
req.set_pubkey(key)
req.sign(key, "sha256")
# Validate every domain
auths = []
for dom in set(altname + [domain]):
logger.debug("Requesting challenges for domain %s.", dom)
auth = self.client.request_domain_challenges(dom)
if auth.body.status != acme.messages.STATUS_VALID:
logger.debug("Domain %s not yet authorized. Processing authorization.", dom)
self._process_auth(auth)
else:
logger.debug("Domain %s already authorized, valid till %s.", dom, auth.body.expires)
auths.append(auth)
# Request certificate and chain
logger.debug("Requesting certificate issuance for domain %s (altname: %s).", domain, ','.join(altname))
(crt, auths) = self.client.poll_and_request_issuance(acme.jose.util.ComparableX509(req), auths)
logger.debug("Requesting certificate chain for domain %s.", domain)
chain = self.client.fetch_chain(crt)
# Dump to PEM
crt_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, crt.body)
chain_pem = '\n'.join([crypto.dump_certificate(crypto.FILETYPE_PEM, link).decode() for link in chain]).encode()
return (crt_pem, chain_pem)
评论列表
文章目录