def test_basics():
ca = CA()
today = datetime.datetime.today()
assert b"BEGIN CERTIFICATE" in ca.cert_pem.bytes()
ca_cert = x509.load_pem_x509_certificate(
ca.cert_pem.bytes(), default_backend())
assert ca_cert.not_valid_before <= today <= ca_cert.not_valid_after
assert ca_cert.issuer == ca_cert.subject
bc = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints)
assert bc.value.ca == True
assert bc.critical == True
with pytest.raises(ValueError):
ca.issue_server_cert()
server = ca.issue_server_cert(u"test-1.example.org", u"test-2.example.org")
assert b"PRIVATE KEY" in server.private_key_pem.bytes()
assert b"BEGIN CERTIFICATE" in server.cert_chain_pems[0].bytes()
assert len(server.cert_chain_pems) == 1
assert server.private_key_pem.bytes() in server.private_key_and_cert_chain_pem.bytes()
for blob in server.cert_chain_pems:
assert blob.bytes() in server.private_key_and_cert_chain_pem.bytes()
server_cert = x509.load_pem_x509_certificate(
server.cert_chain_pems[0].bytes(), default_backend())
assert server_cert.not_valid_before <= today <= server_cert.not_valid_after
assert server_cert.issuer == ca_cert.subject
san = server_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
hostnames = san.value.get_values_for_type(x509.DNSName)
assert hostnames == [u"test-1.example.org", u"test-2.example.org"]
python类load_pem_x509_certificate()的实例源码
def __load_cert_from_file(filename):
with open(filename, "br") as x509_file:
crl = load_pem_x509_certificate(x509_file.read(), default_backend())
rsa = crl.public_key()
return rsa
def extract_rsa_key_from_x509_cert(pem):
cert = load_pem_x509_certificate(pem, backend)
return cert.public_key()
def from_primitive(cls, pem_string, session=None):
result = cls(session=session)
try:
return result.from_raw_key(x509.load_pem_x509_certificate(
utils.SmartStr(pem_string), backend=openssl.backend))
except (TypeError, ValueError, exceptions.UnsupportedAlgorithm) as e:
raise CipherError("X509 Certificate invalid: %s" % e)
return result
def _match_certificate(matcher):
return MatchesAny(
Not(IsInstance(Certificate)),
AfterPreprocessing(
lambda c: x509.load_pem_x509_certificate(
c.as_bytes(), default_backend()),
matcher))
def load(cls, data=None, filename=None):
if filename is not None:
with open(filename, 'rb') as fh:
data = fh.read()
return Certificate(
x509.load_pem_x509_certificate(data, default_backend())
)
def test_self_signed_cert(self):
priv = PrivateKey.create()
cert = priv.self_signed_cert({'common_name': u'jose'})
assert isinstance(cert, Certificate)
assert cert.subject.attribs['common_name'] == u'jose'
assert cert.issuer.attribs['common_name'] == u'jose'
cert = x509.load_pem_x509_certificate(
cert.dump(), default_backend()
)
assert cert.subject.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value == u'jose'
assert cert.issuer.get_attributes_for_oid(
NameOID.COMMON_NAME)[0].value == u'jose'
def loadPEMCert(pathToCert):
with open(pathToCert, 'rb') as f:
crt_data = f.read()
cert=x509.load_pem_x509_certificate(crt_data,default_backend())
return cert
def __string_to_cert(s, pkg_hash=None):
"""Convert a string to a X509 cert."""
try:
return x509.load_pem_x509_certificate(
misc.force_bytes(s), default_backend())
except ValueError:
if pkg_hash is not None:
raise api_errors.BadFileFormat(_("The file "
"with hash {0} was expected to be a PEM "
"certificate but it could not be "
"read.").format(pkg_hash))
raise api_errors.BadFileFormat(_("The following string "
"was expected to be a PEM certificate, but it "
"could not be parsed as such:\n{0}".format(s)))
def __get_certs_by_name(self, name):
"""Given 'name', a Cryptograhy 'Name' object, return the certs
with that name as a subject."""
res = []
count = 0
name_hsh = hashlib.sha1(misc.force_bytes(name)).hexdigest()
def load_cert(pth):
with open(pth, "rb") as f:
return x509.load_pem_x509_certificate(
f.read(), default_backend())
try:
while True:
pth = os.path.join(self.__subj_root,
"{0}.{1}".format(name_hsh, count))
res.append(load_cert(pth))
count += 1
except EnvironmentError as e:
# When switching to a different hash algorithm, the hash
# name of file changes so that we couldn't find the
# file. We try harder to rebuild the subject's metadata
# if it's the first time we fail (count == 0).
if count == 0 and e.errno == errno.ENOENT:
self.__rebuild_subj_root()
try:
res.append(load_cert(pth))
except EnvironmentError as ex:
if ex.errno != errno.ENOENT:
raise
t = api_errors._convert_error(e,
[errno.ENOENT])
if t:
raise t
res.extend(self.__issuers.get(name_hsh, []))
return res
def calc_pem_hash(pth):
"""Find the hash of pem representation the file."""
with open(pth, "rb") as f:
cert = x509.load_pem_x509_certificate(
f.read(), default_backend())
return hashlib.sha1(
cert.public_bytes(serialization.Encoding.PEM)).hexdigest()
def __make_tmp_cert(d, pth):
try:
with open(pth, "rb") as f:
cert = x509.load_pem_x509_certificate(f.read(),
default_backend())
except (ValueError, IOError) as e:
raise api_errors.BadFileFormat(_("The file {0} was expected to "
"be a PEM certificate but it could not be read.").format(
pth))
fd, fp = tempfile.mkstemp(dir=d)
with os.fdopen(fd, "wb") as fh:
fh.write(cert.public_bytes(serialization.Encoding.PEM))
return fp
def getCertFromBytes(pem_data):
return x509.load_pem_x509_certificate(pem_data, backend)
def generate_x509_fingerprint(pem_key):
try:
if isinstance(pem_key, six.text_type):
pem_key = pem_key.encode('utf-8')
cert = x509.load_pem_x509_certificate(
pem_key, backends.default_backend())
raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1()))
if six.PY3:
raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except (ValueError, TypeError, binascii.Error) as ex:
raise exception.InvalidKeypair(
reason=_('failed to generate X509 fingerprint. '
'Error message: %s') % ex)
def load_pem_certificate(data):
"""
Loads a PEM X.509 certificate.
"""
return x509.load_pem_x509_certificate(data, default_backend())
def _load_from_string(cls, certificate):
"""
Load a certificate from a string.
Args:
certificate (str): A base64 PEM encoded certificate
"""
certificate = certificate.encode()
try:
cert_obj = load_pem_x509_certificate(certificate, backend)
cls._public_keys.append(cert_obj.public_key())
cls._key_age = datetime.now()
except ValueError:
raise ImproperlyConfigured("Invalid ADFS token signing certificate")
def test_generate_wildcard_pem_bytes():
"""
When we generate a self-signed wildcard certificate's PEM data, that data
should be deserializable and the deserilized certificate should have the
expected parameters.
"""
pem_bytes = generate_wildcard_pem_bytes()
# Parse the concatenated bytes into a list of object
pem_objects = pem.parse(pem_bytes)
assert_that(pem_objects, HasLength(2))
# Deserialize the private key and assert that it is the right type (the
# other details we trust txacme with)
key = serialization.load_pem_private_key(
pem_objects[0].as_bytes(),
password=None,
backend=default_backend()
)
assert_that(key, IsInstance(rsa.RSAPrivateKey))
# Deserialize the certificate and validate all the options we set
cert = x509.load_pem_x509_certificate(
pem_objects[1].as_bytes(), backend=default_backend()
)
expected_before = datetime.today() - timedelta(days=1)
expected_after = datetime.now() + timedelta(days=3650)
assert_that(cert, MatchesStructure(
issuer=MatchesListwise([
MatchesStructure(value=Equals(u'*'))
]),
subject=MatchesListwise([
MatchesStructure(value=Equals(u'*'))
]),
not_valid_before=matches_time_or_just_before(expected_before),
not_valid_after=matches_time_or_just_before(expected_after),
signature_hash_algorithm=IsInstance(hashes.SHA256)
))
assert_that(cert.public_key().public_numbers(), Equals(
key.public_key().public_numbers()))
# From txacme
def verify_and_load_message(config, file, only_accept_cn=None):
'''Checks the message's signature to make sure it was signed by someone in our CA chain'''
try:
# We need a temporary file to get the signer PEM
msg_fd, signer_pem = tempfile.mkstemp()
os.close(msg_fd) # Don't need to write anything to it
# CApath is required to force out any system CAs that might be in the global CNF file
ossl_verify_cmd = ["openssl", "smime", "-verify",
"-in", file, "-CAfile", config.ssl_cafile,
"-CApath", "/dev/nonexistant-dir",
"-text", "-signer", signer_pem]
ossl_verify_proc = subprocess.run(
args=ossl_verify_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
check=False)
if ossl_verify_proc.returncode != 0:
config.logger.warn(
"rejecting %s: %s", file, str(ossl_verify_proc.stderr))
return None
config.logger.info("%s passed openssl S/MIME verify", file)
config.logger.debug("checking %s", signer_pem)
with open(signer_pem, 'rb') as x509_signer:
# NOW we can use cryptography to read the x509 certificates
cert = x509.load_pem_x509_certificate(
x509_signer.read(), default_backend())
common_name = cert.subject.get_attributes_for_oid(
x509.NameOID.COMMON_NAME)[0].value
config.logger.debug(
"signed message came from common name: %s", common_name)
# Make sure the CN is something we're willing to accept
if only_accept_cn is not None:
if common_name != only_accept_cn:
config.logger.error("rejecting message due to CN %s != %s",
common_name, only_accept_cn)
# Read it in
decoded_message = ossl_verify_proc.stdout
message = IngestMessage(config)
message.load_from_yaml(decoded_message)
return message
# and clean up after ourselves
finally:
os.remove(signer_pem)
def check_certificate(self, domain):
"""
Download and get information from the TLS certificate
"""
pem = ssl.get_server_certificate((domain, 443))
if self.output:
with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
f.write(pem)
cert = x509.load_pem_x509_certificate(str(pem), default_backend())
self.log.critical("\tCertificate:")
self.log.critical("\t\tDomain: %s", ",".join(map(lambda x: x.value, cert.subject)))
self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
self.log.critical("\t\tCA Issuer: %s", ", ".join(map(lambda x:x.value, cert.issuer)))
self.log.critical("\t\tSerial: %s", cert.serial_number)
for ext in cert.extensions:
if ext.oid._name == 'basicConstraints':
if ext.value.ca:
self.log.critical("\t\tBasic Constraints: True")
elif ext.oid._name == 'subjectAltName':
self.log.critical("\t\tAlternate names: %s", ", ".join(ext.value.get_values_for_type(x509.DNSName)))
def issue_certificate(cn, ca_cert, ca_key,
organizations=(),
san_dns=(),
san_ips=(),
key_size=2048,
certify_days=365,
is_web_server=False,
is_web_client=False):
ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend())
ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in organizations]
subject = x509.Name(subject_name_attributes)
now = datetime.datetime.utcnow()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(ca_cert.issuer) \
.public_key(key.public_key()) \
.serial_number(x509.random_serial_number()) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest,
[x509.DirectoryName(ca_cert.issuer)],
ca_cert.serial_number),
critical=False) \
.add_extension(x509.KeyUsage(digital_signature=True,
content_commitment=False,
key_encipherment=True,
data_encipherment=False,
key_agreement=False,
key_cert_sign=False,
crl_sign=False,
encipher_only=False,
decipher_only=False),
critical=True)
extended_usages = []
if is_web_server:
extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH)
if is_web_client:
extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH)
if extended_usages:
cert = cert.add_extension(x509.ExtendedKeyUsage(extended_usages), critical=False)
sans = [x509.DNSName(name) for name in san_dns]
sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips]
if sans:
cert = cert.add_extension(x509.SubjectAlternativeName(sans), critical=False)
cert = cert.sign(ca_key, hashes.SHA256(), default_backend())
cert = cert.public_bytes(serialization.Encoding.PEM)
key = key.private_bytes(encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return cert, key