def _create_oids(common_name, oids):
    """Build the list of subject name attributes for a CSR.

    The common name always comes first. Every key configured in
    ``CFG.csr.oids`` that has a truthy entry in ``OIDS_MAP`` then contributes
    one attribute; a value supplied by the caller in ``oids`` takes
    precedence over the configured one. Values are converted with ``str()``.

    :param common_name: value for the COMMON_NAME attribute.
    :param oids: container of per-request override values, looked up with
        the same keys as ``CFG.csr.oids``.
    :return: list of ``x509.NameAttribute`` objects.
    """
    name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    for key in CFG.csr.oids:
        attribute_oid = OIDS_MAP.get(key)
        if not attribute_oid:
            # Configured keys without a known OID are skipped.
            continue
        raw_value = oids[key] if key in oids else CFG.csr.oids[key]
        name_attributes.append(
            x509.NameAttribute(attribute_oid, str(raw_value)))
    return name_attributes
# Example source code using the Python class NameAttribute()
def serialize(self,
              country=u"US",
              state=u"CA",
              city=u"San Francisco",
              company=u"Lokey Examle",
              common_name=u"example.com"):
    """Return a PEM-encoded certificate signing request for this key.

    The private key is obtained from ``self.to('pem')`` and loaded without a
    password, i.e. it must be an unencrypted PEM key. The request is signed
    with SHA-256.

    :param country: subject COUNTRY_NAME.
    :param state: subject STATE_OR_PROVINCE_NAME.
    :param city: subject LOCALITY_NAME.
    :param company: subject ORGANIZATION_NAME.
    :param common_name: subject COMMON_NAME.
    :return: the result of ``public_bytes(Encoding.PEM)`` on the signed
        request.
    """
    private_key = serialization.load_pem_private_key(
        self.to('pem'), password=None, backend=default_backend())

    # (OID, value) pairs in the order they appear in the subject.
    subject_fields = (
        (NameOID.COUNTRY_NAME, country),
        (NameOID.STATE_OR_PROVINCE_NAME, state),
        (NameOID.LOCALITY_NAME, city),
        (NameOID.ORGANIZATION_NAME, company),
        (NameOID.COMMON_NAME, common_name),
    )
    subject = x509.Name([
        x509.NameAttribute(field_oid, field_value)
        for field_oid, field_value in subject_fields
    ])

    request_builder = x509.CertificateSigningRequestBuilder()
    request = request_builder.subject_name(subject).sign(
        private_key, hashes.SHA256(), default_backend())
    return request.public_bytes(serialization.Encoding.PEM)
def _name(name):
    """Return an ``x509.Name`` with a versioned organization and *name* as OU.

    The ORGANIZATION_NAME is ``"trustme v"`` followed by ``__version__``;
    the ORGANIZATIONAL_UNIT_NAME is the *name* argument.
    """
    organization = u"trustme v{}".format(__version__)
    attributes = [
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, name),
    ]
    return x509.Name(attributes)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Convert an OpenSSL ``X509_NAME_ENTRY`` into an ``x509.NameAttribute``.

    The entry's object and data are fetched through ``backend._lib``; each
    result is checked against ``backend._ffi.NULL`` with
    ``backend.openssl_assert`` before it is used.

    :param backend: backend object exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param x509_name_entry: the name entry handle to decode.
    :return: ``x509.NameAttribute`` built from the entry's OID and value.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    entry_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != null)
    entry_data = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != null)

    text = _asn1_string_to_utf8(backend, entry_data)
    oid_text = _obj2txt(backend, entry_object)
    return x509.NameAttribute(x509.ObjectIdentifier(oid_text), text)
def gen_certificate(key: rsa.RSAPrivateKey,
                    common_name: str,
                    *,
                    issuer: Optional[str]=None,
                    sign_key: Optional[rsa.RSAPrivateKey]=None) -> x509.Certificate:
    """Create a one-day certificate for *key*.

    The subject consists of a single COMMON_NAME attribute. The certificate
    carries a critical ``BasicConstraints(ca=True, path_length=0)`` extension
    and a random serial number, and is signed with SHA-256.

    :param key: private key whose public half is certified.
    :param common_name: subject common name.
    :param issuer: issuer common name; falls back to *common_name* when
        omitted or empty.
    :param sign_key: key used for signing; falls back to *key* (making the
        certificate self-signed) when omitted.
    :return: the signed certificate, valid for 86400 seconds from the
        current UTC time.
    """
    def _common_name_only(value: str) -> x509.Name:
        # Both subject and issuer are names holding only a CN.
        return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, value)])

    valid_from = datetime.datetime.utcnow()
    valid_until = valid_from + datetime.timedelta(seconds=86400)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(_common_name_only(common_name))
    builder = builder.issuer_name(_common_name_only(issuer or common_name))
    builder = builder.not_valid_before(valid_from)
    builder = builder.not_valid_after(valid_until)
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)
    return builder.sign(
        private_key=sign_key or key,
        algorithm=hashes.SHA256(),
        backend=BACKEND,
    )
def _decode_x509_name_entry(backend, x509_name_entry):
    """Turn an OpenSSL ``X509_NAME_ENTRY`` into an ``x509.NameAttribute``.

    Both the object and the data pointer read from the entry are verified
    to be non-NULL through ``backend.openssl_assert`` before decoding.

    :param backend: backend object exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param x509_name_entry: the name entry handle to decode.
    :return: ``x509.NameAttribute`` built from the entry's OID and value.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    entry_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != null)
    entry_data = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != null)

    text = _asn1_string_to_utf8(backend, entry_data)
    oid_text = _obj2txt(backend, entry_object)
    return x509.NameAttribute(x509.ObjectIdentifier(oid_text), text)
def MakeCASignedCert(common_name,
                     private_key,
                     ca_cert,
                     ca_private_key,
                     serial_number=2,
                     session=None):
    """Make a cert and sign it with the CA's private key.

    The certificate's subject holds only *common_name*; its issuer name is
    taken from ``ca_cert.get_issuer()``. It is valid from one day before the
    current time until ten years (of 365 days) after it, carries a critical
    ``BasicConstraints(ca=False)`` extension, and is signed with SHA-256.

    Args:
      common_name: Subject common name of the new certificate.
      private_key: Key wrapper whose ``public_key().get_raw_key()`` is
        certified.
      ca_cert: CA certificate wrapper providing ``get_issuer()``.
      ca_private_key: CA key wrapper; ``get_raw_key()`` is used for signing.
      serial_number: Serial number of the new certificate.
      session: Passed through to the returned certificate wrapper.

    Returns:
      The result of ``X509Ceritifcate(session=session).from_raw_key(...)``
      applied to the signed certificate.
    """
    public_key = private_key.public_key()
    subject = x509.Name([
        x509.NameAttribute(oid.NameOID.COMMON_NAME, common_name)
    ])

    # Read the clock once so both bounds are relative to the same instant.
    now = time.time()
    # cryptography interprets naive datetimes as UTC. fromtimestamp() would
    # return *local* time and shift the validity window by the host's UTC
    # offset, so the bounds are built as naive UTC values instead.
    valid_from = datetime.datetime.utcfromtimestamp(now - 60 * 60 * 24)
    valid_until = datetime.datetime.utcfromtimestamp(
        now + 60 * 60 * 24 * 365 * 10)

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(ca_cert.get_issuer())
    builder = builder.subject_name(subject)
    builder = builder.not_valid_before(valid_from)
    builder = builder.not_valid_after(valid_until)
    builder = builder.serial_number(serial_number)
    builder = builder.public_key(public_key.get_raw_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True)

    certificate = builder.sign(
        private_key=ca_private_key.get_raw_key(),
        algorithm=hashes.SHA256(),
        backend=openssl.backend)
    return X509Ceritifcate(session=session).from_raw_key(certificate)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Convert an OpenSSL ``X509_NAME_ENTRY`` into an ``x509.NameAttribute``.

    In addition to the OID and the decoded value, the attribute receives the
    entry's string type, obtained by looking up ``data.type`` in
    ``_ASN1_TYPE_TO_ENUM``. Both pointers read from the entry are checked
    against NULL with ``backend.openssl_assert`` before use.

    :param backend: backend object exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param x509_name_entry: the name entry handle to decode.
    :return: ``x509.NameAttribute`` built from the OID, value and type.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    entry_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != null)
    entry_data = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != null)

    text = _asn1_string_to_utf8(backend, entry_data)
    oid_text = _obj2txt(backend, entry_object)
    string_type = _ASN1_TYPE_TO_ENUM[entry_data.type]
    return x509.NameAttribute(
        x509.ObjectIdentifier(oid_text), text, string_type)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Decode one OpenSSL ``X509_NAME_ENTRY`` as an ``x509.NameAttribute``.

    The entry's object and data are read via ``backend._lib`` and each is
    asserted to be non-NULL with ``backend.openssl_assert``.

    :param backend: backend object exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param x509_name_entry: the name entry handle to decode.
    :return: ``x509.NameAttribute`` built from the entry's OID and value.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    entry_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != null)
    entry_data = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != null)

    text = _asn1_string_to_utf8(backend, entry_data)
    oid_text = _obj2txt(backend, entry_object)
    return x509.NameAttribute(x509.ObjectIdentifier(oid_text), text)
def _generate_ca_cert(self):
    """
    Generate a CA cert/key.

    Populates ``self._ca_key`` (only when it is still ``None``),
    ``self._ca_name``, ``self._ca_cert`` and ``self._ca_aki``. The
    certificate is self-signed with SHA-256, valid from one hour before
    ``self._now()`` until 3650 days after it, and carries a critical
    ``BasicConstraints(ca=True, path_length=0)`` extension plus a
    non-critical subject key identifier.
    """
    # NOTE(review): only the key generation is treated as conditional; the
    # reviewed copy had lost its indentation -- confirm against upstream.
    if self._ca_key is None:
        self._ca_key = generate_private_key(u'rsa')
    ca_public_key = self._ca_key.public_key()

    self._ca_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'ACME Snake Oil CA')])

    builder = x509.CertificateBuilder()
    # Subject and issuer are the same name: the CA signs itself.
    builder = builder.subject_name(self._ca_name)
    builder = builder.issuer_name(self._ca_name)
    builder = builder.not_valid_before(self._now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(self._now() + timedelta(days=3650))
    builder = builder.public_key(ca_public_key)
    builder = builder.serial_number(int(uuid4()))
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(ca_public_key),
        critical=False)
    self._ca_cert = builder.sign(
        private_key=self._ca_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        self._ca_key.public_key())
def test_common_name_too_long(self):
    """
    When the first name passed to `~txacme.util.csr_for_names` is too long
    (here 80 characters), the CSR's subject uses the placeholder common
    name ``san.too.long.invalid`` instead.
    """
    overlong_name = u'aaaa.' * 16
    csr = csr_for_names([overlong_name], RSA_KEY_512_RAW)
    expected_subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'san.too.long.invalid')])
    self.assertThat(
        csr, MatchesStructure(subject=Equals(expected_subject)))
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Generate a certificate/key pair for responding to a tls-sni-01 challenge.

    The certificate is self-signed with SHA-256, has the fixed common name
    ``acme.invalid``, a random serial number, and is valid from one hour
    before to one hour after the current UTC time.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: Optional replacement for
        ``generate_private_key``; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    key = (_generate_private_key or generate_private_key)(key_type)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])

    # cryptography interprets naive datetimes as UTC. datetime.now() returns
    # local time, which on hosts east of UTC pushed ``not_valid_before`` into
    # the future and produced a certificate that was not yet valid. Use UTC,
    # and read the clock once so both bounds share the same instant.
    now = datetime.utcnow()
    one_hour = timedelta(seconds=3600)

    cert = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(now - one_hour)
        .not_valid_after(now + one_hour)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(server_name)]),
            critical=False)
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )
    return (cert, key)
def csr_for_names(names, key):
    """
    Generate a certificate signing request for the given names and private key.

    The first name becomes the subject common name unless it is longer than
    64 characters, in which case the placeholder ``san.too.long.invalid`` is
    used. Every name is included as a DNS subjectAltName. The request is
    signed with SHA-256.

    ..  seealso:: `acme.client.Client.request_issuance`

    ..  seealso:: `generate_private_key`

    :param names: One or more names (subjectAltName) for which to request a
        certificate, as a ``List[str]``.
    :param key: A Cryptography private key object.

    :raises ValueError: If ``names`` is empty.

    :rtype: `cryptography.x509.CertificateSigningRequest`
    :return: The certificate request message.
    """
    if len(names) == 0:
        raise ValueError('Must have at least one name')

    first_name = names[0]
    common_name = (
        first_name if len(first_name) <= 64 else u'san.too.long.invalid')

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(name) for name in names])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
def _attribDict2x509List(attribs):
attribs = attribs or OrderedDict()
attrib_list = []
for nice_name, values in attribs.items():
oid = OID_MAPPING[nice_name]
if not isinstance(values, list):
values = [values]
for single_value in values:
attrib_list.append(x509.NameAttribute(oid, single_value))
return attrib_list
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from an OpenSSL ``X509_NAME_ENTRY``.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and asserted non-NULL with ``backend.openssl_assert``
    before they are decoded.

    :param backend: backend object exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param x509_name_entry: the name entry handle to decode.
    :return: ``x509.NameAttribute`` built from the entry's OID and value.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    entry_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != null)
    entry_data = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != null)

    text = _asn1_string_to_utf8(backend, entry_data)
    oid_text = _obj2txt(backend, entry_object)
    return x509.NameAttribute(x509.ObjectIdentifier(oid_text), text)
def build_crl():
    """Build and sign a PEM-encoded certificate revocation list.

    The CRL is issued under the common name of the CA returned by
    ``get_newest_ca()`` and lists every ``Certificate`` row that has this
    CA's serial number as ``issuer_serial_number`` and ``revoked=True``.
    It is signed with SHA-256 using the key loaded from
    ``keyStorePath(ca.serial_number)`` and is due for renewal after one day.

    :return: the CRL as PEM bytes.
    """
    ca = get_newest_ca()

    # cryptography interprets naive datetimes as UTC. datetime.today()
    # returns local time, which on hosts east of UTC yields a CRL whose
    # last_update lies in the future. Use one UTC timestamp for every field.
    now = datetime.datetime.utcnow()
    one_day = datetime.timedelta(days=1)

    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, ca.common_name),
    ]))
    builder = builder.last_update(now)
    builder = builder.next_update(now + one_day)

    revoked_list = Certificate.objects.filter(
        issuer_serial_number=ca.serial_number, revoked=True)
    for revoked_cert in revoked_list:
        logger.debug("revoked serial_number: %s", revoked_cert.serial_number)
        # NOTE(review): the revocation date is stamped with the CRL build
        # time on every run, not the time the certificate was actually
        # revoked -- confirm whether the model stores that date.
        revoked_entry = (
            x509.RevokedCertificateBuilder()
            .serial_number(int(revoked_cert.serial_number))
            .revocation_date(now)
            .build(default_backend())
        )
        builder = builder.add_revoked_certificate(revoked_entry)

    crl = builder.sign(
        private_key=loadPEMKey(keyStorePath(ca.serial_number)),
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )
    return crl.public_bytes(serialization.Encoding.PEM)
def create_csr(key, domains):
    """
    Creates a CSR for the specified key and domain names.

    The first domain becomes the subject common name and every domain is
    added as a DNS subjectAltName. The request is signed with SHA-256 and
    handed to ``export_csr_for_acme``, whose result is returned (DER format
    according to the original description -- confirm against that helper).
    ``domains`` must be non-empty.
    """
    assert domains

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        list(map(x509.DNSName, domains)))

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
def keygen(pub_key, priv_key, user_id, priv_key_password=None):
    """Generate new private key and certificate RSA_SHA512_4096.

    A 4096-bit RSA key is generated and written to ``priv_key`` as PEM
    (TraditionalOpenSSL format), encrypted with ``priv_key_password`` when
    one is given. A self-signed certificate for that key, valid for 3650
    days and signed with SHA-512, is written to ``pub_key`` as PEM.

    :param pub_key: path of the certificate file to write.
    :param priv_key: path of the private key file to write.
    :param user_id: value of the certificate's COMMON_NAME.
    :param priv_key_password: optional password handed to
        ``serialization.BestAvailableEncryption``; a falsy value stores the
        key unencrypted.
    """
    # Local import so this function does not rely on a module-level import.
    import os

    key = rsa.generate_private_key(public_exponent=65537, key_size=4096,
                                   backend=default_backend())
    if priv_key_password:
        encryption = serialization.BestAvailableEncryption(priv_key_password)
    else:
        encryption = serialization.NoEncryption()

    # Create the key file readable by the owner only; a plain open() would
    # apply the process umask and could leave the (possibly unencrypted) key
    # readable by other users. The mode only applies when the file is newly
    # created -- an existing file keeps its current permissions.
    flags = (os.O_WRONLY | os.O_CREAT | os.O_TRUNC |
             getattr(os, "O_BINARY", 0))
    key_fd = os.open(priv_key, flags, 0o600)
    with os.fdopen(key_fd, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=encryption,
        ))

    # For a self-signed certificate the subject and issuer are the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "XX"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "XX"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "XX"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "I2P Anonymous Network"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "I2P"),
        x509.NameAttribute(NameOID.COMMON_NAME, user_id),
    ])

    # Read the clock once so both validity bounds share the same instant.
    now = datetime.datetime.utcnow()
    # NOTE(review): the serial number comes from the non-cryptographic
    # ``random`` module and a narrow range; left unchanged in case consumers
    # depend on that range -- consider x509.random_serial_number().
    serial = random.randrange(1000000000, 2000000000)

    cert = x509.CertificateBuilder() \
        .subject_name(subject) \
        .issuer_name(issuer) \
        .public_key(key.public_key()) \
        .not_valid_before(now) \
        .not_valid_after(now + datetime.timedelta(days=365*10)) \
        .serial_number(serial) \
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
            critical=False,
        ).sign(key, hashes.SHA512(), default_backend())

    with open(pub_key, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
def create_proxy_cert(loaded_cert, loaded_private_key,
                      loaded_public_key, lifetime_hours):
    """
    Create a proxy certificate issued by ``loaded_cert``.

    The proxy certifies ``loaded_public_key``, is signed (SHA-256) with
    ``loaded_private_key``, and is valid from the current UTC time for
    ``lifetime_hours`` hours. Its issuer is the subject of ``loaded_cert``;
    its subject is that same subject with one extra CommonName whose value
    is the proxy's serial number.

    Returns the new proxy as a cryptography certificate object.
    """
    # RFC 3820 allows many ways of choosing the serial number; a random
    # 64-bit value is used because unpredictable serials make attacks such
    # as http://www.win.tue.nl/hashclash/rogue-ca harder.
    serial = struct.unpack("<Q", os.urandom(8))[0]

    builder = x509.CertificateBuilder().serial_number(serial)
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() + datetime.timedelta(hours=lifetime_hours))
    builder = builder.public_key(loaded_public_key)
    builder = builder.issuer_name(loaded_cert.subject)

    # Proxy subject = issuing cert's subject + CN=<serial>.
    serial_cn = x509.NameAttribute(
        x509.oid.NameOID.COMMON_NAME, six.u(str(serial)))
    proxy_subject = x509.Name(list(loaded_cert.subject) + [serial_cn])
    builder = builder.subject_name(proxy_subject)

    # proxyCertInfo extension. keyUsage is deliberately not added: for RFC
    # proxies the effective usage is the intersection of the usages of each
    # cert in the chain (RFC 3820, section 4.2).
    # The OID and the encoded value below were taken from the output of the
    # OpenSSL call X509V3_EXT_conf(NULL, ctx, name, value) with
    #   ctx set by X509V3_set_nconf(&ctx, NCONF_new(NULL))
    #   name = "proxyCertInfo"
    #   value = "critical,language:Inherit all"
    proxy_cert_info = x509.extensions.UnrecognizedExtension(
        x509.ObjectIdentifier("1.3.6.1.5.5.7.1.14"),
        b"0\x0c0\n\x06\x08+\x06\x01\x05\x05\x07\x15\x01")
    builder = builder.add_extension(proxy_cert_info, critical=True)

    # Signed with the issuer's private key.
    return builder.sign(
        private_key=loaded_private_key, algorithm=hashes.SHA256(),
        backend=default_backend())
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None):
    """Create a self-signed CA certificate with a new EC key and save both.

    A SECP256K1 private key is generated and a certificate is produced by
    ``self.__generate_cert`` with identical subject and issuer names and
    the serial number ``self.__LAST_CA_INDEX + 1``. The certificate (PEM)
    and the private key (PKCS8) are passed to ``self.__save``, the CA index
    is incremented, and the certificate is shown through
    ``self.__show_certificate``.

    :param cn: subject CommonName
    :param ou: subject OrganizationalUnitName
    :param o: subject OrganizationName
    :param expire_period: validity period handed to ``__generate_cert``
        (in years, per the original description); defaults to
        ``self.__ca_expired``
    :param password: private key encryption password; ``None`` stores the
        key unencrypted (presumably at least 8 characters -- the original
        description is garbled, confirm)
    """
    sign_pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
    sign_pub_key = sign_pri_key.public_key()

    subject_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, o),
        x509.NameAttribute(NameOID.COUNTRY_NAME, "kr")
    ])
    serial_number = self.__LAST_CA_INDEX + 1
    key_usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=False,
        key_encipherment=True,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=False,
        encipher_only=False,
        decipher_only=False)

    if expire_period is None:
        expire_period = self.__ca_expired

    # Subject and issuer are the same name: the CA certificate is
    # self-signed with the freshly generated key.
    new_cert = self.__generate_cert(
        pub_key=sign_pub_key,
        subject_name=subject_name,
        issuer_name=subject_name,
        serial_number=serial_number,
        expire_period=expire_period,
        key_usage=key_usage,
        issuer_priv=sign_pri_key)
    cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)

    # An unencrypted key is serialized as PEM, a password-protected key as
    # DER; both use the PKCS8 format.
    if password is None:
        key_encoding = serialization.Encoding.PEM
        key_encryption = serialization.NoEncryption()
    else:
        key_encoding = serialization.Encoding.DER
        key_encryption = serialization.BestAvailableEncryption(
            password=password)
    pri_pem = sign_pri_key.private_bytes(
        encoding=key_encoding,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=key_encryption
    )

    self.__save(self.__CA_PATH, cert_pem, pri_pem)
    self.__LAST_CA_INDEX += 1
    self.__show_certificate(new_cert)