def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
python类KeyUsage()的实例源码
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def _decode_key_usage(backend, bit_string):
bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
get_bit = backend._lib.ASN1_BIT_STRING_get_bit
digital_signature = get_bit(bit_string, 0) == 1
content_commitment = get_bit(bit_string, 1) == 1
key_encipherment = get_bit(bit_string, 2) == 1
data_encipherment = get_bit(bit_string, 3) == 1
key_agreement = get_bit(bit_string, 4) == 1
key_cert_sign = get_bit(bit_string, 5) == 1
crl_sign = get_bit(bit_string, 6) == 1
encipher_only = get_bit(bit_string, 7) == 1
decipher_only = get_bit(bit_string, 8) == 1
return x509.KeyUsage(
digital_signature,
content_commitment,
key_encipherment,
data_encipherment,
key_agreement,
key_cert_sign,
crl_sign,
encipher_only,
decipher_only
)
def check_ca(cert):
"""Check if 'cert' is a proper CA. For this the BasicConstraints need to
identify it as a CA cert and it needs to have the CertSign
(key_cert_sign in Cryptography) KeyUsage flag. Based loosely on
OpenSSL's check_ca()"""
from cryptography import x509
bconst_ca = None
kuse_sign = None
for e in cert.extensions:
if isinstance(e.value, x509.BasicConstraints):
bconst_ca = e.value.ca
elif isinstance(e.value, x509.KeyUsage):
kuse_sign = e.value.key_cert_sign
return kuse_sign is not False and bconst_ca
def create_ca_certificate(cn, key_size=4096, certify_days=365):
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())
subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
now = datetime.datetime.utcnow()
serial = x509.random_serial_number()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(issuer) \
.public_key(key.public_key()) \
.serial_number(serial) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(key_id, critical=False) \
.add_extension(x509.AuthorityKeyIdentifier(key_id.digest,
[x509.DirectoryName(issuer)],
serial),
critical=False) \
.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) \
.add_extension(x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False),
critical=True) \
.sign(key, hashes.SHA256(), default_backend())
cert = cert.public_bytes(serialization.Encoding.PEM)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return cert, key
def sign_certificate_request(csr_file, crt_file, ca_crt, ca_pkey):
with open(csr_file, 'rb') as f:
csr = x509.load_pem_x509_csr(data=f.read(), backend=default_backend())
crt = x509.CertificateBuilder().subject_name(
csr.subject
).issuer_name(
ca_crt.subject
).public_key(
csr.public_key()
).serial_number(
uuid.uuid4().int # pylint: disable=no-member
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10)
).add_extension(
extension=x509.KeyUsage(
digital_signature=True, key_encipherment=True, content_commitment=True,
data_encipherment=False, key_agreement=False, encipher_only=False, decipher_only=False, key_cert_sign=False, crl_sign=False
),
critical=True
).add_extension(
extension=x509.BasicConstraints(ca=False, path_length=None),
critical=True
).add_extension(
extension=x509.AuthorityKeyIdentifier.from_issuer_public_key(ca_pkey.public_key()),
critical=False
).sign(
private_key=ca_pkey,
algorithm=hashes.SHA256(),
backend=default_backend()
)
with open(crt_file, 'wb') as f:
f.write(crt.public_bytes(encoding=serialization.Encoding.PEM))
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None):
"""CA ??? ??
Peer ??? ?? ?? ???(ECC Key)
:param cn: ?? CommonName
:param ou: ?? OrganizationalUnitName
:param o: ?? OrganizationName
:param expire_period: ??? ????(year)
:param password: ??? ??? ????(8?? ??)
"""
sign_pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
sign_pub_key = sign_pri_key.public_key()
subject_name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, cn),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, o),
x509.NameAttribute(NameOID.COUNTRY_NAME, "kr")
])
serial_number = self.__LAST_CA_INDEX + 1
key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False,
key_encipherment=True, data_encipherment=False, key_agreement=False,
key_cert_sign=True, crl_sign=False,
encipher_only=False, decipher_only=False)
if expire_period is None:
expire_period = self.__ca_expired
new_cert = self.__generate_cert(pub_key=sign_pub_key, subject_name=subject_name,
issuer_name=subject_name, serial_number=serial_number,
expire_period=expire_period, key_usage=key_usage,
issuer_priv=sign_pri_key)
cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)
if password is None:
pri_pem = sign_pri_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
else:
pri_pem = sign_pri_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(password=password)
)
self.__save(self.__CA_PATH, cert_pem, pri_pem)
self.__LAST_CA_INDEX += 1
self.__show_certificate(new_cert)
def generate_peer_cert(self, cn, password=None):
"""Peer ??? ??
???/???? ???(ECC Key), ????? 1?
:param cn: ?? CommonName
:param password: ??? ??? ????(8?? ??)
"""
pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
pub_key = pri_key.public_key()
issuer_name = self.__ca_cert.issuer
ou = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATIONAL_UNIT_NAME)[0].value
o = issuer_name.get_attributes_for_oid(NameOID.ORGANIZATION_NAME)[0].value
subject_name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, cn),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, o),
x509.NameAttribute(NameOID.COUNTRY_NAME, "kr")
])
expire_period = self.__peer_expired
serial_number = self.__LAST_PEER_INDEX + 1
key_usage = x509.KeyUsage(digital_signature=True, content_commitment=False,
key_encipherment=True, data_encipherment=False, key_agreement=False,
key_cert_sign=False, crl_sign=False,
encipher_only=False, decipher_only=False)
new_cert = self.__generate_cert(pub_key=pub_key, subject_name=subject_name,
issuer_name=issuer_name, serial_number=serial_number,
expire_period=expire_period, key_usage=key_usage,
issuer_priv=self.__ca_pri, issuer_cert=self.__ca_cert)
cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)
if password is None:
pri_pem = pri_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
else:
pri_pem = pri_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(password=password)
)
ca_cert_pem = self.__ca_cert.public_bytes(encoding=serialization.Encoding.PEM)
peer_path = join(self.__DEFAULT_PATH, cn)
self.__save(peer_path, cert_bytes=cert_pem, pri_bytes=pri_pem, ca_cert=ca_cert_pem)
# ???? ??
self.__load_peer_certificate(cert_bytes=cert_pem)
self.__show_certificate(new_cert)
def issue_certificate(cn, ca_cert, ca_key,
organizations=(),
san_dns=(),
san_ips=(),
key_size=2048,
certify_days=365,
is_web_server=False,
is_web_client=False):
ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend())
ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in organizations]
subject = x509.Name(subject_name_attributes)
now = datetime.datetime.utcnow()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(ca_cert.issuer) \
.public_key(key.public_key()) \
.serial_number(x509.random_serial_number()) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest,
[x509.DirectoryName(ca_cert.issuer)],
ca_cert.serial_number),
critical=False) \
.add_extension(x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False),
critical=True)
extended_usages = []
if is_web_server:
extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH)
if is_web_client:
extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH)
if extended_usages:
cert = cert.add_extension(x509.ExtendedKeyUsage(extended_usages), critical=False)
sans = [x509.DNSName(name) for name in san_dns]
sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips]
if sans:
cert = cert.add_extension(x509.SubjectAlternativeName(sans), critical=False)
cert = cert.sign(ca_key, hashes.SHA256(), default_backend())
cert = cert.public_bytes(serialization.Encoding.PEM)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return cert, key
def load_or_create_ca_certificate(crt_file, subject, pkey):
""" Load a CA certificate or create a self-signed one """
if os.path.isfile(crt_file):
with open(crt_file, 'rb') as f:
crt = x509.load_pem_x509_certificate(
data=f.read(),
backend=default_backend()
)
else:
issuer = subject
crt = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
pkey.public_key()
).serial_number(
uuid.uuid4().int # pylint: disable=no-member
).not_valid_before(
datetime.datetime.utcnow()
).not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10)
).add_extension(
extension=x509.KeyUsage(
digital_signature=True, key_encipherment=True, key_cert_sign=True, crl_sign=True, content_commitment=True,
data_encipherment=False, key_agreement=False, encipher_only=False, decipher_only=False
),
critical=True
).add_extension(
extension=x509.BasicConstraints(ca=True, path_length=0),
critical=True
).add_extension(
extension=x509.SubjectKeyIdentifier.from_public_key(pkey.public_key()),
critical=True
).add_extension(
extension=x509.AuthorityKeyIdentifier.from_issuer_public_key(pkey.public_key()),
critical=True
).sign(
private_key=pkey,
algorithm=hashes.SHA256(),
backend=default_backend()
)
with open(crt_file, 'wb') as f:
f.write(crt.public_bytes(encoding=serialization.Encoding.PEM))
return crt
def __check_extensions(self, cert, usages, cur_pathlen):
"""Check whether the critical extensions in this certificate
are supported and allow the provided use(s)."""
try:
exts = cert.extensions
except (ValueError, x509.UnsupportedExtension) as e:
raise api_errors.InvalidCertificateExtensions(
cert, e)
for ext in exts:
etype = type(ext.value)
if etype in SUPPORTED_EXTENSIONS:
keys = EXTENSIONS_VALUES[etype]
if etype == x509.BasicConstraints:
pathlen = ext.value.path_length
if pathlen is not None and \
cur_pathlen > pathlen:
raise api_errors.PathlenTooShort(cert,
cur_pathlen, pathlen)
elif etype == x509.KeyUsage:
keys = list(EXTENSIONS_VALUES[etype])
if not getattr(ext.value,
"key_agreement"):
# Cryptography error:
# encipher_only/decipher_only is
# undefined unless key_agreement
# is true
keys.remove("encipher_only")
keys.remove("decipher_only")
vs = [
key
for key in keys
if getattr(ext.value, key)
]
# For each use, check to see whether it's
# permitted by the certificate's extension
# values.
if etype not in usages:
continue
for u in usages[etype]:
if u not in vs:
raise api_errors.InappropriateCertificateUse(
cert, ext, u, ", ".join(vs))
# If the extension name is unrecognized and critical,
# then the chain cannot be verified.
elif ext.critical:
raise api_errors.UnsupportedCriticalExtension(
cert, ext)