python类OP_NO_SSLv2()的实例源码

util.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def connect(self):
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = sock
            self._tunnel()

        if not hasattr(ssl, 'SSLContext'):
            # For 2.x
            if self.ca_certs:
                cert_reqs = ssl.CERT_REQUIRED
            else:
                cert_reqs = ssl.CERT_NONE
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=cert_reqs,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                context.load_cert_chain(self.cert_file, self.key_file)
            kwargs = {}
            if self.ca_certs:
                context.verify_mode = ssl.CERT_REQUIRED
                context.load_verify_locations(cafile=self.ca_certs)
                if getattr(ssl, 'HAS_SNI', False):
                    kwargs['server_hostname'] = self.host
            self.sock = context.wrap_socket(sock, **kwargs)
        if self.ca_certs and self.check_domain:
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = sock
            self._tunnel()

        if not hasattr(ssl, 'SSLContext'):
            # For 2.x
            if self.ca_certs:
                cert_reqs = ssl.CERT_REQUIRED
            else:
                cert_reqs = ssl.CERT_NONE
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=cert_reqs,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                context.load_cert_chain(self.cert_file, self.key_file)
            kwargs = {}
            if self.ca_certs:
                context.verify_mode = ssl.CERT_REQUIRED
                context.load_verify_locations(cafile=self.ca_certs)
                if getattr(ssl, 'HAS_SNI', False):
                    kwargs['server_hostname'] = self.host
            self.sock = context.wrap_socket(sock, **kwargs)
        if self.ca_certs and self.check_domain:
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self):
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = sock
            self._tunnel()

        if not hasattr(ssl, 'SSLContext'):
            # For 2.x
            if self.ca_certs:
                cert_reqs = ssl.CERT_REQUIRED
            else:
                cert_reqs = ssl.CERT_NONE
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=cert_reqs,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                context.load_cert_chain(self.cert_file, self.key_file)
            kwargs = {}
            if self.ca_certs:
                context.verify_mode = ssl.CERT_REQUIRED
                context.load_verify_locations(cafile=self.ca_certs)
                if getattr(ssl, 'HAS_SNI', False):
                    kwargs['server_hostname'] = self.host
            self.sock = context.wrap_socket(sock, **kwargs)
        if self.ca_certs and self.check_domain:
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
util.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
cache_svr.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    asyncio.set_event_loop(None)
    if args.iocp:
        from asyncio.windows_events import ProactorEventLoop
        loop = ProactorEventLoop()
    else:
        loop = asyncio.new_event_loop()
    sslctx = None
    if args.tls:
        import ssl

        # TODO: take cert/key from args as well.
        #   - ????, ??????
        #
        # here = os.path.join(os.path.dirname(__file__), '..', 'tests')
        here = os.path.join(os.path.dirname(__file__), '..', '..', 'tests')
        sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslctx.options |= ssl.OP_NO_SSLv2
        sslctx.load_cert_chain(
            certfile=os.path.join(here, 'ssl_cert.pem'),
            keyfile=os.path.join(here, 'ssl_key.pem'))
    cache = Cache(loop)
    task = asyncio.streams.start_server(cache.handle_client,
                                        args.host, args.port,
                                        ssl=sslctx, loop=loop)
    svr = loop.run_until_complete(task)
    for sock in svr.sockets:
        logging.info('socket %s', sock.getsockname())
    try:
        loop.run_forever()
    finally:
        loop.close()
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _create_ssl_context(self, certfile, keyfile=None):
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.load_cert_chain(certfile, keyfile)
        return sslcontext
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_create_server_ssl_verify_failed(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True


        # no CA loaded
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(ssl.SSLError,
                                            'certificate verify failed '):
                    self.loop.run_until_complete(f_c)

            # execute the loop to log the connection error
            test_utils.run_briefly(self.loop)

        # close connection
        self.assertIsNone(proto.transport)
        server.close()
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_create_server_ssl_match_failed(self):
        proto = MyProto(loop=self.loop)
        server, host, port = self._make_ssl_server(
            lambda: proto, SIGNED_CERTFILE)

        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext_client.options |= ssl.OP_NO_SSLv2
        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
        sslcontext_client.load_verify_locations(
            cafile=SIGNING_CA)
        if hasattr(sslcontext_client, 'check_hostname'):
            sslcontext_client.check_hostname = True

        # incorrect server_hostname
        f_c = self.loop.create_connection(MyProto, host, port,
                                          ssl=sslcontext_client)
        with mock.patch.object(self.loop, 'call_exception_handler'):
            with test_utils.disable_logger():
                with self.assertRaisesRegex(
                        ssl.CertificateError,
                        "hostname '127.0.0.1' doesn't match 'localhost'"):
                    self.loop.run_until_complete(f_c)

        # close connection
        proto.transport.close()
        server.close()
util.py 文件源码 项目:ascii-art-py 作者: blinglnav 项目源码 文件源码 阅读 85 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
imcdriver.py 文件源码 项目:imcsdk 作者: CiscoUcs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def connect(self):
        """Overrides HTTPSConnection.connect to specify TLS version"""
        # Standard implementation from HTTPSConnection, which is not
        # designed for extension, unfortunately
        if sys.version_info >= (2, 7):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout, self.source_address)
        elif sys.version_info >= (2, 6):
            sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        else:
            sock = socket.create_connection((self.host, self.port))

        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            # Since python 2.7.9, tls 1.1 and 1.2 are supported via
            # SSLContext
            ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ssl_context.options |= ssl.OP_NO_SSLv2
            ssl_context.options |= ssl.OP_NO_SSLv3
            if self.key_file and self.cert_file:
                ssl_context.load_cert_chain(keyfile=self.key_file,
                                            certfile=self.cert_file)
            self.sock = ssl_context.wrap_socket(sock)
        else:
            # This is the only difference; default wrap_socket uses SSLv23
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        ssl_version=ssl.PROTOCOL_TLSv1)
util.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 103 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:isni-reconcile 作者: cmh2166 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = sock
            self._tunnel()

        if not hasattr(ssl, 'SSLContext'):
            # For 2.x
            if self.ca_certs:
                cert_reqs = ssl.CERT_REQUIRED
            else:
                cert_reqs = ssl.CERT_NONE
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=cert_reqs,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                context.load_cert_chain(self.cert_file, self.key_file)
            kwargs = {}
            if self.ca_certs:
                context.verify_mode = ssl.CERT_REQUIRED
                context.load_verify_locations(cafile=self.ca_certs)
                if getattr(ssl, 'HAS_SNI', False):
                    kwargs['server_hostname'] = self.host
            self.sock = context.wrap_socket(sock, **kwargs)
        if self.ca_certs and self.check_domain:
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
util.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:habilitacion 作者: GabrielBD 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise
util.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self):
        sock = socket.create_connection((self.host, self.port), self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = sock
            self._tunnel()

        if not hasattr(ssl, 'SSLContext'):
            # For 2.x
            if self.ca_certs:
                cert_reqs = ssl.CERT_REQUIRED
            else:
                cert_reqs = ssl.CERT_NONE
            self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                        cert_reqs=cert_reqs,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)
        else:
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                context.load_cert_chain(self.cert_file, self.key_file)
            kwargs = {}
            if self.ca_certs:
                context.verify_mode = ssl.CERT_REQUIRED
                context.load_verify_locations(cafile=self.ca_certs)
                if getattr(ssl, 'HAS_SNI', False):
                    kwargs['server_hostname'] = self.host
            self.sock = context.wrap_socket(sock, **kwargs)
        if self.ca_certs and self.check_domain:
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
util.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 71 收藏 0 点赞 0 评论 0
def connect(self):
            sock = socket.create_connection((self.host, self.port), self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = sock
                self._tunnel()

            if not hasattr(ssl, 'SSLContext'):
                # For 2.x
                if self.ca_certs:
                    cert_reqs = ssl.CERT_REQUIRED
                else:
                    cert_reqs = ssl.CERT_NONE
                self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
                                            cert_reqs=cert_reqs,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)
            else:  # pragma: no cover
                context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                context.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    context.load_cert_chain(self.cert_file, self.key_file)
                kwargs = {}
                if self.ca_certs:
                    context.verify_mode = ssl.CERT_REQUIRED
                    context.load_verify_locations(cafile=self.ca_certs)
                    if getattr(ssl, 'HAS_SNI', False):
                        kwargs['server_hostname'] = self.host
                self.sock = context.wrap_socket(sock, **kwargs)
            if self.ca_certs and self.check_domain:
                try:
                    match_hostname(self.sock.getpeercert(), self.host)
                    logger.debug('Host verified: %s', self.host)
                except CertificateError:  # pragma: no cover
                    self.sock.shutdown(socket.SHUT_RDWR)
                    self.sock.close()
                    raise


问题


面经


文章

微信
公众号

扫码关注公众号