python类load_pem_x509_certificate()的实例源码

test_trustme.py 文件源码 项目:trustme 作者: python-trio 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_basics():
    ca = CA()

    today = datetime.datetime.today()

    assert b"BEGIN CERTIFICATE" in ca.cert_pem.bytes()

    ca_cert = x509.load_pem_x509_certificate(
        ca.cert_pem.bytes(), default_backend())
    assert ca_cert.not_valid_before <= today <= ca_cert.not_valid_after

    assert ca_cert.issuer == ca_cert.subject
    bc = ca_cert.extensions.get_extension_for_class(x509.BasicConstraints)
    assert bc.value.ca == True
    assert bc.critical == True

    with pytest.raises(ValueError):
        ca.issue_server_cert()

    server = ca.issue_server_cert(u"test-1.example.org", u"test-2.example.org")

    assert b"PRIVATE KEY" in server.private_key_pem.bytes()
    assert b"BEGIN CERTIFICATE" in server.cert_chain_pems[0].bytes()
    assert len(server.cert_chain_pems) == 1
    assert server.private_key_pem.bytes() in server.private_key_and_cert_chain_pem.bytes()
    for blob in server.cert_chain_pems:
        assert blob.bytes() in server.private_key_and_cert_chain_pem.bytes()

    server_cert = x509.load_pem_x509_certificate(
        server.cert_chain_pems[0].bytes(), default_backend())

    assert server_cert.not_valid_before <= today <= server_cert.not_valid_after
    assert server_cert.issuer == ca_cert.subject

    san = server_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
    hostnames = san.value.get_values_for_type(x509.DNSName)
    assert hostnames == [u"test-1.example.org", u"test-2.example.org"]
openssl.py 文件源码 项目:sistemats 作者: timendum 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __load_cert_from_file(filename):
        with open(filename, "br") as x509_file:
            crl = load_pem_x509_certificate(x509_file.read(), default_backend())
            rsa = crl.public_key()
            return rsa
sigver.py 文件源码 项目:deb-python-pysaml2 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def extract_rsa_key_from_x509_cert(pem):
    cert = load_pem_x509_certificate(pem, backend)
    return cert.public_key()
crypto.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def from_primitive(cls, pem_string, session=None):
        result = cls(session=session)
        try:
            return result.from_raw_key(x509.load_pem_x509_certificate(
                utils.SmartStr(pem_string), backend=openssl.backend))
        except (TypeError, ValueError, exceptions.UnsupportedAlgorithm) as e:
            raise CipherError("X509 Certificate invalid: %s" % e)
        return result
test_service.py 文件源码 项目:txacme 作者: twisted 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _match_certificate(matcher):
    return MatchesAny(
        Not(IsInstance(Certificate)),
        AfterPreprocessing(
            lambda c: x509.load_pem_x509_certificate(
                c.as_bytes(), default_backend()),
            matcher))
pki.py 文件源码 项目:humancrypto 作者: iffy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load(cls, data=None, filename=None):
        if filename is not None:
            with open(filename, 'rb') as fh:
                data = fh.read()
        return Certificate(
            x509.load_pem_x509_certificate(data, default_backend())
        )
test_rsa.py 文件源码 项目:humancrypto 作者: iffy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_self_signed_cert(self):
        priv = PrivateKey.create()
        cert = priv.self_signed_cert({'common_name': u'jose'})
        assert isinstance(cert, Certificate)
        assert cert.subject.attribs['common_name'] == u'jose'
        assert cert.issuer.attribs['common_name'] == u'jose'

        cert = x509.load_pem_x509_certificate(
            cert.dump(), default_backend()
        )
        assert cert.subject.get_attributes_for_oid(
            NameOID.COMMON_NAME)[0].value == u'jose'
        assert cert.issuer.get_attributes_for_oid(
            NameOID.COMMON_NAME)[0].value == u'jose'
x509_functions.py 文件源码 项目:IoT_pki 作者: zibawa 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def loadPEMCert(pathToCert):
    with open(pathToCert, 'rb') as f:
        crt_data = f.read()
        cert=x509.load_pem_x509_certificate(crt_data,default_backend())
        return cert
publisher.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __string_to_cert(s, pkg_hash=None):
                """Convert a string to a X509 cert."""

                try:
                        return x509.load_pem_x509_certificate(
                            misc.force_bytes(s), default_backend())
                except ValueError:
                        if pkg_hash is not None:
                                raise api_errors.BadFileFormat(_("The file "
                                    "with hash {0} was expected to be a PEM "
                                    "certificate but it could not be "
                                    "read.").format(pkg_hash))
                        raise api_errors.BadFileFormat(_("The following string "
                            "was expected to be a PEM certificate, but it "
                            "could not be parsed as such:\n{0}".format(s)))
publisher.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __get_certs_by_name(self, name):
                """Given 'name', a Cryptograhy 'Name' object, return the certs
                with that name as a subject."""

                res = []
                count = 0
                name_hsh = hashlib.sha1(misc.force_bytes(name)).hexdigest()

                def load_cert(pth):
                        with open(pth, "rb") as f:
                                return x509.load_pem_x509_certificate(
                                    f.read(), default_backend())

                try:
                        while True:
                                pth = os.path.join(self.__subj_root,
                                    "{0}.{1}".format(name_hsh, count))
                                res.append(load_cert(pth))
                                count += 1
                except EnvironmentError as e:
                        # When switching to a different hash algorithm, the hash
                        # name of file changes so that we couldn't find the
                        # file. We try harder to rebuild the subject's metadata
                        # if it's the first time we fail (count == 0).
                        if count == 0 and e.errno == errno.ENOENT:
                                self.__rebuild_subj_root()
                                try:
                                        res.append(load_cert(pth))
                                except EnvironmentError as ex:
                                        if ex.errno != errno.ENOENT:
                                                raise

                        t = api_errors._convert_error(e,
                            [errno.ENOENT])
                        if t:
                                raise t
                res.extend(self.__issuers.get(name_hsh, []))
                return res
pkg5unittest.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def calc_pem_hash(pth):
                """Find the hash of pem representation the file."""
                with open(pth, "rb") as f:
                        cert = x509.load_pem_x509_certificate(
                            f.read(), default_backend())
                return hashlib.sha1(
                    cert.public_bytes(serialization.Encoding.PEM)).hexdigest()
sign.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __make_tmp_cert(d, pth):
        try:
                with open(pth, "rb") as f:
                        cert = x509.load_pem_x509_certificate(f.read(),
                            default_backend())
        except (ValueError, IOError) as e:
                raise api_errors.BadFileFormat(_("The file {0} was expected to "
                    "be a PEM certificate but it could not be read.").format(
                    pth))
        fd, fp = tempfile.mkstemp(dir=d)
        with os.fdopen(fd, "wb") as fh:
                fh.write(cert.public_bytes(serialization.Encoding.PEM))
        return fp
CipherUtil.py 文件源码 项目:Playground3 作者: CrimsonVista 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getCertFromBytes(pem_data):
    return x509.load_pem_x509_certificate(pem_data, backend)
crypto.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def generate_x509_fingerprint(pem_key):
    try:
        if isinstance(pem_key, six.text_type):
            pem_key = pem_key.encode('utf-8')
        cert = x509.load_pem_x509_certificate(
            pem_key, backends.default_backend())
        raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1()))
        if six.PY3:
            raw_fp = raw_fp.decode('ascii')
        return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
crypto.py 文件源码 项目:perkele 作者: schors 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def load_pem_certificate(data):
    """
    Loads a PEM X.509 certificate.
    """
    return x509.load_pem_x509_certificate(data, default_backend())
backend.py 文件源码 项目:django-auth-adfs 作者: jobec 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _load_from_string(cls, certificate):
        """
        Load a certificate from a string.

        Args:
            certificate (str): A base64 PEM encoded certificate
        """
        certificate = certificate.encode()
        try:
            cert_obj = load_pem_x509_certificate(certificate, backend)
            cls._public_keys.append(cert_obj.public_key())
            cls._key_age = datetime.now()
        except ValueError:
            raise ImproperlyConfigured("Invalid ADFS token signing certificate")
test_acme_util.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_generate_wildcard_pem_bytes():
    """
    When we generate a self-signed wildcard certificate's PEM data, that data
    should be deserializable and the deserilized certificate should have the
    expected parameters.
    """
    pem_bytes = generate_wildcard_pem_bytes()

    # Parse the concatenated bytes into a list of object
    pem_objects = pem.parse(pem_bytes)

    assert_that(pem_objects, HasLength(2))

    # Deserialize the private key and assert that it is the right type (the
    # other details we trust txacme with)
    key = serialization.load_pem_private_key(
        pem_objects[0].as_bytes(),
        password=None,
        backend=default_backend()
    )
    assert_that(key, IsInstance(rsa.RSAPrivateKey))

    # Deserialize the certificate and validate all the options we set
    cert = x509.load_pem_x509_certificate(
        pem_objects[1].as_bytes(), backend=default_backend()
    )
    expected_before = datetime.today() - timedelta(days=1)
    expected_after = datetime.now() + timedelta(days=3650)
    assert_that(cert, MatchesStructure(
        issuer=MatchesListwise([
            MatchesStructure(value=Equals(u'*'))
        ]),
        subject=MatchesListwise([
            MatchesStructure(value=Equals(u'*'))
        ]),
        not_valid_before=matches_time_or_just_before(expected_before),
        not_valid_after=matches_time_or_just_before(expected_after),
        signature_hash_algorithm=IsInstance(hashes.SHA256)
    ))
    assert_that(cert.public_key().public_numbers(), Equals(
                key.public_key().public_numbers()))


# From txacme
ingest_message.py 文件源码 项目:ndr 作者: SecuredByTHEM 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def verify_and_load_message(config, file, only_accept_cn=None):
        '''Checks the message's signature to make sure it was signed by someone in our CA chain'''
        try:
            # We need a temporary file to get the signer PEM
            msg_fd, signer_pem = tempfile.mkstemp()
            os.close(msg_fd)  # Don't need to write anything to it

            # CApath is required to force out any system CAs that might be in the global CNF file
            ossl_verify_cmd = ["openssl", "smime", "-verify",
                               "-in", file, "-CAfile", config.ssl_cafile,
                               "-CApath", "/dev/nonexistant-dir",
                               "-text", "-signer", signer_pem]

            ossl_verify_proc = subprocess.run(
                args=ossl_verify_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                check=False)

            if ossl_verify_proc.returncode != 0:
                config.logger.warn(
                    "rejecting %s: %s", file, str(ossl_verify_proc.stderr))
                return None

            config.logger.info("%s passed openssl S/MIME verify", file)

            config.logger.debug("checking %s", signer_pem)
            with open(signer_pem, 'rb') as x509_signer:
                # NOW we can use cryptography to read the x509 certificates
                cert = x509.load_pem_x509_certificate(
                    x509_signer.read(), default_backend())

                common_name = cert.subject.get_attributes_for_oid(
                    x509.NameOID.COMMON_NAME)[0].value

                config.logger.debug(
                    "signed message came from common name: %s", common_name)

            # Make sure the CN is something we're willing to accept
            if only_accept_cn is not None:
                if common_name != only_accept_cn:
                    config.logger.error("rejecting message due to CN %s != %s",
                                        common_name, only_accept_cn)

            # Read it in
            decoded_message = ossl_verify_proc.stdout
            message = IngestMessage(config)
            message.load_from_yaml(decoded_message)

            return message

        # and clean up after ourselves
        finally:
            os.remove(signer_pem)
httpscan.py 文件源码 项目:analyst-scripts 作者: Te-k 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def check_certificate(self, domain):
        """
        Download and get information from the TLS certificate
        """
        pem = ssl.get_server_certificate((domain, 443))
        if self.output:
            with open(os.path.join(self.output, 'cert.pem'), 'wb') as f:
                f.write(pem)


        cert = x509.load_pem_x509_certificate(str(pem), default_backend())
        self.log.critical("\tCertificate:")
        self.log.critical("\t\tDomain: %s", ",".join(map(lambda x: x.value, cert.subject)))
        self.log.critical("\t\tNot After: %s", str(cert.not_valid_after))
        self.log.critical("\t\tNot Before: %s", str(cert.not_valid_before))
        self.log.critical("\t\tCA Issuer: %s", ", ".join(map(lambda x:x.value, cert.issuer)))
        self.log.critical("\t\tSerial: %s", cert.serial_number)
        for ext in cert.extensions:
            if ext.oid._name == 'basicConstraints':
                if ext.value.ca:
                    self.log.critical("\t\tBasic Constraints: True")
            elif ext.oid._name == 'subjectAltName':
                self.log.critical("\t\tAlternate names: %s", ", ".join(ext.value.get_values_for_type(x509.DNSName)))
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def issue_certificate(cn, ca_cert, ca_key,
                      organizations=(),
                      san_dns=(),
                      san_ips=(),
                      key_size=2048,
                      certify_days=365,
                      is_web_server=False,
                      is_web_client=False):
    ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
    ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend())
    ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())

    key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())

    subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
    subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in organizations]
    subject = x509.Name(subject_name_attributes)

    now = datetime.datetime.utcnow()
    cert = x509.CertificateBuilder() \
        .subject_name(subject) \
        .issuer_name(ca_cert.issuer) \
        .public_key(key.public_key()) \
        .serial_number(x509.random_serial_number()) \
        .not_valid_before(now) \
        .not_valid_after(now + datetime.timedelta(days=certify_days)) \
        .add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest,
                                                   [x509.DirectoryName(ca_cert.issuer)],
                                                   ca_cert.serial_number),
                       critical=False) \
        .add_extension(x509.KeyUsage(digital_signature=True,
                                     content_commitment=False,
                                     key_encipherment=True,
                                     data_encipherment=False,
                                     key_agreement=False,
                                     key_cert_sign=False,
                                     crl_sign=False,
                                     encipher_only=False,
                                     decipher_only=False),
                       critical=True)

    extended_usages = []
    if is_web_server:
        extended_usages.append(ExtendedKeyUsageOID.SERVER_AUTH)
    if is_web_client:
        extended_usages.append(ExtendedKeyUsageOID.CLIENT_AUTH)
    if extended_usages:
        cert = cert.add_extension(x509.ExtendedKeyUsage(extended_usages), critical=False)

    sans = [x509.DNSName(name) for name in san_dns]
    sans += [x509.IPAddress(ipaddress.ip_address(ip)) for ip in san_ips]
    if sans:
        cert = cert.add_extension(x509.SubjectAlternativeName(sans), critical=False)

    cert = cert.sign(ca_key, hashes.SHA256(), default_backend())

    cert = cert.public_bytes(serialization.Encoding.PEM)
    key = key.private_bytes(encoding=serialization.Encoding.PEM,
                            format=serialization.PrivateFormat.TraditionalOpenSSL,
                            encryption_algorithm=serialization.NoEncryption())
    return cert, key


问题


面经


文章

微信
公众号

扫码关注公众号