def enroll(self, enrollment_id, enrollment_secret):
"""Enroll a registered user in order to receive a signed X509 certificate
Args:
enrollment_id (str): The registered ID to use for enrollment
enrollment_secret (str): The secret associated with the
enrollment ID
Returns: PEM-encoded X509 certificate
Raises:
RequestException: errors in requests.exceptions
ValueError: Failed response, json parse error, args missing
"""
private_key = self._crypto.generate_private_key()
csr = self._crypto.generate_csr(private_key, x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, six.u(enrollment_id))]))
cert = self._ca_client.enroll(
enrollment_id, enrollment_secret,
csr.public_bytes(Encoding.PEM).decode("utf-8"))
return Enrollment(private_key, cert)
python类Name()的实例源码
def generate_cert(key_path, cert_out_path):
private_key = load_private_key_file(key_path)
public_key = private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
]))
builder = builder.not_valid_before(datetime.datetime.today() - datetime.timedelta(days=1))
builder = builder.not_valid_after(datetime.datetime(2020, 1, 1))
builder = builder.serial_number(int(uuid.uuid4()))
builder = builder.public_key(public_key)
builder = builder.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)
certificate = builder.sign(private_key=private_key, algorithm=hashes.SHA256(), backend=default_backend())
with open(cert_out_path, 'wb') as outf:
print >>outf, certificate.public_bytes(encoding=serialization.Encoding.PEM)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
prev_set_id = -1
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attribute = _decode_x509_name_entry(backend, entry)
set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
if set_id != prev_set_id:
attributes.append(set([attribute]))
else:
# is in the same RDN a previous entry
attributes[-1].add(attribute)
prev_set_id = set_id
return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)
def _build_x509_name(self, x509_name):
count = self._backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = self._backend._lib.X509_NAME_get_entry(x509_name, x)
obj = self._backend._lib.X509_NAME_ENTRY_get_object(entry)
assert obj != self._backend._ffi.NULL
data = self._backend._lib.X509_NAME_ENTRY_get_data(entry)
assert data != self._backend._ffi.NULL
buf = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.ASN1_STRING_to_UTF8(buf, data)
assert res >= 0
assert buf[0] != self._backend._ffi.NULL
buf = self._backend._ffi.gc(
buf, lambda buf: self._backend._lib.OPENSSL_free(buf[0])
)
value = self._backend._ffi.buffer(buf[0], res)[:].decode('utf8')
oid = self._obj2txt(obj)
attributes.append(
x509.NameAttribute(
x509.ObjectIdentifier(oid), value
)
)
return x509.Name(attributes)
def _build_x509_name(self, x509_name):
count = self._backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = self._backend._lib.X509_NAME_get_entry(x509_name, x)
obj = self._backend._lib.X509_NAME_ENTRY_get_object(entry)
assert obj != self._backend._ffi.NULL
data = self._backend._lib.X509_NAME_ENTRY_get_data(entry)
assert data != self._backend._ffi.NULL
buf = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.ASN1_STRING_to_UTF8(buf, data)
assert res >= 0
assert buf[0] != self._backend._ffi.NULL
buf = self._backend._ffi.gc(
buf, lambda buf: self._backend._lib.OPENSSL_free(buf[0])
)
value = self._backend._ffi.buffer(buf[0], res)[:].decode('utf8')
oid = self._obj2txt(obj)
attributes.append(
x509.NameAttribute(
x509.ObjectIdentifier(oid), value
)
)
return x509.Name(attributes)
def create_csr(key, domains, must_staple=False):
"""
Creates a CSR in DER format for the specified key and domain names.
"""
assert domains
name = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
])
san = x509.SubjectAlternativeName([x509.DNSName(domain) for domain in domains])
csr = x509.CertificateSigningRequestBuilder().subject_name(name) \
.add_extension(san, critical=False)
if must_staple:
ocsp_must_staple = x509.TLSFeature(features=[x509.TLSFeatureType.status_request])
csr = csr.add_extension(ocsp_must_staple, critical=False)
csr = csr.sign(key, hashes.SHA256(), default_backend())
return export_csr_for_acme(csr)
def _build_x509_name(self, x509_name):
count = self._backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = self._backend._lib.X509_NAME_get_entry(x509_name, x)
obj = self._backend._lib.X509_NAME_ENTRY_get_object(entry)
assert obj != self._backend._ffi.NULL
data = self._backend._lib.X509_NAME_ENTRY_get_data(entry)
assert data != self._backend._ffi.NULL
buf = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.ASN1_STRING_to_UTF8(buf, data)
assert res >= 0
assert buf[0] != self._backend._ffi.NULL
buf = self._backend._ffi.gc(
buf, lambda buf: self._backend._lib.OPENSSL_free(buf[0])
)
value = self._backend._ffi.buffer(buf[0], res)[:].decode('utf8')
oid = self._obj2txt(obj)
attributes.append(
x509.NameAttribute(
x509.ObjectIdentifier(oid), value
)
)
return x509.Name(attributes)
def test_hostnameIsIndicated(self):
"""
Specifying the C{hostname} argument to L{CertificateOptions} also sets
the U{Server Name Extension
<https://en.wikipedia.org/wiki/Server_Name_Indication>} TLS indication
field to the correct value.
"""
names = []
def setupServerContext(ctx):
def servername_received(conn):
names.append(conn.get_servername().decode("ascii"))
ctx.set_tlsext_servername_callback(servername_received)
cProto, sProto, pump = self.serviceIdentitySetup(
u"valid.example.com",
u"valid.example.com",
setupServerContext
)
self.assertEqual(names, [u"valid.example.com"])
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
prev_set_id = -1
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attribute = _decode_x509_name_entry(backend, entry)
set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
if set_id != prev_set_id:
attributes.append(set([attribute]))
else:
# is in the same RDN a previous entry
attributes[-1].add(attribute)
prev_set_id = set_id
return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
prev_set_id = -1
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attribute = _decode_x509_name_entry(backend, entry)
set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry)
if set_id != prev_set_id:
attributes.append(set([attribute]))
else:
# is in the same RDN a previous entry
attributes[-1].add(attribute)
prev_set_id = set_id
return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)
def _build_x509_name(self, x509_name):
count = self._backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = self._backend._lib.X509_NAME_get_entry(x509_name, x)
obj = self._backend._lib.X509_NAME_ENTRY_get_object(entry)
assert obj != self._backend._ffi.NULL
data = self._backend._lib.X509_NAME_ENTRY_get_data(entry)
assert data != self._backend._ffi.NULL
buf = self._backend._ffi.new("unsigned char **")
res = self._backend._lib.ASN1_STRING_to_UTF8(buf, data)
assert res >= 0
assert buf[0] != self._backend._ffi.NULL
buf = self._backend._ffi.gc(
buf, lambda buf: self._backend._lib.OPENSSL_free(buf[0])
)
value = self._backend._ffi.buffer(buf[0], res)[:].decode('utf8')
oid = self._obj2txt(obj)
attributes.append(
x509.NameAttribute(
x509.ObjectIdentifier(oid), value
)
)
return x509.Name(attributes)
def _create_self_signed_certificate(self, private_key):
issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
x509.NameAttribute(NameOID.COMMON_NAME, u"Test Certificate"),
])
cert_builder = x509.CertificateBuilder(
issuer_name=issuer, subject_name=issuer,
public_key=private_key.public_key(),
serial_number=x509.random_serial_number(),
not_valid_before=datetime.utcnow(),
not_valid_after=datetime.utcnow() + timedelta(days=10)
)
cert = cert_builder.sign(private_key,
hashes.SHA256(),
default_backend())
return cert
def _check_or_add_cert(self, name, domain, key, authorization):
if 'certificate' in domain:
return domain['certificate']
self._log('domain:{}: generating CSR...', name)
builder = x509.CertificateSigningRequestBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
]))
csr = builder.sign(key, hashes.SHA256(), backend)
self._log('domain:{}: done', name)
self._log('domain:{}: requesting certificate...', name)
certificate = self._client.new_certificate(self._key, csr)
domain['certificate'] = certificate
self._write_config()
self._log('domain:{}: done: {}', name, certificate)
return certificate
def generate_wildcard_pem_bytes():
"""
Generate a wildcard (subject name '*') self-signed certificate valid for
10 years.
https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate
:return: Bytes representation of the PEM certificate data
"""
key = generate_private_key(u'rsa')
name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])
cert = (
x509.CertificateBuilder()
.issuer_name(name)
.subject_name(name)
.not_valid_before(datetime.today() - timedelta(days=1))
.not_valid_after(datetime.now() + timedelta(days=3650))
.serial_number(int(uuid.uuid4()))
.public_key(key.public_key())
.sign(
private_key=key,
algorithm=hashes.SHA256(),
backend=default_backend())
)
return b''.join((
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()),
cert.public_bytes(serialization.Encoding.PEM)
))
def generate_csr(self, private_key, subject_name, extensions=None):
"""Generate certificate signing request.
Args:
private_key: Private key
subject_name (x509.Name): Subject name
extensions
Returns: x509.CertificateSigningRequest
"""
builder = x509.CertificateSigningRequestBuilder(
subject_name, [] if extensions is None else extensions)
return builder.sign(
private_key, self.sign_hash_algorithm, default_backend())
def test_ecies_generate_csr(self):
"""Test case for generate certificate signing request."""
ecies256 = ecies()
private_key = ecies256.generate_private_key()
csr = ecies256.generate_csr(private_key, x509.Name(
[x509.NameAttribute(NameOID.COMMON_NAME, u"test")]))
csr_pem = csr.public_bytes(Encoding.PEM)
self.assertTrue(csr_pem.startswith(
b"-----BEGIN CERTIFICATE REQUEST-----"))
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def generate_self_signed_cert(cert_file, key_file):
# type: (Any, Any) -> None
"""Given two file-like objects, generate an SSL key and certificate
Args:
cert_file: The certificate file you wish to write to
key_file: The key file you wish to write to
"""
one_day = timedelta(1, 0, 0)
private_key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
public_key = private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
]))
builder = builder.issuer_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
]))
builder = builder.not_valid_before(datetime.today() - one_day)
builder = builder.not_valid_after(datetime.today() + timedelta(365 * 10))
builder = builder.serial_number(uuid4().int)
builder = builder.public_key(public_key)
builder = builder.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True, )
certificate = builder.sign(
private_key=private_key,
algorithm=hashes.SHA256(),
backend=default_backend())
key_file.write(
private_key.private_bytes(
Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()))
cert_file.write(certificate.public_bytes(Encoding.PEM))
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def set_csr_if_blank(self):
if not self.csr:
private_key = self.get_key()
builder = x509.CertificateSigningRequestBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, self.get_common_name()),
x509.NameAttribute(NameOID.COUNTRY_NAME, u'{}'.format(self.account.country)),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u'{}'.format(self.account.state)),
x509.NameAttribute(NameOID.LOCALITY_NAME, u'{}'.format(self.account.locality)),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, u'{}'.format(self.account.organization_name)),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u'{}'.format(self.account.organizational_unit_name)),
]))
builder = builder.add_extension(x509.SubjectAlternativeName(self.get_san_entries()), critical=False)
csr = builder.sign(private_key, hashes.SHA256(), default_backend())
self.csr = csr.public_bytes(serialization.Encoding.PEM)
def ca_file(tmpdir):
"""
Create a valid PEM file with CA certificates and return the path.
"""
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
]))
one_day = datetime.timedelta(1, 0, 0)
builder = builder.not_valid_before(datetime.datetime.today() - one_day)
builder = builder.not_valid_after(datetime.datetime.today() + one_day)
builder = builder.serial_number(int(uuid.uuid4()))
builder = builder.public_key(public_key)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None), critical=True,
)
certificate = builder.sign(
private_key=key, algorithm=hashes.SHA256(),
backend=default_backend()
)
ca_file = tmpdir.join("test.pem")
ca_file.write_binary(
certificate.public_bytes(
encoding=serialization.Encoding.PEM,
)
)
return str(ca_file).encode("ascii")
def create_ca_certificate(cn, key_size=4096, certify_days=365):
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())
subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
now = datetime.datetime.utcnow()
serial = x509.random_serial_number()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(issuer) \
.public_key(key.public_key()) \
.serial_number(serial) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(key_id, critical=False) \
.add_extension(x509.AuthorityKeyIdentifier(key_id.digest,
[x509.DirectoryName(issuer)],
serial),
critical=False) \
.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) \
.add_extension(x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=False,
data_encipherment=False,
key_agreement=False,
key_cert_sign=True,
crl_sign=True,
encipher_only=False,
decipher_only=False),
critical=True) \
.sign(key, hashes.SHA256(), default_backend())
cert = cert.public_bytes(serialization.Encoding.PEM)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return cert, key
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def dict_to_x509_name(data):
name_attributes = []
attr_name_oid = {
'commonName': x509.NameOID.COMMON_NAME,
'countryName': x509.NameOID.COUNTRY_NAME,
'stateOrProvinceName': x509.NameOID.STATE_OR_PROVINCE_NAME,
'locality': x509.NameOID.LOCALITY_NAME,
'organizationName': x509.NameOID.ORGANIZATION_NAME,
'organizationalUnitName': x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
}
for key, value in data.items():
if not key in attr_name_oid:
raise ValueError('{} is not a supported x509 name attribute'.format(key))
name_attributes.append(x509.NameAttribute(attr_name_oid[key], value))
return x509.Name(name_attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)
def _decode_x509_name(backend, x509_name):
count = backend._lib.X509_NAME_entry_count(x509_name)
attributes = []
for x in range(count):
entry = backend._lib.X509_NAME_get_entry(x509_name, x)
attributes.append(_decode_x509_name_entry(backend, entry))
return x509.Name(attributes)