python类PROTOCOL_SSLv23()的实例源码

util.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
            """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

            If ``_tunnel_host`` is set, the plain socket is stored on
            ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
            ``self.ca_certs`` is set, certificate verification is required.
            If ``self.check_domain`` is set as well, the peer certificate is
            matched against ``self.host``; on ``CertificateError`` the socket
            is shut down and closed and the error is re-raised.
            """
            raw_sock = socket.create_connection((self.host, self.port),
                                                self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = raw_sock
                self._tunnel()

            if hasattr(ssl, 'SSLContext'):  # pragma: no cover
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    ctx.load_cert_chain(self.cert_file, self.key_file)
                wrap_kwargs = {}
                if self.ca_certs:
                    ctx.verify_mode = ssl.CERT_REQUIRED
                    ctx.load_verify_locations(cafile=self.ca_certs)
                    # Only pass the host name when the ssl module supports SNI.
                    if getattr(ssl, 'HAS_SNI', False):
                        wrap_kwargs['server_hostname'] = self.host
                self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
            else:
                # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
                verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
                self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                            self.cert_file,
                                            cert_reqs=verify,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)

            if not (self.ca_certs and self.check_domain):
                return
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:  # pragma: no cover
                # Do not leave a connection to an unverified host open.
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise
ssl_.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
ssl_.py 文件源码 项目:aws-waf-security-automation 作者: cerbo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
util.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

        If ``_tunnel_host`` is set, the plain socket is stored on
        ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
        ``self.ca_certs`` is set, certificate verification is required.
        If ``self.check_domain`` is set as well, the peer certificate is
        matched against ``self.host``; on ``CertificateError`` the socket
        is shut down and closed and the error is re-raised.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                ctx.load_cert_chain(self.cert_file, self.key_file)
            wrap_kwargs = {}
            if self.ca_certs:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(cafile=self.ca_certs)
                # Only pass the host name when the ssl module supports SNI.
                if getattr(ssl, 'HAS_SNI', False):
                    wrap_kwargs['server_hostname'] = self.host
            self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
        else:
            # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
            verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
            self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                        self.cert_file,
                                        cert_reqs=verify,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)

        if not (self.ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:
            # Do not leave a connection to an unverified host open.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
ssl_.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
imaplib2.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(self):
        """Wrap ``self.sock`` in SSL and run the optional certificate callback.

        Reads ``self.ssl_version`` (one of ``"tls1"``, ``"ssl2"``,
        ``"ssl3"``, ``"ssl23"`` or ``None``, the last two meaning
        ``PROTOCOL_SSLv23``), ``self.ca_certs``, ``self.keyfile`` and
        ``self.certfile``. Certificate verification is required only when
        ``self.ca_certs`` is not ``None``. On success ``self.sock`` is
        replaced by the wrapped socket and ``self.read_fd`` is set to its
        file descriptor.

        Raises:
            socket.sslerror: if the ``ssl`` module cannot be imported or
                ``self.ssl_version`` is not a recognised value.
            ssl.SSLError: if ``self.cert_verify_cb`` is set and returns a
                truthy error value for the peer certificate.
        """

        # Allow sending of keep-alive messages - seems to prevent some servers
        # from closing SSL, leading to deadlocks.
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        try:
            import ssl
        except ImportError:
            # No ssl module, and socket.ssl has no fileno(), and does not allow certificate verification
            raise socket.sslerror("imaplib2 SSL mode does not work without ssl module")

        if self.ca_certs is not None:
            cert_reqs = ssl.CERT_REQUIRED
        else:
            cert_reqs = ssl.CERT_NONE

        # Kept as a chain so each PROTOCOL_* constant is only touched when
        # that version is actually requested.
        if self.ssl_version == "tls1":
            ssl_version = ssl.PROTOCOL_TLSv1
        elif self.ssl_version == "ssl2":
            ssl_version = ssl.PROTOCOL_SSLv2
        elif self.ssl_version == "ssl3":
            ssl_version = ssl.PROTOCOL_SSLv3
        elif self.ssl_version == "ssl23" or self.ssl_version is None:
            ssl_version = ssl.PROTOCOL_SSLv23
        else:
            # Interpolate the version into the message; passing it as a
            # second exception argument left the "%s" unformatted.
            raise socket.sslerror("Invalid SSL version requested: %s" % self.ssl_version)

        self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
                                    ca_certs=self.ca_certs,
                                    cert_reqs=cert_reqs,
                                    ssl_version=ssl_version)
        self.read_fd = self.sock.fileno()

        if self.cert_verify_cb is not None:
            cert_err = self.cert_verify_cb(self.sock.getpeercert(), self.host)
            if cert_err:
                raise ssl.SSLError(cert_err)
util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

        If ``_tunnel_host`` is set, the plain socket is stored on
        ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
        ``self.ca_certs`` is set, certificate verification is required.
        If ``self.check_domain`` is set as well, the peer certificate is
        matched against ``self.host``; on ``CertificateError`` the socket
        is shut down and closed and the error is re-raised.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                ctx.load_cert_chain(self.cert_file, self.key_file)
            wrap_kwargs = {}
            if self.ca_certs:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(cafile=self.ca_certs)
                # Only pass the host name when the ssl module supports SNI.
                if getattr(ssl, 'HAS_SNI', False):
                    wrap_kwargs['server_hostname'] = self.host
            self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
        else:
            # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
            verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
            self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                        self.cert_file,
                                        cert_reqs=verify,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)

        if not (self.ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:
            # Do not leave a connection to an unverified host open.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
ssl_.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ssl_options_to_context(ssl_options):
    """Convert an ``ssl_options`` dictionary into an `~ssl.SSLContext`.

    The dictionary holds keyword arguments meant for `ssl.wrap_socket`.
    In Python 2.7.9+, an `~ssl.SSLContext` can be used in their place, so
    a component that accepts both forms can call this function to upgrade
    to the context form and use features like SNI or NPN.

    The argument is returned unchanged when the ``ssl`` module has no
    ``SSLContext`` or when it already is an `~ssl.SSLContext`.
    """
    if isinstance(ssl_options, dict):
        assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options

    if not hasattr(ssl, 'SSLContext'):
        return ssl_options
    if isinstance(ssl_options, ssl.SSLContext):
        return ssl_options

    protocol = ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23)
    ctx = ssl.SSLContext(protocol)

    if 'certfile' in ssl_options:
        key_path = ssl_options.get('keyfile', None)
        ctx.load_cert_chain(ssl_options['certfile'], key_path)
    if 'cert_reqs' in ssl_options:
        ctx.verify_mode = ssl_options['cert_reqs']
    if 'ca_certs' in ssl_options:
        ctx.load_verify_locations(ssl_options['ca_certs'])
    if 'ciphers' in ssl_options:
        ctx.set_ciphers(ssl_options['ciphers'])

    if hasattr(ssl, 'OP_NO_COMPRESSION'):
        # Turn TLS compression off to avoid CRIME and related attacks.
        # The constant is only available from python 3.3 on.
        ctx.options |= ssl.OP_NO_COMPRESSION
    return ctx
ssl_.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

        If ``_tunnel_host`` is set, the plain socket is stored on
        ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
        ``self.ca_certs`` is set, certificate verification is required.
        If ``self.check_domain`` is set as well, the peer certificate is
        matched against ``self.host``; on ``CertificateError`` the socket
        is shut down and closed and the error is re-raised.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                ctx.load_cert_chain(self.cert_file, self.key_file)
            wrap_kwargs = {}
            if self.ca_certs:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(cafile=self.ca_certs)
                # Only pass the host name when the ssl module supports SNI.
                if getattr(ssl, 'HAS_SNI', False):
                    wrap_kwargs['server_hostname'] = self.host
            self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
        else:
            # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
            verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
            self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                        self.cert_file,
                                        cert_reqs=verify,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)

        if not (self.ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:
            # Do not leave a connection to an unverified host open.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
ssl_.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ssl_options_to_context(ssl_options):
    """Convert an ``ssl_options`` dictionary into an `~ssl.SSLContext`.

    The dictionary holds keyword arguments meant for `ssl.wrap_socket`.
    In Python 2.7.9+, an `~ssl.SSLContext` can be used in their place, so
    a component that accepts both forms can call this function to upgrade
    to the context form and use features like SNI or NPN.

    The argument is returned unchanged when the ``ssl`` module has no
    ``SSLContext`` or when it already is an `~ssl.SSLContext`.
    """
    if isinstance(ssl_options, dict):
        assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options

    if not hasattr(ssl, 'SSLContext'):
        return ssl_options
    if isinstance(ssl_options, ssl.SSLContext):
        return ssl_options

    protocol = ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23)
    ctx = ssl.SSLContext(protocol)

    if 'certfile' in ssl_options:
        key_path = ssl_options.get('keyfile', None)
        ctx.load_cert_chain(ssl_options['certfile'], key_path)
    if 'cert_reqs' in ssl_options:
        ctx.verify_mode = ssl_options['cert_reqs']
    if 'ca_certs' in ssl_options:
        ctx.load_verify_locations(ssl_options['ca_certs'])
    if 'ciphers' in ssl_options:
        ctx.set_ciphers(ssl_options['ciphers'])

    if hasattr(ssl, 'OP_NO_COMPRESSION'):
        # Turn TLS compression off to avoid CRIME and related attacks.
        # The constant is only available from python 3.3 on.
        ctx.options |= ssl.OP_NO_COMPRESSION
    return ctx
ssl_.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

        If ``_tunnel_host`` is set, the plain socket is stored on
        ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
        ``self.ca_certs`` is set, certificate verification is required.
        If ``self.check_domain`` is set as well, the peer certificate is
        matched against ``self.host``; on ``CertificateError`` the socket
        is shut down and closed and the error is re-raised.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                ctx.load_cert_chain(self.cert_file, self.key_file)
            wrap_kwargs = {}
            if self.ca_certs:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(cafile=self.ca_certs)
                # Only pass the host name when the ssl module supports SNI.
                if getattr(ssl, 'HAS_SNI', False):
                    wrap_kwargs['server_hostname'] = self.host
            self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
        else:
            # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
            verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
            self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                        self.cert_file,
                                        cert_reqs=verify,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)

        if not (self.ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:
            # Do not leave a connection to an unverified host open.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
ssl_.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def ssl_options_to_context(ssl_options):
    """Convert an ``ssl_options`` dictionary into an `~ssl.SSLContext`.

    The dictionary holds keyword arguments meant for `ssl.wrap_socket`.
    In Python 2.7.9+, an `~ssl.SSLContext` can be used in their place, so
    a component that accepts both forms can call this function to upgrade
    to the context form and use features like SNI or NPN.

    The argument is returned unchanged when the ``ssl`` module has no
    ``SSLContext`` or when it already is an `~ssl.SSLContext`.
    """
    if isinstance(ssl_options, dict):
        assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options

    if not hasattr(ssl, 'SSLContext'):
        return ssl_options
    if isinstance(ssl_options, ssl.SSLContext):
        return ssl_options

    protocol = ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23)
    ctx = ssl.SSLContext(protocol)

    if 'certfile' in ssl_options:
        key_path = ssl_options.get('keyfile', None)
        ctx.load_cert_chain(ssl_options['certfile'], key_path)
    if 'cert_reqs' in ssl_options:
        ctx.verify_mode = ssl_options['cert_reqs']
    if 'ca_certs' in ssl_options:
        ctx.load_verify_locations(ssl_options['ca_certs'])
    if 'ciphers' in ssl_options:
        ctx.set_ciphers(ssl_options['ciphers'])

    if hasattr(ssl, 'OP_NO_COMPRESSION'):
        # Turn TLS compression off to avoid CRIME and related attacks.
        # The constant is only available from python 3.3 on.
        ctx.options |= ssl.OP_NO_COMPRESSION
    return ctx
util.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect(self):
        """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

        If ``_tunnel_host`` is set, the plain socket is stored on
        ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
        ``self.ca_certs`` is set, certificate verification is required.
        If ``self.check_domain`` is set as well, the peer certificate is
        matched against ``self.host``; on ``CertificateError`` the socket
        is shut down and closed and the error is re-raised.
        """
        raw_sock = socket.create_connection((self.host, self.port),
                                            self.timeout)
        if getattr(self, '_tunnel_host', False):
            self.sock = raw_sock
            self._tunnel()

        if hasattr(ssl, 'SSLContext'):
            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            ctx.options |= ssl.OP_NO_SSLv2
            if self.cert_file:
                ctx.load_cert_chain(self.cert_file, self.key_file)
            wrap_kwargs = {}
            if self.ca_certs:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_verify_locations(cafile=self.ca_certs)
                # Only pass the host name when the ssl module supports SNI.
                if getattr(ssl, 'HAS_SNI', False):
                    wrap_kwargs['server_hostname'] = self.host
            self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
        else:
            # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
            verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
            self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                        self.cert_file,
                                        cert_reqs=verify,
                                        ssl_version=ssl.PROTOCOL_SSLv23,
                                        ca_certs=self.ca_certs)

        if not (self.ca_certs and self.check_domain):
            return
        try:
            match_hostname(self.sock.getpeercert(), self.host)
            logger.debug('Host verified: %s', self.host)
        except CertificateError:
            # Do not leave a connection to an unverified host open.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            raise
ssl_.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def resolve_ssl_version(candidate):
    """
    Turn an SSL protocol specifier into an ``ssl`` module constant.

    ``None`` yields ``PROTOCOL_SSLv23``. A string is looked up as an
    attribute of the ``ssl`` module, first verbatim and then with a
    ``PROTOCOL_`` prefix; ``AttributeError`` propagates if the prefixed
    name is missing too. Any other value is returned unchanged.
    """
    if candidate is None:
        return PROTOCOL_SSLv23

    if not isinstance(candidate, str):
        # Not a name to look up; hand it back as given.
        return candidate

    resolved = getattr(ssl, candidate, None)
    if resolved is not None:
        return resolved
    # Accept the short form, i.e. the part after "PROTOCOL_".
    return getattr(ssl, 'PROTOCOL_' + candidate)
util.py 文件源码 项目:jira_worklog_scanner 作者: pgarneau 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self):
            """Connect to ``(self.host, self.port)`` and wrap the socket in SSL/TLS.

            If ``_tunnel_host`` is set, the plain socket is stored on
            ``self.sock`` and ``self._tunnel()`` runs before wrapping. When
            ``self.ca_certs`` is set, certificate verification is required.
            If ``self.check_domain`` is set as well, the peer certificate is
            matched against ``self.host``; on ``CertificateError`` the socket
            is shut down and closed and the error is re-raised.
            """
            raw_sock = socket.create_connection((self.host, self.port),
                                                self.timeout)
            if getattr(self, '_tunnel_host', False):
                self.sock = raw_sock
                self._tunnel()

            if hasattr(ssl, 'SSLContext'):  # pragma: no cover
                ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                ctx.options |= ssl.OP_NO_SSLv2
                if self.cert_file:
                    ctx.load_cert_chain(self.cert_file, self.key_file)
                wrap_kwargs = {}
                if self.ca_certs:
                    ctx.verify_mode = ssl.CERT_REQUIRED
                    ctx.load_verify_locations(cafile=self.ca_certs)
                    # Only pass the host name when the ssl module supports SNI.
                    if getattr(ssl, 'HAS_SNI', False):
                        wrap_kwargs['server_hostname'] = self.host
                self.sock = ctx.wrap_socket(raw_sock, **wrap_kwargs)
            else:
                # For 2.x: no SSLContext, fall back to ssl.wrap_socket.
                verify = ssl.CERT_REQUIRED if self.ca_certs else ssl.CERT_NONE
                self.sock = ssl.wrap_socket(raw_sock, self.key_file,
                                            self.cert_file,
                                            cert_reqs=verify,
                                            ssl_version=ssl.PROTOCOL_SSLv23,
                                            ca_certs=self.ca_certs)

            if not (self.ca_certs and self.check_domain):
                return
            try:
                match_hostname(self.sock.getpeercert(), self.host)
                logger.debug('Host verified: %s', self.host)
            except CertificateError:  # pragma: no cover
                # Do not leave a connection to an unverified host open.
                self.sock.shutdown(socket.SHUT_RDWR)
                self.sock.close()
                raise


问题


面经


文章

微信
公众号

扫码关注公众号