def enroll(self, enrollment_id, enrollment_secret):
    """Enroll a registered identity and wrap the result in an Enrollment.

    A new private key is generated, a CSR whose subject common name is the
    enrollment ID is created from it, and the PEM text of that CSR is
    submitted to the CA client together with the credentials.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the
            enrollment ID

    Returns:
        An ``Enrollment`` built from the generated private key and the
        value returned by the CA client's ``enroll`` call.

    Raises:
        RequestException: errors in requests.exceptions
        ValueError: Failed response, json parse error, args missing
        (Both are documented by the original author; nothing in this
        method raises them directly, so they presumably propagate from
        the CA client -- verify against that client.)
    """
    signing_key = self._crypto.generate_private_key()
    common_name = x509.NameAttribute(
        NameOID.COMMON_NAME, six.u(enrollment_id))
    request = self._crypto.generate_csr(
        signing_key, x509.Name([common_name]))
    # The CA client receives the CSR as PEM text rather than bytes.
    request_pem = request.public_bytes(Encoding.PEM).decode("utf-8")
    issued = self._ca_client.enroll(
        enrollment_id, enrollment_secret, request_pem)
    return Enrollment(signing_key, issued)
# Example source code using the Python class NameAttribute() (python类NameAttribute()的实例源码)
def generate_cert(key_path, cert_out_path, not_valid_after=None):
    """Create a certificate for the key at ``key_path`` and write it as PEM.

    The private key is loaded with ``load_private_key_file``. Its public key
    is certified with subject common name 'PrivCount User' and issuer common
    name 'PrivCount Authority', marked as a non-CA certificate, and signed
    with that same private key using SHA-256.

    Args:
        key_path: Passed unchanged to ``load_private_key_file``.
        cert_out_path: Path opened in binary write mode; receives the
            PEM-encoded certificate.
        not_valid_after: Optional ``datetime.datetime`` for the end of the
            validity period. When omitted, the original fixed date
            2020-01-01 is used.
    """
    if not_valid_after is None:
        # NOTE(review): this historical default lies before the
        # "yesterday" start date computed below, so the builder is expected
        # to reject it today -- confirm, and pass an explicit date instead.
        not_valid_after = datetime.datetime(2020, 1, 1)

    private_key = load_private_key_file(key_path)
    public_key = private_key.public_key()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
    ]))
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
    ]))
    # Start one day in the past.
    builder = builder.not_valid_before(
        datetime.datetime.today() - datetime.timedelta(days=1))
    builder = builder.not_valid_after(not_valid_after)
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True)
    certificate = builder.sign(
        private_key=private_key, algorithm=hashes.SHA256(),
        backend=default_backend())

    with open(cert_out_path, 'wb') as outf:
        # The former ``print >>outf, ...`` is Python 2-only statement
        # syntax (it fails at runtime on Python 3) and appended an extra
        # newline; write the PEM bytes directly instead.
        outf.write(
            certificate.public_bytes(encoding=serialization.Encoding.PEM))
def _build_x509_name(self, x509_name):
    """Convert an OpenSSL X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is read through the backend bindings. The
    entry's object is passed to ``self._obj2txt`` and the result wrapped in
    ``x509.ObjectIdentifier``; the entry's data is converted with
    ``ASN1_STRING_to_UTF8`` and decoded as UTF-8.

    Returns:
        ``x509.Name`` holding one ``x509.NameAttribute`` per entry, in
        entry order.
    """
    lib = self._backend._lib
    ffi = self._backend._ffi
    name_attributes = []
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
        assert asn1_object != ffi.NULL
        asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
        assert asn1_string != ffi.NULL

        out = ffi.new("unsigned char **")
        length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
        assert length >= 0
        assert out[0] != ffi.NULL
        # Register a finalizer so the buffer filled in by OpenSSL is
        # released with OPENSSL_free when ``out`` is collected.
        out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

        text = ffi.buffer(out[0], length)[:].decode('utf8')
        dotted_oid = self._obj2txt(asn1_object)
        name_attributes.append(
            x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text))
    return x509.Name(name_attributes)
def _build_x509_name(self, x509_name):
    """Convert an OpenSSL X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is read through the backend bindings. The
    entry's object is passed to ``self._obj2txt`` and the result wrapped in
    ``x509.ObjectIdentifier``; the entry's data is converted with
    ``ASN1_STRING_to_UTF8`` and decoded as UTF-8.

    Returns:
        ``x509.Name`` holding one ``x509.NameAttribute`` per entry, in
        entry order.
    """
    lib = self._backend._lib
    ffi = self._backend._ffi
    name_attributes = []
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
        assert asn1_object != ffi.NULL
        asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
        assert asn1_string != ffi.NULL

        out = ffi.new("unsigned char **")
        length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
        assert length >= 0
        assert out[0] != ffi.NULL
        # Register a finalizer so the buffer filled in by OpenSSL is
        # released with OPENSSL_free when ``out`` is collected.
        out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

        text = ffi.buffer(out[0], length)[:].decode('utf8')
        dotted_oid = self._obj2txt(asn1_object)
        name_attributes.append(
            x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text))
    return x509.Name(name_attributes)
def create_csr(key, domains, must_staple=False):
    """
    Build and sign a certificate signing request for the given domains.

    The first entry of ``domains`` becomes the subject common name and every
    entry is listed as a DNS subject alternative name. When ``must_staple``
    is true, a non-critical TLS feature extension carrying
    ``status_request`` is added. The request is signed with ``key`` using
    SHA-256 and passed to ``export_csr_for_acme``, whose result is returned
    (the original author describes that result as DER; the helper is not
    visible here).
    """
    assert domains
    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    dns_names = [x509.DNSName(domain) for domain in domains]

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(
        x509.SubjectAlternativeName(dns_names), critical=False)
    if must_staple:
        staple = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple, critical=False)

    signed = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed)
def _build_x509_name(self, x509_name):
    """Convert an OpenSSL X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is read through the backend bindings. The
    entry's object is passed to ``self._obj2txt`` and the result wrapped in
    ``x509.ObjectIdentifier``; the entry's data is converted with
    ``ASN1_STRING_to_UTF8`` and decoded as UTF-8.

    Returns:
        ``x509.Name`` holding one ``x509.NameAttribute`` per entry, in
        entry order.
    """
    lib = self._backend._lib
    ffi = self._backend._ffi
    name_attributes = []
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
        assert asn1_object != ffi.NULL
        asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
        assert asn1_string != ffi.NULL

        out = ffi.new("unsigned char **")
        length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
        assert length >= 0
        assert out[0] != ffi.NULL
        # Register a finalizer so the buffer filled in by OpenSSL is
        # released with OPENSSL_free when ``out`` is collected.
        out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

        text = ffi.buffer(out[0], length)[:].decode('utf8')
        dotted_oid = self._obj2txt(asn1_object)
        name_attributes.append(
            x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text))
    return x509.Name(name_attributes)
def _build_x509_name(self, x509_name):
    """Convert an OpenSSL X509_NAME handle into an ``x509.Name``.

    Every entry of ``x509_name`` is read through the backend bindings. The
    entry's object is passed to ``self._obj2txt`` and the result wrapped in
    ``x509.ObjectIdentifier``; the entry's data is converted with
    ``ASN1_STRING_to_UTF8`` and decoded as UTF-8.

    Returns:
        ``x509.Name`` holding one ``x509.NameAttribute`` per entry, in
        entry order.
    """
    lib = self._backend._lib
    ffi = self._backend._ffi
    name_attributes = []
    for index in range(lib.X509_NAME_entry_count(x509_name)):
        entry = lib.X509_NAME_get_entry(x509_name, index)
        asn1_object = lib.X509_NAME_ENTRY_get_object(entry)
        assert asn1_object != ffi.NULL
        asn1_string = lib.X509_NAME_ENTRY_get_data(entry)
        assert asn1_string != ffi.NULL

        out = ffi.new("unsigned char **")
        length = lib.ASN1_STRING_to_UTF8(out, asn1_string)
        assert length >= 0
        assert out[0] != ffi.NULL
        # Register a finalizer so the buffer filled in by OpenSSL is
        # released with OPENSSL_free when ``out`` is collected.
        out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))

        text = ffi.buffer(out[0], length)[:].decode('utf8')
        dotted_oid = self._obj2txt(asn1_object)
        name_attributes.append(
            x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text))
    return x509.Name(name_attributes)
def _create_self_signed_certificate(self, private_key):
    """Return a certificate for ``private_key`` signed by that same key.

    Issuer and subject are one fixed name (common name "Test Certificate").
    The serial number is random, and the validity window runs from the
    current ``datetime.utcnow()`` to ten days after it. The certificate is
    signed with SHA-256.
    """
    name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"Test Certificate"),
    ])
    public_key = private_key.public_key()
    serial = x509.random_serial_number()
    valid_from = datetime.utcnow()
    valid_until = datetime.utcnow() + timedelta(days=10)

    # Self-signed: the same name serves as both issuer and subject.
    builder = x509.CertificateBuilder(
        issuer_name=name,
        subject_name=name,
        public_key=public_key,
        serial_number=serial,
        not_valid_before=valid_from,
        not_valid_after=valid_until,
    )
    return builder.sign(private_key, hashes.SHA256(), default_backend())
def _check_or_add_cert(self, name, domain, key, authorization):
    """Return the certificate recorded for a domain, requesting one if absent.

    If ``domain`` already has a 'certificate' entry it is returned as-is.
    Otherwise a CSR with ``name`` as subject common name is signed with
    ``key`` (SHA-256), a certificate is requested from ``self._client``
    using ``self._key``, the result is stored under
    ``domain['certificate']`` and the configuration is written out.

    ``authorization`` is accepted but not used by this method.
    """
    if 'certificate' in domain:
        return domain['certificate']

    self._log('domain:{}: generating CSR...', name)
    subject = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
    ])
    request_builder = x509.CertificateSigningRequestBuilder()
    request = request_builder.subject_name(subject).sign(
        key, hashes.SHA256(), backend)
    self._log('domain:{}: done', name)

    self._log('domain:{}: requesting certificate...', name)
    issued = self._client.new_certificate(self._key, request)
    domain['certificate'] = issued
    self._write_config()
    self._log('domain:{}: done: {}', name, issued)
    return issued
def generate_wildcard_pem_bytes():
    """
    Generate a self-signed certificate whose subject and issuer common name
    is '*', valid from one day ago until 3650 days from now.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: Bytes consisting of the unencrypted PEM private key
        (TraditionalOpenSSL format) immediately followed by the PEM
        certificate
    """
    key = generate_private_key(u'rsa')
    wildcard = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(wildcard)
    builder = builder.subject_name(wildcard)
    builder = builder.not_valid_before(datetime.today() - timedelta(days=1))
    builder = builder.not_valid_after(datetime.now() + timedelta(days=3650))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    cert = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )

    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    return b''.join([key_pem, cert_pem])
def test_ecies_generate_csr(self):
    """Check that a CSR produced by ecies serializes as a PEM request."""
    crypto = ecies()
    key = crypto.generate_private_key()
    subject = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, u"test")])
    pem = crypto.generate_csr(key, subject).public_bytes(Encoding.PEM)
    expected_header = b"-----BEGIN CERTIFICATE REQUEST-----"
    self.assertTrue(pem.startswith(expected_header))
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def generate_self_signed_cert(cert_file, key_file):
    # type: (Any, Any) -> None
    """Generate an RSA key and a self-signed certificate and write both out.

    A 2048-bit RSA key (public exponent 65537) is created. The certificate
    uses 'cryptography.io' as both subject and issuer common name, is valid
    from one day ago until 3650 days from now, is marked as non-CA, and is
    signed with SHA-256.

    Args:
        cert_file: File-like object that receives the PEM certificate
        key_file: File-like object that receives the unencrypted PEM key
            (TraditionalOpenSSL format); written before the certificate
    """
    rsa_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend())
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
    ])

    certificate = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(datetime.today() - timedelta(days=1))
        .not_valid_after(datetime.today() + timedelta(days=365 * 10))
        .serial_number(uuid4().int)
        .public_key(rsa_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None),
            critical=True)
        .sign(
            private_key=rsa_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )

    key_pem = rsa_key.private_bytes(
        Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption())
    key_file.write(key_pem)
    cert_file.write(certificate.public_bytes(Encoding.PEM))
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def set_csr_if_blank(self):
    """Populate ``self.csr`` with a newly signed PEM CSR when it is empty.

    Nothing happens if ``self.csr`` is already truthy. Otherwise the subject
    is assembled from ``self.get_common_name()`` and the country, state,
    locality, organization and organizational unit of ``self.account``. The
    entries from ``self.get_san_entries()`` are attached as a non-critical
    subject alternative name extension, and the request is signed with the
    key from ``self.get_key()`` using SHA-256.
    """
    if self.csr:
        return

    private_key = self.get_key()
    # (OID, value) pairs in the order they appear in the subject.
    subject_fields = [
        (NameOID.COMMON_NAME, self.get_common_name()),
        (NameOID.COUNTRY_NAME, u'{}'.format(self.account.country)),
        (NameOID.STATE_OR_PROVINCE_NAME, u'{}'.format(self.account.state)),
        (NameOID.LOCALITY_NAME, u'{}'.format(self.account.locality)),
        (NameOID.ORGANIZATION_NAME,
         u'{}'.format(self.account.organization_name)),
        (NameOID.ORGANIZATIONAL_UNIT_NAME,
         u'{}'.format(self.account.organizational_unit_name)),
    ]
    subject = x509.Name(
        [x509.NameAttribute(oid, value) for oid, value in subject_fields])

    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    builder = builder.add_extension(
        x509.SubjectAlternativeName(self.get_san_entries()), critical=False)
    signed = builder.sign(private_key, hashes.SHA256(), default_backend())
    self.csr = signed.public_bytes(serialization.Encoding.PEM)
def ca_file(tmpdir):
    """
    Write a self-signed CA certificate to ``test.pem`` under ``tmpdir`` and
    return that file's path as ASCII-encoded bytes.

    The certificate uses a new 2048-bit RSA key, has 'pyopenssl.org' as both
    subject and issuer common name, is valid from one day ago until one day
    from now, and carries a critical BasicConstraints extension with
    ``ca=True``.
    """
    rsa_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    one_day = datetime.timedelta(days=1)

    certificate = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
        ]))
        .issuer_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
        ]))
        .not_valid_before(datetime.datetime.today() - one_day)
        .not_valid_after(datetime.datetime.today() + one_day)
        .serial_number(int(uuid.uuid4()))
        .public_key(rsa_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True)
        .sign(
            private_key=rsa_key, algorithm=hashes.SHA256(),
            backend=default_backend())
    )

    pem_path = tmpdir.join("test.pem")
    pem_path.write_binary(
        certificate.public_bytes(encoding=serialization.Encoding.PEM))
    return str(pem_path).encode("ascii")
def create_ca_certificate(cn, key_size=4096, certify_days=365):
    """Create a self-signed CA certificate and its RSA private key.

    Args:
        cn: Common name used for both subject and issuer.
        key_size: RSA key size in bits.
        certify_days: Number of days the certificate is valid, counted from
            ``datetime.datetime.utcnow()``.

    Returns:
        Tuple ``(cert, key)``: the PEM-encoded certificate and the
        unencrypted PEM private key (TraditionalOpenSSL format).
    """
    key = rsa.generate_private_key(
        public_exponent=65537, key_size=key_size, backend=default_backend())
    key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())
    subject = issuer = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, cn)])
    now = datetime.datetime.utcnow()
    serial = x509.random_serial_number()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(serial)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(
        now + datetime.timedelta(days=certify_days))
    builder = builder.add_extension(key_id, critical=False)

    # The authority key identifier points back at this certificate's own
    # key digest, issuer name and serial number.
    authority_key_id = x509.AuthorityKeyIdentifier(
        key_id.digest, [x509.DirectoryName(issuer)], serial)
    builder = builder.add_extension(authority_key_id, critical=False)

    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0), critical=True)

    key_usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=False,
        key_encipherment=False,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False,
    )
    builder = builder.add_extension(key_usage, critical=True)

    certificate = builder.sign(key, hashes.SHA256(), default_backend())

    cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return cert_pem, key_pem
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by the backend's
    ``_asn1_string_to_utf8`` method and the object by ``_obj2txt``, whose
    result is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = backend._asn1_string_to_utf8(asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def dict_to_x509_name(data):
    """Build an ``x509.Name`` from a mapping of attribute names to values.

    Supported keys are 'commonName', 'countryName', 'stateOrProvinceName',
    'locality' (also accepted as 'localityName'), 'organizationName' and
    'organizationalUnitName'. Attributes are added in the iteration order
    of ``data.items()``.

    Args:
        data: Mapping from supported attribute name to attribute value.

    Returns:
        ``x509.Name`` with one ``x509.NameAttribute`` per item.

    Raises:
        ValueError: If a key is not one of the supported names.
    """
    attr_name_oid = {
        'commonName': x509.NameOID.COMMON_NAME,
        'countryName': x509.NameOID.COUNTRY_NAME,
        'stateOrProvinceName': x509.NameOID.STATE_OR_PROVINCE_NAME,
        'locality': x509.NameOID.LOCALITY_NAME,
        # Alias matching the "...Name" spelling of every other key; the
        # original 'locality' key is kept for existing callers.
        'localityName': x509.NameOID.LOCALITY_NAME,
        'organizationName': x509.NameOID.ORGANIZATION_NAME,
        'organizationalUnitName': x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
    }
    name_attributes = []
    for key, value in data.items():
        if key not in attr_name_oid:
            raise ValueError(
                '{} is not a supported x509 name attribute'.format(key))
        name_attributes.append(x509.NameAttribute(attr_name_oid[key], value))
    return x509.Name(name_attributes)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from one OpenSSL name entry.

    The entry's object and data pointers are fetched through
    ``backend._lib`` and each is checked against NULL with
    ``backend.openssl_assert``. The data is converted by
    ``_asn1_string_to_utf8`` and the object by ``_obj2txt``, whose result
    is wrapped in ``x509.ObjectIdentifier``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, asn1_object))
    return x509.NameAttribute(identifier, text)
def makeSignedCert(cpub, ccn, cvdays, cserial, spriv, scert=None):
    """
    Create a certificate for a public key and sign it with a private key.

    When a signing certificate is given, the new subject is that
    certificate's subject with every common-name attribute removed and the
    given common name appended, and the issuer is the signing certificate's
    subject. Without a signing certificate, the given common name alone is
    used as both subject and issuer.

    The certificate is valid from ``datetime.datetime.today()`` for
    ``cvdays`` days, carries a critical BasicConstraints extension with
    ``ca=True``, and is signed using SHA-256.

    :param cpub: Public key for which to create a certificate.
    :param ccn: Common name for the new certificate.
    :param cvdays: Number of days the new certificate is valid.
    :param cserial: The serial number for the new certificate as an int.
    :param spriv: Private key used to sign the new certificate.
    :param scert: Certificate whose subject becomes the issuer, or None if
                  no certificate is used.
    :return: The new certificate as an object.
    """
    common_name = x509.NameAttribute(NameOID.COMMON_NAME, ccn)
    if not scert:
        sname = x509.Name([common_name])
        iname = x509.Name([common_name])
    else:
        inherited = [
            attr for attr in scert.subject
            if attr.oid != NameOID.COMMON_NAME
        ]
        sname = x509.Name(inherited + [common_name])
        iname = scert.subject

    return (
        x509.CertificateBuilder()
        .subject_name(sname)
        .issuer_name(iname)
        .not_valid_before(datetime.datetime.today())
        .not_valid_after(
            datetime.datetime.today() + datetime.timedelta(days=cvdays))
        .serial_number(cserial)
        .public_key(cpub)
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True)
        .sign(
            private_key=spriv, algorithm=hashes.SHA256(),
            backend=default_backend())
    )
def _create_oids(common_name, oids):
    """Return the list of name attributes for a CSR subject.

    The list starts with the common name. For every key in ``CFG.csr.oids``
    that ``OIDS_MAP`` maps to a truthy OID, one attribute is appended; its
    value is ``str()`` of the entry in ``oids`` when the key is present
    there, and of the ``CFG.csr.oids`` entry otherwise.
    """
    attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    for key in CFG.csr.oids:
        oid = OIDS_MAP.get(key, None)
        if not oid:
            # Keys without a known OID are skipped.
            continue
        raw = oids[key] if key in oids else CFG.csr.oids[key]
        attrs.append(x509.NameAttribute(oid, str(raw)))
    return attrs
def _create_oids(common_name, oids):
    """Return the list of name attributes for a CSR subject.

    The list starts with the common name. For every key in ``CFG.csr.oids``
    that ``OIDS_MAP`` maps to a truthy OID, one attribute is appended; its
    value is ``str()`` of the entry in ``oids`` when the key is present
    there, and of the ``CFG.csr.oids`` entry otherwise.
    """
    attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    for key in CFG.csr.oids:
        oid = OIDS_MAP.get(key, None)
        if not oid:
            # Keys without a known OID are skipped.
            continue
        raw = oids[key] if key in oids else CFG.csr.oids[key]
        attrs.append(x509.NameAttribute(oid, str(raw)))
    return attrs
def _create_oids(common_name, oids):
    """Return the list of name attributes for a CSR subject.

    The list starts with the common name. For every key in ``CFG.csr.oids``
    that ``OIDS_MAP`` maps to a truthy OID, one attribute is appended; its
    value is ``str()`` of the entry in ``oids`` when the key is present
    there, and of the ``CFG.csr.oids`` entry otherwise.
    """
    attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    for key in CFG.csr.oids:
        oid = OIDS_MAP.get(key, None)
        if not oid:
            # Keys without a known OID are skipped.
            continue
        raw = oids[key] if key in oids else CFG.csr.oids[key]
        attrs.append(x509.NameAttribute(oid, str(raw)))
    return attrs