python类CERT_OPTIONAL的实例源码

connection.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
connection.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
connection.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
tornado2.py 文件源码 项目:http2 作者: mSOHU 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _verify_cert(self, peer_cert):
        """Decide whether *peer_cert* is acceptable for ``self.host``.

        Returns True without further checks when the verify mode read from
        ``self.ssl_options`` is ``ssl.CERT_NONE`` or when ``self.host`` is
        None. Returns False (after logging a warning) when the mode is
        ``ssl.CERT_REQUIRED`` and no certificate was supplied, or when
        ``ssl_match_hostname`` raises ``SSLCertificateError``. Otherwise
        returns True.

        Per the original author's note, CA-signature validation is expected
        to have happened during the handshake; only the hostname is checked
        here.
        """
        mode = self.ssl_options.verify_mode
        assert mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)

        if mode == ssl.CERT_NONE or self.host is None:
            return True

        if mode == ssl.CERT_REQUIRED and peer_cert is None:
            logger.warning("No SSL certificate given")
            return False

        # NOTE(review): under CERT_OPTIONAL a None certificate still reaches
        # the hostname matcher below - confirm the matcher tolerates that.
        try:
            ssl_match_hostname(peer_cert, self.host)
        except SSLCertificateError:
            logger.warning("Invalid SSL certificate", exc_info=True)
            return False
        return True
rocket.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
rocket.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
server.py 文件源码 项目:knowledge-management 作者: dgore7 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create_secure_socket(self, cert_dir, host='localhost', port=8001):
        """Create, TLS-wrap and bind the server's listening socket.

        Builds an ``ssl.SSLContext`` with ``CERT_OPTIONAL`` verification and
        hostname checking disabled, loads the default CA certificates for
        ``Purpose.CLIENT_AUTH``, calls ``self.generate_server_self_cert`` and
        then loads ``KnowledgeManagement.crt`` / ``KnowledgeManagement.key``
        from *cert_dir*. The raw TCP socket is stored on ``self.server_proc``,
        ``self.is_listening`` is set to True, and the wrapped socket, bound to
        ``(host, port)``, is returned.
        """
        tls_context = ssl.SSLContext()
        # ssl.CERT_REQUIRED would be the stricter choice (original author's note).
        tls_context.verify_mode = ssl.CERT_OPTIONAL
        # Hostname verification is intentionally switched off for now.
        tls_context.check_hostname = False
        tls_context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH)

        self.generate_server_self_cert(cert_dir)
        cert_path = join(cert_dir, "KnowledgeManagement.crt")
        key_path = join(cert_dir, "KnowledgeManagement.key")
        tls_context.load_cert_chain(cert_path, keyfile=key_path)

        # NOTE(review): the original comment says the handshake is not
        # performed on connection so the cert can be transferred first -
        # that cannot be confirmed from this block alone.
        self.server_proc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_proc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        secure_listener = tls_context.wrap_socket(self.server_proc,
                                                  server_side=True)
        secure_listener.bind((host, port))
        print("SSL Server started on {}:{}".format(host, port))

        self.is_listening = True
        return secure_listener
rocket.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
connection.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
connection.py 文件源码 项目:Flask-NvRay-Blog 作者: rui7157 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
rocket.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
server.py 文件源码 项目:certproxy 作者: geneanet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self, handler):
        """Build the TLS context from *handler*'s credentials and serve forever.

        The context uses ``PROTOCOL_TLSv1_2``, presents the certificate and
        private key named by ``handler.certificate_file`` and
        ``handler.private_key_file``, trusts that same certificate file plus
        the contents of ``handler.crl_file``, enables leaf CRL checking and
        requests (but does not require) client certificates. The context is
        stored in ``self.options['ssl_context']`` and passed to a
        ``pywsgi.WSGIServer`` bound to ``(self.host, self.port)``.

        This call blocks in ``serve_forever()``.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.load_cert_chain(handler.certificate_file, handler.private_key_file)
        context.load_verify_locations(cafile=handler.certificate_file)
        context.load_verify_locations(cafile=handler.crl_file)
        # OP_NO_* flags are switched on by OR-ing them into the option mask.
        # AND-ing (as this code previously did) keeps only the bits shared
        # with the flag, which strips the context's other options instead of
        # disabling the legacy protocols.
        context.options |= ssl.OP_NO_SSLv3
        context.options |= ssl.OP_NO_SSLv2
        context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        context.verify_mode = ssl.CERT_OPTIONAL
        self.options['ssl_context'] = context

        logger.info('Starting server on host %s port %d.', self.host, self.port)

        server = pywsgi.WSGIServer(
            (self.host, self.port),
            handler,
            ssl_context=context,
            handler_class=RequestHandler,
            log=logger,
            error_log=logger,
        )
        server.serve_forever()
mqtt.py 文件源码 项目:thingflow-python 作者: mpi-sws-rse 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _connect(self):
        """Connect ``self.client`` to the broker and install logging callbacks.

        When ``self.server_tls`` is truthy the method raises
        ``Exception("TBD")`` before doing anything else. Otherwise it connects
        to ``(self.host, self.port)``, subscribes to ``self.topics`` and then
        assigns ``on_connect`` / ``on_publish`` callbacks that print a status
        line.
        """
        def on_connect(client, userdata, flags, rc):
            print("Connected with result code "+str(rc))

        def on_publish(client, userdata, mid):
            print("Successfully published mid %d" % mid)

        if self.server_tls:
            # TLS support is not implemented. The statements that followed
            # this raise (tls_set with server_tls.server_cert and
            # cert_reqs=ssl.CERT_OPTIONAL, then connect) could never run.
            raise Exception("TBD")

        self.client.connect(self.host, self.port)
        self.client.subscribe(self.topics)

        # Callbacks are attached only after connect/subscribe, as before.
        self.client.on_connect = on_connect
        self.client.on_publish = on_publish
rocket.py 文件源码 项目:web3py 作者: web2py 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
rocket.py 文件源码 项目:CSC376KnowledgeManagement 作者: WCotterman 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
rocket.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
connection.py 文件源码 项目:PopRank 作者: SanketMore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
connection.py 文件源码 项目:aredis 作者: NoneGG 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, keyfile=None, certfile=None,
                 cert_reqs=None, ca_certs=None):
        """Store the SSL parameters later used to build the context.

        ``cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one of
        the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any non-string value such
        as an ``ssl.CERT_*`` constant, which is stored unchanged.

        Raises ``RedisError`` when a string flag is not one of the three
        recognised names. ``self.context`` starts out as ``None``.
        """
        self.keyfile = keyfile
        self.certfile = certfile
        if cert_reqs is None:
            self.cert_reqs = ssl.CERT_NONE
        elif isinstance(cert_reqs, str):
            CERT_REQS = {
                'none': ssl.CERT_NONE,
                'optional': ssl.CERT_OPTIONAL,
                'required': ssl.CERT_REQUIRED
            }
            if cert_reqs not in CERT_REQS:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    cert_reqs)
            self.cert_reqs = CERT_REQS[cert_reqs]
        else:
            # Without this branch a caller passing ssl.CERT_REQUIRED (or any
            # other non-string value) left ``self.cert_reqs`` unset, causing
            # an AttributeError on first use.
            self.cert_reqs = cert_reqs
        self.ca_certs = ca_certs
        self.context = None
test_connection_pool.py 文件源码 项目:aredis 作者: NoneGG 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_cert_reqs_options(self):
        """Each ``ssl_cert_reqs`` flag with a keyfile but no certfile raises TypeError.

        The message check runs after the ``pytest.raises`` block and reads
        the exception through ``excinfo.value``: statements placed after the
        raising call inside the ``with`` body never execute, and the
        ``ExceptionInfo`` object has no ``message`` attribute.

        NOTE(review): the earlier version also compared the pool's
        ``ssl_context.verify_mode`` with the matching ``ssl.CERT_*`` constant,
        but those lines were unreachable because no pool is created when
        ``from_url`` raises. Add a separate test with valid cert files if
        that check is wanted.
        """
        for flag in ('none', 'optional', 'required'):
            with pytest.raises(TypeError) as excinfo:
                aredis.ConnectionPool.from_url(
                    'rediss://?ssl_cert_reqs=%s&ssl_keyfile=test' % flag)
            assert str(excinfo.value) == \
                'certfile should be a valid filesystem path'
connection.py 文件源码 项目:lambda-examples 作者: GalacticFog 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
                 ssl_ca_certs=None, **kwargs):
        """Initialise the connection and record its SSL settings.

        ``ssl_cert_reqs`` may be ``None`` (stored as ``ssl.CERT_NONE``), one
        of the strings ``'none'``, ``'optional'`` or ``'required'`` (mapped to
        the matching ``ssl.CERT_*`` constant), or any other non-string value,
        which is stored as given. Remaining keyword arguments are forwarded
        to the parent initialiser.

        Raises ``RedisError`` when ``ssl_available`` is falsy or when a string
        flag is not one of the three recognised names.
        """
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile, self.certfile = ssl_keyfile, ssl_certfile

        if ssl_cert_reqs is None:
            verify_mode = ssl.CERT_NONE
        elif not isinstance(ssl_cert_reqs, basestring):
            # Non-string values are taken verbatim.
            verify_mode = ssl_cert_reqs
        else:
            modes_by_flag = dict(
                none=ssl.CERT_NONE,
                optional=ssl.CERT_OPTIONAL,
                required=ssl.CERT_REQUIRED,
            )
            if ssl_cert_reqs not in modes_by_flag:
                raise RedisError(
                    "Invalid SSL Certificate Requirements Flag: %s" %
                    ssl_cert_reqs)
            verify_mode = modes_by_flag[ssl_cert_reqs]

        self.cert_reqs = verify_mode
        self.ca_certs = ssl_ca_certs
rocket.py 文件源码 项目:StuffShare 作者: StuffShare 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def wrap_socket(self, sock):
        """Return *sock* wrapped for server-side TLS, or *sock* itself on SSLError.

        The key file and certificate file come from ``self.interface[2]`` and
        ``self.interface[3]``. When ``self.clientcert_req`` is truthy, client
        certificates are requested with ``ssl.CERT_OPTIONAL`` and checked
        against the CA file in ``self.interface[4]``.
        """
        try:
            client_auth = {}
            if self.clientcert_req:
                client_auth['ca_certs'] = self.interface[4]
                client_auth['cert_reqs'] = ssl.CERT_OPTIONAL
            sock = ssl.wrap_socket(sock,
                                   keyfile=self.interface[2],
                                   certfile=self.interface[3],
                                   server_side=True,
                                   ssl_version=ssl.PROTOCOL_SSLv23,
                                   **client_auth)
        except SSLError:
            # Typically a plain HTTP request arriving on the secure socket.
            # Deliberately ignored: the Worker is expected to detect and
            # handle it, so the unwrapped socket is handed back.
            pass

        return sock
ssl_support.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validate_cert_reqs(option, value):
        """Validate a certificate-requirements setting and return it.

        *value* must be ``None``, one of ``ssl.CERT_NONE``,
        ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``, or the name of one of
        those constants as a string (for example ``"CERT_REQUIRED"``), which
        is resolved to the constant.

        Raises ``ValueError`` naming *option* for anything else.
        """
        if value is None:
            return value
        elif (isinstance(value, string_type) and
              value in ("CERT_NONE", "CERT_OPTIONAL", "CERT_REQUIRED")):
            # Resolve only the three verify-mode names. Looking up arbitrary
            # attribute names on the ssl module let unrelated attributes whose
            # value happens to equal 0, 1 or 2 (e.g. "HAS_SNI") pass the
            # membership test below.
            value = getattr(ssl, value)

        if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
            return value
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED" % (option,))
ssl_context.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __get_verify_mode(self):
        """Return the verify mode stored in ``self._verify_mode``.

        The verify mode controls whether to try to verify other peers'
        certificates and how to behave if verification fails. It must be one
        of ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
        """
        return self._verify_mode
common.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def validate_cert_reqs(option, value):
    """Return *value* if it is an acceptable certificate-requirements setting.

    ``None`` is always accepted. Any other value is accepted only when the
    ssl module is available (``HAS_SSL``) and the value is one of
    ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``.

    Raises ``ConfigurationError`` naming *option* otherwise.
    """
    if value is None:
        return value

    if not HAS_SSL:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))

    allowed_modes = (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
    if value not in allowed_modes:
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    return value
iostream.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _verify_cert(self, peercert):
        """Decide whether *peercert* is acceptable for ``self._server_hostname``.

        The verify mode comes from ``self._ssl_options``: the ``'cert_reqs'``
        entry (default ``ssl.CERT_NONE``) when it is a dict, or its
        ``verify_mode`` attribute when it is an ``ssl.SSLContext``.

        Returns True without further checks when the mode is
        ``ssl.CERT_NONE`` or no server hostname is set. Returns False (after
        logging a warning) when the mode is ``ssl.CERT_REQUIRED`` and the
        socket reports no peer certificate, or when ``ssl_match_hostname``
        raises ``SSLCertificateError``. Otherwise returns True.

        Per the original author's note, CA-signature validation is expected
        to have happened during the handshake; only the hostname is checked
        here.
        """
        options = self._ssl_options
        # NOTE(review): verify_mode stays unbound if the options object is
        # neither a dict nor an SSLContext - confirm callers never do that.
        if isinstance(options, dict):
            verify_mode = options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(options, ssl.SSLContext):
            verify_mode = options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)

        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True

        # NOTE(review): the None check uses the certificate fetched from the
        # socket, while the hostname match uses the *peercert* argument -
        # presumably the same object; verify against the caller.
        socket_cert = self.socket.getpeercert()
        if verify_mode == ssl.CERT_REQUIRED and socket_cert is None:
            gen_log.warning("No SSL certificate given")
            return False

        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        return True
common.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_cert_reqs(option, value):
    """Return *value* if it is an acceptable certificate-requirements setting.

    ``None`` is always accepted. Any other value is accepted only when the
    ssl module is available (``HAS_SSL``) and the value is one of
    ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``.

    Raises ``ConfigurationError`` naming *option* otherwise.
    """
    if value is None:
        return value

    if not HAS_SSL:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))

    allowed_modes = (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
    if value not in allowed_modes:
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    return value
common.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def validate_cert_reqs(option, value):
    """Return *value* if it is an acceptable certificate-requirements setting.

    ``None`` is always accepted. Any other value is accepted only when the
    ssl module is available (``HAS_SSL``) and the value is one of
    ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``.

    Raises ``ConfigurationError`` naming *option* otherwise.
    """
    if value is None:
        return value

    if not HAS_SSL:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))

    allowed_modes = (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
    if value not in allowed_modes:
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    return value
iostream.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _verify_cert(self, peercert):
        """Decide whether *peercert* is acceptable for ``self._server_hostname``.

        The verify mode comes from ``self._ssl_options``: the ``'cert_reqs'``
        entry (default ``ssl.CERT_NONE``) when it is a dict, or its
        ``verify_mode`` attribute when it is an ``ssl.SSLContext``.

        Returns True without further checks when the mode is
        ``ssl.CERT_NONE`` or no server hostname is set. Returns False (after
        logging a warning) when the mode is ``ssl.CERT_REQUIRED`` and the
        socket reports no peer certificate, or when ``ssl_match_hostname``
        raises ``SSLCertificateError``. Otherwise returns True.

        Per the original author's note, CA-signature validation is expected
        to have happened during the handshake; only the hostname is checked
        here.
        """
        options = self._ssl_options
        # NOTE(review): verify_mode stays unbound if the options object is
        # neither a dict nor an SSLContext - confirm callers never do that.
        if isinstance(options, dict):
            verify_mode = options.get('cert_reqs', ssl.CERT_NONE)
        elif isinstance(options, ssl.SSLContext):
            verify_mode = options.verify_mode
        assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)

        if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
            return True

        # NOTE(review): the None check uses the certificate fetched from the
        # socket, while the hostname match uses the *peercert* argument -
        # presumably the same object; verify against the caller.
        socket_cert = self.socket.getpeercert()
        if verify_mode == ssl.CERT_REQUIRED and socket_cert is None:
            gen_log.warning("No SSL certificate given")
            return False

        try:
            ssl_match_hostname(peercert, self._server_hostname)
        except SSLCertificateError as e:
            gen_log.warning("Invalid SSL certificate: %s" % e)
            return False
        return True
common.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_cert_reqs(option, value):
    """Return *value* if it is an acceptable certificate-requirements setting.

    ``None`` is always accepted. Any other value is accepted only when the
    ssl module is available (``HAS_SSL``) and the value is one of
    ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``.

    Raises ``ConfigurationError`` naming *option* otherwise.
    """
    if value is None:
        return value

    if not HAS_SSL:
        raise ConfigurationError("The value of %s is set but can't be "
                                 "validated. The ssl module is not available"
                                 % (option,))

    allowed_modes = (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED)
    if value not in allowed_modes:
        raise ConfigurationError("The value of %s must be one of: "
                                 "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                                 "`ssl.CERT_REQUIRED" % (option,))
    return value
connectionpool.py 文件源码 项目:Bethesda_SublimeFO4 作者: Scrivener07 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def set_cert(self, key_file=None, cert_file=None,
                 cert_reqs='CERT_NONE', ca_certs=None):
        """Record the TLS key, certificate, verify mode and CA bundle on self.

        *cert_reqs* is looked up by name among ``'CERT_NONE'``,
        ``'CERT_OPTIONAL'`` and ``'CERT_REQUIRED'``; an unrecognised name (or
        one that resolves to a falsy constant) is stored as ``ssl.CERT_NONE``.
        """
        self.key_file = key_file
        self.cert_file = cert_file

        modes_by_name = dict(
            CERT_NONE=ssl.CERT_NONE,
            CERT_OPTIONAL=ssl.CERT_OPTIONAL,
            CERT_REQUIRED=ssl.CERT_REQUIRED,
        )
        resolved_mode = modes_by_name.get(cert_reqs)
        if not resolved_mode:
            # Unknown names fall back to no verification.
            resolved_mode = ssl.CERT_NONE
        self.cert_reqs = resolved_mode

        self.ca_certs = ca_certs


问题


面经


文章

微信
公众号

扫码关注公众号