python类load_pem_x509_certificate()的实例源码

crypto_util.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def verify_renewable_cert_sig(renewable_cert):
    """ Verifies the signature of a `.storage.RenewableCert` object.

    The certificate read from ``renewable_cert.cert`` has its signature
    checked over its TBS bytes, using its own signature hash algorithm,
    against the certificate loaded from ``renewable_cert.chain``.

    :param `.storage.RenewableCert` renewable_cert: cert to verify

    :raises errors.Error: If signature verification fails.
    """
    try:
        # Separate names for the file handles and the parsed objects, so a
        # handle is never rebound while its ``with`` block is still active.
        with open(renewable_cert.chain, 'rb') as chain_file:
            chain, _ = pyopenssl_load_certificate(chain_file.read())
        with open(renewable_cert.cert, 'rb') as cert_file:
            cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
        hash_name = cert.signature_hash_algorithm.name
        OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
    except (IOError, ValueError, OpenSSL.crypto.Error) as e:
        # Adjacent literals instead of a backslash continuation inside the
        # string: the continuation put the source indentation into the message.
        error_str = ("verifying the signature of the cert located at {0} has failed. "
                     "Details: {1}".format(renewable_cert.cert, e))
        logger.exception(error_str)
        raise errors.Error(error_str)
test_certificate_utils.py 文件源码 项目:cursive 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def load_certificate(self, cert_name):
        """Read ``cert_name`` from ``self.cert_path`` and parse it as X.509.

        PEM decoding is attempted first and DER decoding second. When
        neither succeeds, a ``SignatureVerificationError`` naming the file
        path is raised.
        """
        cert_location = os.path.join(self.cert_path, cert_name)
        with open(cert_location, 'rb') as handle:
            raw_bytes = handle.read()

        try:
            certificate = x509.load_pem_x509_certificate(
                raw_bytes, default_backend())
        except Exception:
            # Could not be read as PEM: fall back to DER before giving up.
            try:
                certificate = x509.load_der_x509_certificate(
                    raw_bytes, default_backend())
            except Exception:
                raise exception.SignatureVerificationError(
                    "Failed to load certificate: %s" % cert_location
                )
        return certificate
config.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def validate_ca_cert(self, ignored):
        """Check the stored CA certificate against its expected fingerprint.

        The expected value is split on ``:`` into a hash name (looked up as
        an attribute of ``hashes``) and a hex digest, from which spaces are
        removed. On a mismatch the CA certificate file is deleted and
        ``NetworkError`` is raised.

        :param ignored: not used by this method
        :raises NetworkError: if the computed and expected fingerprints differ
        """
        expected = self._get_expected_ca_cert_fingerprint()
        algo, expectedfp = expected.split(':')
        expectedfp = expectedfp.replace(' ', '')
        backend = default_backend()

        # Binary mode: the PEM loader takes bytes on Python 3.
        with open(self._get_ca_cert_path(), 'rb') as f:
            certstr = f.read()
        cert = load_pem_x509_certificate(certstr, backend)
        hasher = getattr(hashes, algo)()
        fpbytes = cert.fingerprint(hasher)
        # hexlify() returns bytes on Python 3. Decode it so the comparison
        # with the textual expected fingerprint is text-to-text; comparing
        # bytes with str is always unequal and would delete a valid CA file.
        fp = binascii.hexlify(fpbytes).decode('ascii')

        if fp != expectedfp:
            os.unlink(self._get_ca_cert_path())
            self.log.error("Fingerprint of CA cert doesn't match: %s <-> %s"
                           % (fp, expectedfp))
            raise NetworkError("The provider's CA fingerprint doesn't match")
test_certificate_authorization.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def read_file(self, subject_name, pw=None):
        """
        Load the unit-test certificate and private key of a subject.

        Reads ``cert.pem`` and ``key.pem`` from
        ``../../resources/unittest/<subject_name>/`` relative to this file's
        directory.

        :param subject_name: name of the resource sub-directory to read
        :param pw: password passed to the private-key loader
        :return: dict holding the parsed certificate under ``'cert'`` and the
                 parsed private key under ``'private_key'``
        """
        pem_path = os.path.join(os.path.dirname(__file__),
                                "../../resources/unittest/" + subject_name + "/cert.pem")
        # Context managers close the files even when read() raises.
        with open(pem_path, "rb") as f:
            cert_pem = f.read()
        cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

        key_path = os.path.join(os.path.dirname(__file__),
                                "../../resources/unittest/" + subject_name + "/key.pem")
        with open(key_path, "rb") as f:
            cert_key = f.read()

        private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())
        return {'cert': cert, 'private_key': private_key}
certificate_authorization.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def load_pki(self, cert_path: str, cert_pass=None):
        """
        Load the CA certificate and CA private key, then run a sign/verify check.

        :param cert_path: directory containing the files named by
                          ``self.CERT_NAME`` and ``self.PRI_NAME``
        :param cert_pass: password handed to the private-key loader
        """
        cert_location = join(cert_path, self.CERT_NAME)
        key_location = join(cert_path, self.PRI_NAME)

        # Parse the certificate first, then the private key, both as PEM.
        with open(cert_location, "rb") as cert_stream:
            self.__ca_cert = x509.load_pem_x509_certificate(cert_stream.read(), default_backend())
        with open(key_location, "rb") as key_stream:
            key_pem = key_stream.read()
            try:
                self.__ca_pri = serialization.load_pem_private_key(key_pem, cert_pass, default_backend())
            except ValueError:
                # A ValueError from the key loader is only logged here.
                logging.debug("Invalid Password")

        # Sign a fixed payload and verify it with the freshly loaded material.
        signature = self.sign_data(b'TEST')
        if self.verify_data(b'TEST', signature) is False:
            logging.debug("Invalid Signature(Root Certificate load test)")
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_ca_certificate_constraints(cert_pem_data):
    """Validate the Basic Constraints of a PEM-encoded certificate.

    A certificate without a Basic Constraints extension passes unchecked.
    Otherwise the extension must flag the certificate as a CA and carry a
    path length equal to 0.

    :raises InvalidCertificate: if either requirement is not met
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    try:
        basic = certificate.extensions.get_extension_for_oid(
            ExtensionOID.BASIC_CONSTRAINTS).value
    except x509.extensions.ExtensionNotFound:
        return

    if not basic.ca:
        raise InvalidCertificate("Not a CA certificate")
    if basic.path_length != 0:
        raise InvalidCertificate("Invalid pathlen")


# based on ssl.match_hostname code
# https://github.com/python/cpython/blob/6f0eb93183519024cb360162bdd81b9faec97ba6/Lib/ssl.py#L279
crypto.py 文件源码 项目:certproxy 作者: geneanet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_certificate(cert_file=None, cert_bytes=None):
    """Load an X.509 certificate from a file or from in-memory bytes.

    The encoding is detected from the data itself: input containing a
    ``-----BEGIN`` marker is parsed as PEM, anything else as DER.

    :param cert_file: path of the certificate file; used whenever it is truthy
    :param cert_bytes: raw certificate bytes, used when ``cert_file`` is not given
    :return: the parsed certificate object
    :raises TypeError: if neither ``cert_file`` nor ``cert_bytes`` is provided
    """
    if cert_file:
        with open(cert_file, 'rb') as f:
            data = f.read()
    else:
        data = cert_bytes

    if data is None:
        # Fail with an actionable message rather than the opaque
        # "argument of type 'NoneType' is not iterable" from the check below.
        raise TypeError("load_certificate() requires cert_file or cert_bytes")

    if b'-----BEGIN' in data:
        loader = x509.load_pem_x509_certificate
    else:
        loader = x509.load_der_x509_certificate

    return loader(data=data, backend=default_backend())
main.py 文件源码 项目:deb-python-dcos 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _user_cert_validation(cert_str):
    """Ask the user to approve the cluster certificate by its fingerprint

    :param cert_str: cluster certificate bundle
    :type cert_str: str
    :returns: the outcome of the confirmation prompt
    :rtype: bool
    """

    bundle_cert = x509.load_pem_x509_certificate(
        cert_str.encode('utf-8'), default_backend())
    digest = bundle_cert.fingerprint(hashes.SHA256())

    # Colon-separated, upper-cased hex octets.
    octets = ["{:02x}".format(octet) for octet in digest]
    pp_fingerprint = ":".join(octets).upper()

    prompt = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(
        pp_fingerprint)
    return confirm(prompt, False)
azuread_tenant.py 文件源码 项目:social-core 作者: python-social-auth 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_certificate(self, kid):
        """Fetch the JWKS document and return the certificate for ``kid``.

        The first ``x5c`` entry of the first key whose ``kid`` matches is
        wrapped in PEM certificate markers and parsed.

        :raises DecodeError: if no key in the document carries that ``kid``
        """
        response = self.request(self.jwks_url(), method='GET')
        response.raise_for_status()

        # First key with a matching id, or None when there is no such key.
        matching = next(
            (entry for entry in response.json()['keys'] if entry['kid'] == kid),
            None)
        if matching is None:
            raise DecodeError('Cannot find kid={}'.format(kid))
        x5c = matching['x5c'][0]

        pem_text = '-----BEGIN CERTIFICATE-----\n' \
                   '{}\n' \
                   '-----END CERTIFICATE-----'.format(x5c)

        return load_pem_x509_certificate(pem_text.encode(), default_backend())
t_pkgsign.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_crl_0(self):
                """Check that the certificate's serial number is listed in
                the CRL and that both share the same issuer."""

                crl_path = os.path.join(self.crl_dir, "ch1_ta4_crl.pem")
                with open(crl_path, "rb") as crl_file:
                        crl = x509.load_pem_x509_crl(
                            crl_file.read(), default_backend())

                cert_path = os.path.join(self.cs_dir, "cs1_ch1_ta4_cert.pem")
                with open(cert_path, "rb") as cert_file:
                        cert = x509.load_pem_x509_certificate(
                            cert_file.read(), default_backend())

                self.assertTrue(crl.issuer == cert.issuer)
                # Stops at the first revoked entry with a matching serial.
                listed = any(rev.serial_number == cert.serial_number
                    for rev in crl)
                self.assertTrue(listed, "Can not find revoked "
                    "certificate in CRL!")
laprassl.py 文件源码 项目:ansible-playbook 作者: fnordpipe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def requireRenewal(self, path, lifetime):
        """Tell whether the certificate stored at ``path`` is due for renewal.

        :param path: path of a PEM-encoded certificate file
        :param lifetime: number of days before expiry from which renewal
                         is required
        :return: True once the current time is later than the certificate's
                 ``not_valid_after`` minus ``lifetime`` days, False otherwise
        """
        with open(path, 'rb') as fh:
            crt = fh.read()

        data = x509.load_pem_x509_certificate(crt, default_backend())
        renew_from = data.not_valid_after - datetime.timedelta(days=lifetime)

        # not_valid_after is a naive datetime expressed in UTC, so it must be
        # compared with the current UTC time, not with local wall-clock time.
        return datetime.datetime.utcnow() > renew_from
models.py 文件源码 项目:django-autocert 作者: farrepa 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def certificate_expiry_date(self):
        """Return ``not_valid_after`` of the stored PEM certificate.

        Gives None when ``self.certificate`` is empty or unset.
        """
        if not self.certificate:
            return None
        parsed = x509.load_pem_x509_certificate(str(self.certificate), default_backend())
        return parsed.not_valid_after
models.py 文件源码 项目:django-autocert 作者: farrepa 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def certificate_common_name(self):
        """Return the first Common Name value of the stored certificate's subject.

        Gives None when ``self.certificate`` is empty or unset.
        """
        if not self.certificate:
            return None
        parsed = x509.load_pem_x509_certificate(str(self.certificate), default_backend())
        name_attributes = parsed.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        return name_attributes[0].value
crypto.py 文件源码 项目:manuale 作者: veeti 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_pem_certificate(data):
    """
    Parses PEM-encoded data into an X.509 certificate object.
    """
    certificate = x509.load_pem_x509_certificate(data, default_backend())
    return certificate
test_crypto.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_convert_from_cryptography(self):
        """``X509.from_cryptography`` yields an ``X509`` with the same version."""
        source_cert = x509.load_pem_x509_certificate(
            intermediate_cert_pem, backend
        )
        converted = X509.from_cryptography(source_cert)

        assert isinstance(converted, X509)
        converted_version = converted.get_version()
        assert converted_version == source_cert.version.value
token.py 文件源码 项目:auth-srv 作者: openpermissions 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def decode_token(token):
    """
    Verify a JSON Web Token and return its payload

    The token is decoded with the public key of the certificate named by
    ``options.ssl_cert``, falling back to ``LOCALHOST_CRT`` when that option
    is missing or empty.

    :param token: a JSON Web Token
    :returns: dict, the decoded payload with its "scope" entry wrapped in a
        ``Scope`` object
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    # Binary mode: the PEM loader expects bytes, not decoded text.
    with open(cert_file, 'rb') as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
    public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])

    return payload
delegate_proxy.py 文件源码 项目:globus-cli 作者: globus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def parse_issuer_cred(issuer_cred):
    """
    Splits an X509 PEM credential string into its PEM sections and decodes
    them in proxy credential order.

    Sections are delimited by -----BEGIN <label>----- and -----END <label>-----
    and must appear as: issuer cert, issuer private key, then a proxy chain
    of 0 or more certs. Every section is decoded to confirm it is valid.

    Returns the issuer cert and private key as loaded cryptography objects
    and the proxy chain as a (possibly empty) string.

    Raises ValueError when fewer than two sections are present or when any
    section fails to decode.
    """
    pem_blocks = re.findall(
        "-----BEGIN.*?-----.*?-----END.*?-----", issuer_cred, flags=re.DOTALL)
    try:
        issuer_cert, issuer_private_key = pem_blocks[0], pem_blocks[1]
    except IndexError:
        raise ValueError("Unable to parse PEM data in credentials, "
                         "make sure the X.509 file is in PEM format and "
                         "consists of the issuer cert, issuer private key, "
                         "and proxy chain (if any) in that order.")
    # Whatever follows the first two sections is the proxy chain.
    issuer_chain_certs = pem_blocks[2:]

    try:
        loaded_cert = x509.load_pem_x509_certificate(
            six.b(issuer_cert), default_backend())
        loaded_private_key = serialization.load_pem_private_key(
            six.b(issuer_private_key),
            password=None, backend=default_backend())
        # Chain certs are decoded for validation only; the text is returned.
        for chain_cert in issuer_chain_certs:
            x509.load_pem_x509_certificate(
                six.b(chain_cert), default_backend())
    except ValueError:
        raise ValueError("Failed to decode PEM data in credentials. Make sure "
                         "the X.509 file consists of the issuer cert, "
                         "issuer private key, and proxy chain (if any) "
                         "in that order.")

    issuer_chain = "".join(issuer_chain_certs)
    return loaded_cert, loaded_private_key, issuer_chain
test_ca_service.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __load_cert(self, cert_dir):
        """Load a certificate/key pair and check that the two belong together.

        Reads ``cert.pem`` and ``key.pem`` from ``cert_dir``, signs a fixed
        payload with the private key (ECDSA with SHA-256) and verifies the
        signature with the certificate's public key.

        :param cert_dir: directory holding ``cert.pem`` and ``key.pem``
        :return: the X509 certificate, or None when loading the private key
                 raises ValueError or the sign/verify check fails
        """
        logging.debug("Cert/Key loading...")
        cert_file = join(cert_dir, "cert.pem")
        pri_file = join(cert_dir, "key.pem")

        # Context managers close the files even when read() raises.
        with open(cert_file, "rb") as f:
            cert_bytes = f.read()
        cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())

        with open(pri_file, "rb") as f:
            pri_bytes = f.read()
        try:
            pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
        except ValueError:
            logging.debug("Invalid Password(%s)", cert_dir)
            return None

        data = b"test"
        signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))

        try:
            # verify() returns None on success and raises InvalidSignature on
            # failure, so success is "no exception", not a truthy return.
            cert.public_key().verify(signature, data, ec.ECDSA(hashes.SHA256()))
        except InvalidSignature:
            logging.debug("sign test fail")
            logging.error("result is False ")
            return None

        return cert
ca_service.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def convert_x509cert_from_pem(self, cert_pem):
        """Parse a PEM-encoded certificate into an x509 certificate object.

        :param cert_pem: PEM-encoded certificate data
        :return: the parsed x509 certificate
        """
        certificate = x509.load_pem_x509_certificate(cert_pem, default_backend())
        return certificate
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def validate_certificate_common_name(cert_pem_data, subject_name):
    """Parse the PEM certificate and pass it with ``subject_name`` to
    ``_match_subject_name``, with ``alt_names=False``."""
    parsed = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    _match_subject_name(parsed, subject_name, alt_names=False)
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validate_certificate_hosts(cert_pem_data, host_names):
    """Parse the PEM certificate once and run ``_match_subject_name`` for
    every entry of ``host_names``, comparing with ``ssl._dnsname_match``."""
    parsed = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    for candidate in host_names:
        _match_subject_name(parsed, candidate, compare_func=ssl._dnsname_match)
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_certificate_host_ips(cert_pem_data, host_ips):
    """Parse the PEM certificate once and run ``_match_subject_ip`` for
    every entry of ``host_ips``."""
    parsed = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    for address in host_ips:
        _match_subject_ip(parsed, address)
pki.py 文件源码 项目:seedbox 作者: nailgun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def validate_certificate_key_usage(cert_pem_data, is_web_server, is_web_client):
    """Validate the key-usage extensions of a PEM-encoded certificate.

    The Key Usage extension must exist and allow both digital signature and
    key encipherment. When ``is_web_server`` or ``is_web_client`` is set, the
    Extended Key Usage extension must exist too and contain the server-auth
    and/or client-auth OID for each flag that is set.

    :raises InvalidCertificate: on the first requirement that is not met
    """
    certificate = x509.load_pem_x509_certificate(cert_pem_data, default_backend())
    try:
        key_usage = certificate.extensions.get_extension_for_oid(
            ExtensionOID.KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Key usage not specified")

    if not key_usage.digital_signature:
        raise InvalidCertificate("Not intented for Digital Signature")
    if not key_usage.key_encipherment:
        raise InvalidCertificate("Not intented for Key Encipherment")

    # Extended key usage only matters for web server / web client certs.
    if not (is_web_server or is_web_client):
        return

    try:
        extended_key_usage = certificate.extensions.get_extension_for_oid(
            ExtensionOID.EXTENDED_KEY_USAGE).value
    except x509.extensions.ExtensionNotFound:
        raise InvalidCertificate("Extended key usage not specified")

    if is_web_server and ExtendedKeyUsageOID.SERVER_AUTH not in extended_key_usage:
        raise InvalidCertificate("Not intented for TLS Web Server Authentication")

    if is_web_client and ExtendedKeyUsageOID.CLIENT_AUTH not in extended_key_usage:
        raise InvalidCertificate("Not intented for TLS Web Client Authentication")
ca.py 文件源码 项目:certproxy 作者: geneanet 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def list_hosts(self):
        """Build a mapping of subject Common Name to host signing state.

        Every file in ``self.csr_path`` is parsed as a PEM CSR and recorded
        with status ``'pending'``. Every file in ``self.crt_path`` is then
        parsed as a PEM certificate and recorded as ``'revoked'`` or
        ``'authorized'`` depending on ``revoked_cert(crt, self.crl)``.
        Certificates are processed last, so they replace a CSR entry that
        has the same Common Name.

        :return: dict keyed by Common Name, each value holding
                 ``key_fingerprint``, ``cert_fingerprint`` and ``status``
        """
        hosts = {}

        for csr_name in os.listdir(self.csr_path):
            with open(os.path.join(self.csr_path, csr_name), 'rb') as stream:
                request = x509.load_pem_x509_csr(stream.read(), default_backend())
                entry = {
                    'key_fingerprint': rsa_key_fingerprint(request.public_key()),
                    'cert_fingerprint': None,
                    'status': 'pending',
                }
                common_name = request.subject.get_attributes_for_oid(
                    NameOID.COMMON_NAME)[0].value
                hosts[common_name] = entry

        for crt_name in os.listdir(self.crt_path):
            with open(os.path.join(self.crt_path, crt_name), 'rb') as stream:
                certificate = x509.load_pem_x509_certificate(stream.read(), default_backend())
                status = 'revoked' if revoked_cert(certificate, self.crl) else 'authorized'
                entry = {
                    'key_fingerprint': rsa_key_fingerprint(certificate.public_key()),
                    'cert_fingerprint': x509_cert_fingerprint(certificate),
                    'status': status,
                }
                common_name = certificate.subject.get_attributes_for_oid(
                    NameOID.COMMON_NAME)[0].value
                hosts[common_name] = entry

        return hosts
utils.py 文件源码 项目:RKSV 作者: ztp-at 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def loadCert(pem):
    """
    Builds a cryptography certificate object out of a PEM certificate string.
    :param pem: A certificate as a PEM string.
    :return: A cryptography certificate object.
    """
    pem_bytes = pem.encode("utf-8")
    return x509.load_pem_x509_certificate(pem_bytes, default_backend())
jwt_agent.py 文件源码 项目:networking-vpp 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, local_cert, priv_key, ca_cert, controller_name_re):
        """Initialize JWTUtils

        Reads the node private key, the local node certificate and the CA
        certificate through ``_get_crypto_material``, in that order, and
        prepares the objects used for signing and for validation.

        The private key and the local certificate are kept for signing; the
        PEM form of the certificate is kept as well. The CA certificate is
        placed in a pyOpenSSL ``X509Store`` for validation.

        :param local_cert: file containing public half of the local key
        :param priv_key: file containing private half of the local key
        :param ca_cert: file containing CA root certificate
        :param controller_name_re: stored unchanged on the instance
        raise: IOError if the files cannot be read.
        """

        key_pem = self._get_crypto_material(priv_key)
        self.private_key = serialization.load_pem_private_key(
            key_pem, password=None, backend=default_backend())

        self.node_certificate = self._get_crypto_material(local_cert)
        self.node_cert_obj = load_pem_x509_certificate(
            self.node_certificate, default_backend())
        self.node_cert_pem = self.node_cert_obj.public_bytes(
            serialization.Encoding.PEM)

        # The trust store is built with pyOpenSSL, not cryptography.
        ca_pem = self._get_crypto_material(ca_cert)
        trust_anchor = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
        self.store = crypto.X509Store()
        self.store.add_cert(trust_anchor)

        self.controller_name_re = controller_name_re
utils.py 文件源码 项目:mogan 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generate_x509_fingerprint(pem_key):
    """Return the SHA-1 fingerprint of a PEM certificate as colon-separated
    hex octets.

    Text input is encoded as UTF-8 before parsing.

    :raises exception.InvalidKeypair: if parsing or fingerprinting raises
        ValueError, TypeError or binascii.Error
    """
    try:
        pem_bytes = (pem_key.encode('utf-8')
                     if isinstance(pem_key, six.text_type) else pem_key)
        certificate = x509.load_pem_x509_certificate(
            pem_bytes, backends.default_backend())
        hex_digest = binascii.hexlify(certificate.fingerprint(hashes.SHA1()))
        if six.PY3:
            hex_digest = hex_digest.decode('ascii')
        # hexlify() output always has even length: take it two chars at a time.
        octets = [hex_digest[i:i + 2] for i in range(0, len(hex_digest), 2)]
        return ':'.join(octets)
    except (ValueError, TypeError, binascii.Error) as ex:
        raise exception.InvalidKeypair(
            reason=_('failed to generate X509 fingerprint. '
                     'Error message: %s') % ex)
generate_wrapped_rsa_key.py 文件源码 项目:appbackendapi 作者: codesdk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_google_public_cert_key():
    """Download the PEM certificate at ``GOOGLE_PUBLIC_CERT_URL`` and return
    its public key.

    An HTTP error status is raised by ``raise_for_status()``.
    """
    response = requests.get(GOOGLE_PUBLIC_CERT_URL)
    response.raise_for_status()

    # The response body is the PEM text of the certificate.
    pem_bytes = response.text.encode('utf-8')
    certificate = x509.load_pem_x509_certificate(pem_bytes, default_backend())

    return certificate.public_key()
__init__.py 文件源码 项目:lokey 作者: jpf 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def deserialize(self, data):
        """Set ``_e`` and ``_n`` from the public numbers of the public key
        of a PEM-encoded certificate."""
        certificate = x509.load_pem_x509_certificate(data, default_backend())
        public_numbers = certificate.public_key().public_numbers()
        self._e = public_numbers.e
        self._n = public_numbers.n
response.py 文件源码 项目:eetcz 作者: versatilecz 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _verify(self):
        '''
        Check whether the message is correctly signed.

        The certificate is taken from the wsse:BinarySecurityToken element,
        and the ds:SignatureValue is verified (PKCS#1 v1.5 padding, SHA-256)
        over the exclusive-c14n form of ds:SignedInfo using that
        certificate's public key.

        Returns True when verification succeeds. Any exception raised along
        the way is logged and False is returned instead.
        '''
        try:
            # canonicalize the SOAP body and compute its SHA-256 digest
            # NOTE(review): `digest` is never compared with anything below, so
            # the body content is not actually checked here -- confirm whether
            # a comparison against the digest in SignedInfo is missing.
            body_c14n = etree.tostring(self.body, method='c14n', exclusive=True, with_comments=False)
            sha256 = hashlib.sha256(body_c14n)
            digest = b64encode(sha256.digest())

            # locate the certificate, the signed info and the signature value
            cert = self.root.find('.//wsse:BinarySecurityToken', namespaces=NSMAP)
            sig_info = self.root.find('.//ds:SignedInfo', namespaces=NSMAP)
            sig_value = self.root.find('.//ds:SignatureValue', namespaces=NSMAP)

            # check that all necessary data is present; a failed assert is
            # caught by the handler below and ends in `return False`
            assert cert is not None
            assert sig_info is not None
            assert sig_value is not None

            # canonicalize the signature info
            sig_info_c14n = etree.tostring(sig_info, method='c14n', exclusive=True, with_comments=False)

            # re-wrap the token text at 64 columns between PEM markers, then load it
            cert = '\n'.join(['-----BEGIN CERTIFICATE-----'] + textwrap.wrap(cert.text, 64) + ['-----END CERTIFICATE-----\n'])
            cert = load_pem_x509_certificate(cert.encode('utf-8'), default_backend())
            key = cert.public_key()

            # verify the signature value over the canonicalized signed info
            verifier = key.verifier(b64decode(sig_value.text), padding.PKCS1v15(), hashes.SHA256())
            verifier.update(sig_info_c14n)
            # if verification fails, an exception is raised
            verifier.verify()

            return True

        except Exception as e:
            logger.exception(e)

        # reached only after an exception was logged: report failure
        return False


问题


面经


文章

微信
公众号

扫码关注公众号