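def get_certificate(self, name, opts):
    """Return the X.509 certificate for the keypair called *name*.

    Resolution order:
      1. the certificate already cached in
         ``self.keypairs[name]["certificate"]``;
      2. when ``self.path`` is set, the PEM file ``<path>/<name>.pem``;
      3. otherwise a freshly issued end-entity certificate signed with the
         key from ``self.get_certificate_authority_key()``.

    Whatever steps 2 and 3 produce is cached back into
    ``self.keypairs[name]["certificate"]`` before being returned.

    Args:
        name: key into ``self.keypairs``; also used for the PEM file name
            and for the certificate's common name (``kube-<name>``).
        opts: dict of options. ``"sans"`` (iterable of x509 GeneralName)
            adds a non-critical SubjectAlternativeName extension;
            ``"not_valid_after"`` (datetime) overrides the expiry date,
            which otherwise keeps the fixed historical default.

    Returns:
        A ``cryptography.x509.Certificate``.

    Raises:
        KeyError: if *name* is not present in ``self.keypairs``.
    """
    cached = self.keypairs[name]["certificate"]
    if cached:
        return cached

    if self.path:
        # Reuse a certificate previously written to <path>/<name>.pem.
        pem_path = os.path.join(self.path, "{}.pem".format(name))
        with open(pem_path, "rb") as fp:
            certificate = x509.load_pem_x509_certificate(
                fp.read(), default_backend())
    else:
        ca_key = self.get_certificate_authority_key()
        ca_certificate = self.get_certificate_authority_certificate()

        # Backdate the start of validity by one day so clock skew between
        # this host and the consumers of the certificate cannot make it
        # "not yet valid". cryptography treats naive datetimes as UTC, so
        # take the current time in UTC rather than local time.
        not_before = datetime.datetime.utcnow() - datetime.timedelta(days=1)
        # The expiry used to be the fixed date 2018-08-02, which makes any
        # certificate issued after that day expired at birth. It stays the
        # default for compatibility; callers can override it through opts.
        not_after = opts.get("not_valid_after", datetime.datetime(2018, 8, 2))

        subject = x509.Name([
            x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"),
            x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "CO"),
            x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "Denver"),
            x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Eldarion, Inc."),
            x509.NameAttribute(x509.NameOID.COMMON_NAME, "kube-{}".format(name)),
        ])

        builder = (
            x509.CertificateBuilder()
            # A random 128-bit serial: positive and within the 20-octet
            # bound RFC 5280 places on serial numbers.
            .serial_number(int(uuid.uuid4()))
            .not_valid_before(not_before)
            .not_valid_after(not_after)
            # NOTE(review): the leaf is issued for the CA's *own* public
            # key, so only the CA private key can ever use it. Presumably
            # the per-name keypair in self.keypairs[name] was intended;
            # confirm against the rest of the class before relying on it.
            .public_key(ca_key.public_key())
            .subject_name(subject)
            # RFC 5280: a certificate's issuer name is the subject name of
            # the certificate that signs it. The original copied the CA's
            # issuer, which only coincides for a self-signed CA.
            .issuer_name(ca_certificate.subject)
        )
        if opts.get("sans"):
            builder = builder.add_extension(
                x509.SubjectAlternativeName(opts["sans"]),
                critical=False,
            )
        # End-entity certificate: it must not be usable to sign further
        # certificates.
        builder = builder.add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=False,
        )

        certificate = builder.sign(
            private_key=ca_key,
            algorithm=hashes.SHA256(),
            backend=default_backend(),
        )

    # Cache so later calls skip the disk read or the signing step.
    self.keypairs[name]["certificate"] = certificate
    return certificate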
def __check_extensions(self, cert, usages, cur_pathlen):
    """Check whether the critical extensions in this certificate
    are supported and allow the provided use(s).

    'cert' is the certificate whose extensions are examined.

    'usages' maps an extension type to the attribute names of that
    extension which the certificate must assert; extension types not
    present in 'usages' are not checked for usage.

    'cur_pathlen' is compared against the BasicConstraints
    path_length of 'cert'; if it exceeds that limit PathlenTooShort
    is raised."""
    try:
        exts = cert.extensions
    except (ValueError, x509.UnsupportedExtension) as e:
        raise api_errors.InvalidCertificateExtensions(
            cert, e)

    for ext in exts:
        ext_type = type(ext.value)

        if ext_type not in SUPPORTED_EXTENSIONS:
            # An unrecognized extension only blocks verification
            # of the chain when it is marked critical.
            if ext.critical:
                raise api_errors.UnsupportedCriticalExtension(
                    cert, ext)
            continue

        attr_names = list(EXTENSIONS_VALUES[ext_type])

        if ext_type == x509.BasicConstraints:
            pathlen = ext.value.path_length
            if pathlen is not None and cur_pathlen > pathlen:
                raise api_errors.PathlenTooShort(cert,
                    cur_pathlen, pathlen)
        elif ext_type == x509.KeyUsage and not ext.value.key_agreement:
            # Cryptography error: encipher_only/decipher_only
            # are undefined unless key_agreement is true, so
            # leave those two bits out of the lookup.
            attr_names.remove("encipher_only")
            attr_names.remove("decipher_only")

        granted = [
            attr
            for attr in attr_names
            if getattr(ext.value, attr)
        ]

        # Every use requested for this extension type must be
        # among the values the certificate actually grants.
        if ext_type not in usages:
            continue
        for use in usages[ext_type]:
            if use not in granted:
                raise api_errors.InappropriateCertificateUse(
                    cert, ext, use, ", ".join(granted))