Python ssl.HAS_SNI 的实例源码

client.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
test_urllib2net.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_sni(self):
    """Check Server Name Indication against a third-party test page.

    The test is currently disabled: the first statement calls
    ``skipTest``.  The remaining body fetches a page and asserts on its
    content depending on whether ``ssl.HAS_SNI`` is true.
    """
    self.skipTest("test disabled - test server needed")
    # Checks that SNI works when the linked OpenSSL supports it.  The ssl
    # module has no server-side SNI support here, so an external test site
    # is used instead.
    sni_supported = ssl.HAS_SNI
    with support.transient_internet("XXX"):
        response = urllib.request.urlopen("XXX")
        body = response.readall()
        if not sni_supported:
            self.assertNotIn(b"Great", body)
            self.assertIn(b"Unfortunately", body)
        else:
            self.assertIn(b"Great", body)
            self.assertNotIn(b"Unfortunately", body)
client.py 文件源码 项目:packaging 作者: blockstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:islam-buddy 作者: hamir 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
test_urllib2net.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_sni(self):
    """Check Server Name Indication against a third-party test page.

    The test is currently disabled: the first statement calls
    ``skipTest``.  The remaining body fetches a page and asserts on its
    content depending on whether ``ssl.HAS_SNI`` is true.
    """
    self.skipTest("test disabled - test server needed")
    # Checks that SNI works when the linked OpenSSL supports it.  The ssl
    # module has no server-side SNI support here, so an external test site
    # is used instead.
    sni_supported = ssl.HAS_SNI
    with support.transient_internet("XXX"):
        response = urllib.request.urlopen("XXX")
        body = response.readall()
        if not sni_supported:
            self.assertNotIn(b"Great", body)
            self.assertIn(b"Unfortunately", body)
        else:
            self.assertIn(b"Great", body)
            self.assertNotIn(b"Unfortunately", body)
client.py 文件源码 项目:FightstickDisplay 作者: calexil 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:cryptogram 作者: xinmingzhang 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket.create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:Repobot 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:UMOG 作者: hsab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:blackmamba 作者: zrzka 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
imaplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def starttls(self, ssl_context=None):
    """Send STARTTLS and switch the connection to TLS.

    ssl_context -- context used to wrap the socket; when None a default
    one is created with ``ssl._create_stdlib_context()``.

    Raises ``self.error`` when SSL support is missing or the server does
    not answer 'OK', and ``self.abort`` when TLS is already established
    or STARTTLS is not listed in ``self.capabilities``.  Returns the
    result of ``self._untagged_response`` for the command.
    """
    name = 'STARTTLS'
    if not HAVE_SSL:
        raise self.error('SSL support missing')
    if self._tls_established:
        raise self.abort('TLS session already established')
    if name not in self.capabilities:
        raise self.abort('TLS not supported by server')
    # Fall back to a default SSL context when the caller gave none.
    if ssl_context is None:
        ssl_context = ssl._create_stdlib_context()
    typ, dat = self._simple_command(name)
    if typ != 'OK':
        raise self.error("Couldn't establish TLS session")

    # Only pass the host name for SNI when the ssl module supports it.
    sni_name = self.host if ssl.HAS_SNI else None
    self.sock = ssl_context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = self.sock.makefile('rb')
    self._tls_established = True
    # Capabilities are fetched again now that the session is encrypted.
    self._get_capabilities()
    return self._untagged_response(typ, dat, name)
client.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def connect(self):
    """Connect via the parent class, then wrap the socket in TLS.

    The name used for SNI and certificate matching is
    ``self._tunnel_host`` when set, otherwise ``self.host``.  The name is
    sent for SNI only when ``ssl.HAS_SNI`` is true.  ``ssl.match_hostname``
    is called here only when the context does not check host names itself
    and ``self._check_hostname`` is set; on failure the socket is shut
    down and closed before re-raising.
    """
    super().connect()

    server_hostname = self._tunnel_host or self.host
    sni_hostname = None
    if ssl.HAS_SNI:
        sni_hostname = server_hostname

    self.sock = self._context.wrap_socket(self.sock,
                                          server_hostname=sni_hostname)
    if self._context.check_hostname or not self._check_hostname:
        # Either the context already verifies the name or no check is wanted.
        return
    try:
        ssl.match_hostname(self.sock.getpeercert(), server_hostname)
    except Exception:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
poplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def stls(self, context=None):
    """Start a TLS session on the active connection (RFC 2595).

    context -- an ssl.SSLContext; when None a default one is created
    with ``ssl._create_stdlib_context()``.

    Raises ``error_proto`` when TLS support is missing, a TLS session is
    already established, or the server does not advertise STLS.  Returns
    the server response to the STLS command.
    """
    if not HAVE_SSL:
        raise error_proto('-ERR TLS support missing')
    if self._tls_established:
        raise error_proto('-ERR TLS session already established')
    capabilities = self.capa()
    if 'STLS' not in capabilities:
        raise error_proto('-ERR STLS not supported by server')
    if context is None:
        context = ssl._create_stdlib_context()
    resp = self._shortcmd('STLS')
    # Only pass the host name for SNI when the ssl module supports it.
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = self.sock.makefile('rb')
    self._tls_established = True
    return resp
client.py 文件源码 项目:beepboop 作者: nicolehe 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:hackathon 作者: vertica 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
client.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def connect(self):
    """Open the TCP connection and wrap it in TLS.

    Connects to ``(self.host, self.port)``, calls ``self._tunnel()`` when
    ``self._tunnel_host`` is set, then wraps the socket with
    ``self._context``.  The host name is sent for SNI only when
    ``ssl.HAS_SNI`` is true.  If ``self._check_hostname`` is set, the peer
    certificate is matched against ``self.host``; on any failure in that
    step the TLS socket is shut down and closed before re-raising.
    """
    raw_sock = socket_create_connection((self.host, self.port),
                                        self.timeout, self.source_address)

    if self._tunnel_host:
        # Publish the plain socket as self.sock before _tunnel() runs.
        self.sock = raw_sock
        self._tunnel()

    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    self.sock = self._context.wrap_socket(raw_sock, server_hostname=sni_name)
    try:
        if self._check_hostname:
            peer_cert = self.sock.getpeercert()
            ssl.match_hostname(peer_cert, self.host)
    except Exception:
        # Tear the TLS socket down, then let the caller see the error.
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise
test_urllib2_localnet.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_https_sni(self):
    """Verify the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.  A
    server context records the name passed to its servername callback,
    and the test asserts on it after one HTTPS request.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    # One-element list so the nested callback can store its result.
    seen = [None]

    def record_server_name(ssl_sock, server_name, initial_context):
        seen[0] = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)
    client_ctx = ssl.create_default_context(cafile=CERT_localhost)
    self.urlopen("https://localhost:%s" % handler.port, context=client_ctx)
    self.assertEqual(seen[0], "localhost")
test_urllib2_localnet.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_https_sni(self):
    """Verify the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.  A
    server context records the name passed to its servername callback,
    and the test asserts on it after one HTTPS request.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    # One-element list so the nested callback can store its result.
    seen = [None]

    def record_server_name(ssl_sock, server_name, initial_context):
        seen[0] = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)
    client_ctx = ssl.create_default_context(cafile=CERT_localhost)
    self.urlopen("https://localhost:%s" % handler.port, context=client_ctx)
    self.assertEqual(seen[0], "localhost")
test_urllib2_localnet.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_https_sni(self):
    """Verify the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.  A
    server context records the name passed to its servername callback,
    and the test asserts on it after one HTTPS request.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    # One-element list so the nested callback can store its result.
    seen = [None]

    def record_server_name(ssl_sock, server_name, initial_context):
        seen[0] = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)
    client_ctx = ssl.create_default_context(cafile=CERT_localhost)
    self.urlopen("https://localhost:%s" % handler.port, context=client_ctx)
    self.assertEqual(seen[0], "localhost")
test_urllib2_localnet.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_https_sni(self):
    """Verify the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.  A
    server context records the name passed to its servername callback,
    and the test asserts on it after one HTTPS request.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    received_name = None

    def record_server_name(ssl_sock, server_name, initial_context):
        # Store the name the server saw in the enclosing scope.
        nonlocal received_name
        received_name = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)
    client_ctx = ssl.create_default_context(cafile=CERT_localhost)
    self.urlopen("https://localhost:%s" % handler.port, context=client_ctx)
    self.assertEqual(received_name, "localhost")
imaplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _create_socket(self):
    """Create the plain IMAP4 socket and return it wrapped in TLS.

    The host name is passed for SNI only when ``ssl.HAS_SNI`` is true.
    """
    plain_sock = IMAP4._create_socket(self)
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    return self.ssl_context.wrap_socket(plain_sock, server_hostname=sni_name)
smtplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_socket(self, host, port, timeout):
    """Connect to (host, port) and return the socket wrapped in TLS.

    Prints the target address to stderr when ``self.debuglevel`` is
    positive.  The SNI name is taken from ``self._host`` (not the ``host``
    argument) and is passed only when ``ssl.HAS_SNI`` is true.
    """
    if self.debuglevel > 0:
        print('connect:', (host, port), file=stderr)
    plain_sock = socket.create_connection((host, port), timeout,
                                          self.source_address)
    if ssl.HAS_SNI:
        sni_name = self._host
    else:
        sni_name = None
    return self.context.wrap_socket(plain_sock, server_hostname=sni_name)
ftplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def auth(self):
    """Secure the control connection with TLS/SSL.

    Sends 'AUTH TLS' when ``self.ssl_version`` equals
    ``ssl.PROTOCOL_TLSv1`` and 'AUTH SSL' otherwise, then wraps
    ``self.sock`` with ``self.context`` and reopens ``self.file`` on the
    wrapped socket.  Raises ValueError when the socket is already an
    ``ssl.SSLSocket``.  Returns the server response to the AUTH command.
    """
    if isinstance(self.sock, ssl.SSLSocket):
        raise ValueError("Already using TLS")
    if self.ssl_version == ssl.PROTOCOL_TLSv1:
        command = 'AUTH TLS'
    else:
        command = 'AUTH SSL'
    resp = self.voidcmd(command)
    # Only pass the host name for SNI when the ssl module supports it.
    sni_name = self.host if ssl.HAS_SNI else None
    self.sock = self.context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = self.sock.makefile(mode='r', encoding=self.encoding)
    return resp
ftplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ntransfercmd(self, cmd, rest=None):
    """Run FTP.ntransfercmd and TLS-wrap the data socket if required.

    When ``self._prot_p`` is set, the returned connection is wrapped with
    ``self.context`` (the host name is passed for SNI only when
    ``ssl.HAS_SNI`` is true).  Returns the ``(conn, size)`` pair.
    """
    conn, size = FTP.ntransfercmd(self, cmd, rest)
    if not self._prot_p:
        return conn, size
    if ssl.HAS_SNI:
        sni_name = self.host
    else:
        sni_name = None
    secured = self.context.wrap_socket(conn, server_hostname=sni_name)
    return secured, size
test_urllib2_localnet.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_https_sni(self):
    """Verify the client sends "localhost" as the SNI server name.

    Skipped when the ssl module is unavailable or lacks SNI support.  A
    server context records the name passed to its servername callback,
    and the test asserts on it after one HTTPS request.
    """
    if ssl is None:
        self.skipTest("ssl module required")
    if not ssl.HAS_SNI:
        self.skipTest("SNI support required in OpenSSL")

    received_name = None

    def record_server_name(ssl_sock, server_name, initial_context):
        # Store the name the server saw in the enclosing scope.
        nonlocal received_name
        received_name = server_name

    server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    server_ctx.set_servername_callback(record_server_name)
    handler = self.start_https_server(context=server_ctx,
                                      certfile=CERT_localhost)
    self.urlopen("https://localhost:%s" % handler.port)
    self.assertEqual(received_name, "localhost")
libirc.py 文件源码 项目:orizonhub 作者: gumblex 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def connect(self, addr=('irc.freenode.net', 6667), use_ssl=False):
    '''Connect to an IRC server.

    addr is a tuple of (server, port).  When use_ssl is true the socket
    is created as an SSL socket; on Python 3.4+ a default SSL context is
    used and the server name is passed for SNI when ssl.HAS_SNI is true.

    Every address returned by getaddrinfo is tried in order until one
    connects.  If none does, a socket.error with errno ENOTSOCK is raised.
    On success self.nick, self.recvbuf and self.sendbuf are reset.

    The lock taken by acquire_lock() is released on every exit path,
    including unexpected exceptions such as a name-resolution failure
    raised by getaddrinfo.
    '''
    self.acquire_lock()
    try:
        self.addr = (rmnlsp(addr[0]), addr[1])

        for res in socket.getaddrinfo(self.addr[0], self.addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM):
            af, socktype, proto, canonname, sa = res
            try:
                if use_ssl:
                    if (3,) <= sys.version_info < (3, 3):
                        self.sock = ssl.SSLSocket(af, socktype, proto)
                    elif sys.version_info >= (3, 4):
                        ctx = ssl.create_default_context()
                        if ssl.HAS_SNI:
                            self.sock = ctx.wrap_socket(socket.socket(af, socktype, proto), server_hostname=self.addr[0])
                        else:
                            self.sock = ctx.wrap_socket(socket.socket(af, socktype, proto))
                    else:
                        self.sock = ssl.SSLSocket(sock=socket.socket(af, socktype, proto))
                else:
                    self.sock = socket.socket(af, socktype, proto)
            except socket.error:
                # Could not create a socket for this address; try the next.
                self.sock = None
                continue
            try:
                self.sock.settimeout(300)
                self.sock.connect(sa)
            except socket.error:
                # Connection failed; drop this socket and try the next address.
                self.sock.close()
                self.sock = None
                continue
            break

        if self.sock is None:
            e = socket.error(
                '[errno %d] Socket operation on non-socket' % errno.ENOTSOCK)
            e.errno = errno.ENOTSOCK
            raise e

        self.nick = None
        self.recvbuf = b''
        self.sendbuf = b''
    finally:
        # Always give the lock back, even when an unexpected error escapes.
        self.lock.release()
smtplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def starttls(self, keyfile=None, certfile=None, context=None):
    """Put the connection to the SMTP server into TLS mode.

    ``ehlo_or_helo_if_needed()`` is called first.  If the server
    advertises STARTTLS, the command is sent; on a 220 reply the socket
    is wrapped with ``context`` (or with a default context built from
    ``keyfile``/``certfile``) and all previously learned server state is
    discarded.  Any other reply is returned without touching the socket.

    ``context`` is mutually exclusive with ``keyfile`` and ``certfile``.

    Returns the ``(resp, reply)`` pair of the STARTTLS command.

    Raises:
     SMTPException   the server does not support STARTTLS.
     RuntimeError    the server accepted but Python lacks SSL support.
     ValueError      context was combined with keyfile or certfile.
     SMTPHeloError   per the original documentation, when the server
                     does not reply properly to the helo greeting.
    """
    self.ehlo_or_helo_if_needed()
    if not self.has_extn("starttls"):
        raise SMTPException("STARTTLS extension not supported by server.")
    resp, reply = self.docmd("STARTTLS")
    if resp != 220:
        return (resp, reply)

    if not _have_ssl:
        raise RuntimeError("No SSL support included in this Python")
    if context is not None:
        if keyfile is not None:
            raise ValueError("context and keyfile arguments are mutually "
                             "exclusive")
        if certfile is not None:
            raise ValueError("context and certfile arguments are mutually "
                             "exclusive")
    else:
        context = ssl._create_stdlib_context(certfile=certfile,
                                             keyfile=keyfile)
    # Only pass the host name for SNI when the ssl module supports it.
    if ssl.HAS_SNI:
        sni_name = self._host
    else:
        sni_name = None
    self.sock = context.wrap_socket(self.sock, server_hostname=sni_name)
    self.file = None
    # RFC 3207: the client MUST discard any knowledge obtained from the
    # server (such as the list of SMTP service extensions) that was not
    # obtained from the TLS negotiation itself.
    self.helo_resp = None
    self.ehlo_resp = None
    self.esmtp_features = {}
    self.does_esmtp = 0
    return (resp, reply)
selector_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
             server_side=False, server_hostname=None,
             extra=None, server=None):
    """Wrap ``rawsock`` in SSL and start the handshake.

    A falsy ``sslcontext`` is rejected on the server side with
    ValueError; on the client side a default verifying context is built
    instead.  The socket is wrapped with ``do_handshake_on_connect``
    disabled, and ``server_hostname`` is forwarded only for client
    sockets when ``ssl.HAS_SNI`` is true.  Raises RuntimeError when the
    ssl module is unavailable.
    """
    if ssl is None:
        raise RuntimeError('stdlib ssl module not available')

    if not sslcontext:
        if server_side:
            raise ValueError('Server side ssl needs a valid SSLContext')
        # Client side may pass ssl=True to use a default context; in
        # that case the sslcontext passed is None.  The default is the
        # same as used by urllib with cadefault=True.
        if hasattr(ssl, '_create_stdlib_context'):
            sslcontext = ssl._create_stdlib_context(
                cert_reqs=ssl.CERT_REQUIRED,
                check_hostname=bool(server_hostname))
        else:
            # Fallback for Python 3.3.
            sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            sslcontext.options |= ssl.OP_NO_SSLv2
            sslcontext.set_default_verify_paths()
            sslcontext.verify_mode = ssl.CERT_REQUIRED

    wrap_kwargs = dict(server_side=server_side,
                       do_handshake_on_connect=False)
    if server_hostname and not server_side and ssl.HAS_SNI:
        wrap_kwargs['server_hostname'] = server_hostname
    sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

    super().__init__(loop, sslsock, protocol, extra, server)

    self._server_hostname = server_hostname
    self._waiter = waiter
    self._sslcontext = sslcontext
    self._paused = False

    # SSL-specific extra info.  (peercert is set later)
    self._extra.update(sslcontext=sslcontext)

    # The start time is only recorded when the loop is in debug mode.
    start_time = None
    if self._loop.get_debug():
        logger.debug("%r starts SSL handshake", self)
        start_time = self._loop.time()
    self._on_handshake(start_time)
wsgiprox.py 文件源码 项目:wsgiprox 作者: webrecorder 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, wsgi,
             prefix_resolver=None,
             download_host=None,
             proxy_host=None,
             proxy_options=None,
             proxy_apps=None):
    """Set up the proxy wrapper around a WSGI application.

    wsgi -- the wrapped WSGI application.
    prefix_resolver -- a resolver object or a string; a string is wrapped
        in FixedResolver, and a falsy value yields FixedResolver().
    download_host -- host for the certificate download app; falls back
        to DEFAULT_HOST.
    proxy_host -- host registered in proxy_apps; falls back to
        DEFAULT_HOST.
    proxy_options -- dict of HTTPS-only options read below ('ca_name',
        'ca_file_cache', 'use_wildcard_certs', 'enable_cert_download',
        'enable_websockets').
    proxy_apps -- mapping of host to app; when given it is used as-is
        and entries are added to it.
    """
    self._wsgi = wsgi
    self.set_connection_class()

    if isinstance(prefix_resolver, str):
        prefix_resolver = FixedResolver(prefix_resolver)
    self.prefix_resolver = prefix_resolver or FixedResolver()

    self.proxy_apps = proxy_apps or {}
    self.proxy_host = proxy_host or self.DEFAULT_HOST

    #self.has_sni = hasattr(ssl, 'HAS_SNI') and ssl.HAS_SNI
    #self.has_alpn = hasattr(ssl, 'HAS_ALPN') and ssl.HAS_ALPN

    # Make sure the proxy host has an entry, even if it maps to no app.
    if self.proxy_host not in self.proxy_apps:
        self.proxy_apps[self.proxy_host] = None

    # HTTPS Only Options
    opts = proxy_options or {}
    root_name = opts.get('ca_name', self.CA_ROOT_NAME)
    root_file = opts.get('ca_file_cache', self.CA_ROOT_FILE)
    self.ca = CertificateAuthority(ca_name=root_name,
                                   ca_file_cache=root_file,
                                   cert_cache=None,
                                   cert_not_before=-3600)

    # Best effort: any failure getting the root PEM leaves the path as None.
    try:
        self.root_ca_file = self.ca.get_root_pem_filename()
    except Exception:
        self.root_ca_file = None

    self.use_wildcard = opts.get('use_wildcard_certs', True)

    if opts.get('enable_cert_download', True):
        download_host = download_host or self.DEFAULT_HOST
        self.proxy_apps[download_host] = CertDownloader(self.ca)

    # WebSocketHandler comparing equal to object disables websockets.
    ws_option = opts.get('enable_websockets', True)
    self.enable_ws = None if WebSocketHandler == object else ws_option


问题


面经


文章

微信
公众号

扫码关注公众号