python类Error()的实例源码

ssl_compat.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _safe_ssl_call(self, suppress_ragged_eofs, call, *args, **kwargs):
        """Wrap the given call with SSL error-trapping."""
        start = time.time()
        while True:
            try:
                return call(*args, **kwargs)
            except (ossl.WantReadError, ossl.WantWriteError):
                # Sleep and try again. This is dangerous, because it means
                # the rest of the stack has no way of differentiating
                # between a "new handshake" error and "client dropped".
                # Note this isn't an endless loop: there's a timeout below.
                time.sleep(self.SSL_RETRY)
            except ossl.Error as e:
                if suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'):
                    return b''
                raise socket.error(e.args[0])

            if time.time() - start > self.SSL_TIMEOUT:
                raise socket.timeout('timed out')
sslold.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def send(self, data, flags=0, timeout=timeout_default):
        if timeout is timeout_default:
            timeout = self.timeout
        while True:
            try:
                return self._sock.send(data, flags)
            except SSL.WantWriteError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_write(self.fileno(), timeout=timeout)
            except SSL.WantReadError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=timeout)
            except SSL.SysCallError, ex:
                if ex[0] == -1 and data == "":
                    # errors when writing empty strings are expected and can be ignored
                    return 0
                raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
            except SSL.Error, ex:
                raise sslerror(str(ex))
sslold.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def recv(self, buflen):
        pending = self._sock.pending()
        if pending:
            return self._sock.recv(min(pending, buflen))
        while True:
            try:
                return self._sock.recv(buflen)
            except SSL.WantReadError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=self.timeout)
            except SSL.WantWriteError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=self.timeout)
            except SSL.ZeroReturnError:
                return ''
            except SSL.SysCallError, ex:
                raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
            except SSL.Error, ex:
                raise sslerror(str(ex))
serving.py 文件源码 项目:QualquerMerdaAPI 作者: tiagovizoto 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)
serving.py 文件源码 项目:gardenbot 作者: GoestaO 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)
test_https.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test03_ssl_verification_of_peer_fails(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        def verify_callback(conn, x509, errnum, errdepth, preverify_ok): 
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
            return preverify_ok 

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set bad location - unit test dir has no CA certs to verify with
        ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()        
        self.assertRaises(SSL.Error, conn.request, 'GET', '/')
test_ssl.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _cbLostConns(self, results):
        (sSuccess, sResult), (cSuccess, cResult) = results

        self.assertFalse(sSuccess)
        self.assertFalse(cSuccess)

        acceptableErrors = [SSL.Error]

        # Rather than getting a verification failure on Windows, we are getting
        # a connection failure.  Without something like sslverify proxying
        # in-between we can't fix up the platform's errors, so let's just
        # specifically say it is only OK in this one case to keep the tests
        # passing.  Normally we'd like to be as strict as possible here, so
        # we're not going to allow this to report errors incorrectly on any
        # other platforms.

        if platform.isWindows():
            from twisted.internet.error import ConnectionLost
            acceptableErrors.append(ConnectionLost)

        sResult.trap(*acceptableErrors)
        cResult.trap(*acceptableErrors)

        return self.serverPort.stopListening()
test_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_validHostnameInvalidCertificate(self):
        """
        When an invalid certificate containing a perfectly valid hostname is
        received, the connection is aborted with an OpenSSL error.
        """
        cProto, sProto, pump = self.serviceIdentitySetup(
            u"valid.example.com",
            u"valid.example.com",
            validCertificate=False,
        )

        self.assertEqual(cProto.wrappedProtocol.data, b'')
        self.assertEqual(sProto.wrappedProtocol.data, b'')

        cErr = cProto.wrappedProtocol.lostReason.value
        sErr = sProto.wrappedProtocol.lostReason.value

        self.assertIsInstance(cErr, SSL.Error)
        self.assertIsInstance(sErr, SSL.Error)
test_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_realCAsBetterNotSignOurBogusTestCerts(self):
        """
        If we use the default trust from the platform, our dinky certificate
        should I{really} fail.
        """
        cProto, sProto, pump = self.serviceIdentitySetup(
            u"valid.example.com",
            u"valid.example.com",
            validCertificate=False,
            useDefaultTrust=True,
        )

        self.assertEqual(cProto.wrappedProtocol.data, b'')
        self.assertEqual(sProto.wrappedProtocol.data, b'')

        cErr = cProto.wrappedProtocol.lostReason.value
        sErr = sProto.wrappedProtocol.lostReason.value

        self.assertIsInstance(cErr, SSL.Error)
        self.assertIsInstance(sErr, SSL.Error)
test_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_clientPresentsBadCertificate(self):
        """
        When the server verifies and the client presents an invalid certificate
        for that verification by passing it to
        L{sslverify.optionsForClientTLS}, the connection cannot be established
        with an SSL error.
        """
        cProto, sProto, pump = self.serviceIdentitySetup(
            u"valid.example.com",
            u"valid.example.com",
            validCertificate=True,
            serverVerifies=True,
            validClientCertificate=False,
            clientPresentsCertificate=True,
        )

        self.assertEqual(cProto.wrappedProtocol.data,
                         b'')

        cErr = cProto.wrappedProtocol.lostReason.value
        sErr = sProto.wrappedProtocol.lostReason.value

        self.assertIsInstance(cErr, SSL.Error)
        self.assertIsInstance(sErr, SSL.Error)
test_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_surpriseFromInfoCallback(self):
        """
        pyOpenSSL isn't always so great about reporting errors.  If one occurs
        in the verification info callback, it should be logged and the
        connection should be shut down (if possible, anyway; the app_data could
        be clobbered but there's no point testing for that).
        """
        cProto, sProto, pump = self.serviceIdentitySetup(
            u"correct-host.example.com",
            u"correct-host.example.com",
            buggyInfoCallback=True,
        )
        self.assertEqual(cProto.wrappedProtocol.data, b'')
        self.assertEqual(sProto.wrappedProtocol.data, b'')

        cErr = cProto.wrappedProtocol.lostReason.value
        sErr = sProto.wrappedProtocol.lostReason.value

        self.assertIsInstance(cErr, ZeroDivisionError)
        self.assertIsInstance(sErr, (ConnectionClosed, SSL.Error))
        errors = self.flushLoggedErrors(ZeroDivisionError)
        self.assertTrue(errors)
tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _checkHandshakeStatus(self):
        """
        Ask OpenSSL to proceed with a handshake in progress.

        Initially, this just sends the ClientHello; after some bytes have been
        stuffed in to the C{Connection} object by C{dataReceived}, it will then
        respond to any C{Certificate} or C{KeyExchange} messages.
        """
        # The connection might already be aborted (eg. by a callback during
        # connection setup), so don't even bother trying to handshake in that
        # case.
        if self._aborted:
            return
        try:
            self._tlsConnection.do_handshake()
        except WantReadError:
            self._flushSendBIO()
        except Error:
            self._tlsShutdownFinished(Failure())
        else:
            self._handshakeDone = True
            if IHandshakeListener.providedBy(self.wrappedProtocol):
                self.wrappedProtocol.handshakeCompleted()
tls.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _shutdownTLS(self):
        """
        Initiate, or reply to, the shutdown handshake of the TLS layer.
        """
        try:
            shutdownSuccess = self._tlsConnection.shutdown()
        except Error:
            # Mid-handshake, a call to shutdown() can result in a
            # WantWantReadError, or rather an SSL_ERR_WANT_READ; but pyOpenSSL
            # doesn't allow us to get at the error.  See:
            # https://github.com/pyca/pyopenssl/issues/91
            shutdownSuccess = False
        self._flushSendBIO()
        if shutdownSuccess:
            # Both sides have shutdown, so we can start closing lower-level
            # transport. This will also happen if we haven't started
            # negotiation at all yet, in which case shutdown succeeds
            # immediately.
            self.transport.loseConnection()
serving.py 文件源码 项目:ieee-cs-txst 作者: codestar12 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)
ssl_compat.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _safe_ssl_call(self, suppress_ragged_eofs, call, *args, **kwargs):
        """Wrap the given call with SSL error-trapping."""
        start = time.time()
        while True:
            try:
                return call(*args, **kwargs)
            except (ossl.WantReadError, ossl.WantWriteError):
                # Sleep and try again. This is dangerous, because it means
                # the rest of the stack has no way of differentiating
                # between a "new handshake" error and "client dropped".
                # Note this isn't an endless loop: there's a timeout below.
                time.sleep(self.SSL_RETRY)
            except ossl.Error as e:
                if suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'):
                    return b''
                raise socket.error(e.args[0])

            if time.time() - start > self.SSL_TIMEOUT:
                raise socket.timeout('timed out')
sslold.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(self, data, flags=0, timeout=timeout_default):
        if timeout is timeout_default:
            timeout = self.timeout
        while True:
            try:
                return self._sock.send(data, flags)
            except SSL.WantWriteError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_write(self.fileno(), timeout=timeout)
            except SSL.WantReadError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=timeout)
            except SSL.SysCallError, ex:
                if ex[0] == -1 and data == "":
                    # errors when writing empty strings are expected and can be ignored
                    return 0
                raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
            except SSL.Error, ex:
                raise sslerror(str(ex))
sslold.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def recv(self, buflen):
        pending = self._sock.pending()
        if pending:
            return self._sock.recv(min(pending, buflen))
        while True:
            try:
                return self._sock.recv(buflen)
            except SSL.WantReadError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=self.timeout)
            except SSL.WantWriteError, ex:
                if self.timeout == 0.0:
                    raise timeout(str(ex))
                else:
                    sys.exc_clear()
                    wait_read(self.fileno(), timeout=self.timeout)
            except SSL.ZeroReturnError:
                return ''
            except SSL.SysCallError, ex:
                raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
            except SSL.Error, ex:
                raise sslerror(str(ex))
serving.py 文件源码 项目:WhatTheHack 作者: Sylphias 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)
serving.py 文件源码 项目:WhatTheHack 作者: Sylphias 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)
serving.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def log_request(self, code='-', size='-'):
        msg = self.requestline
        code = str(code)

        if termcolor:
            color = termcolor.colored

            if code[0] == '1':    # 1xx - Informational
                msg = color(msg, attrs=['bold'])
            if code[0] == '2':    # 2xx - Success
                msg = color(msg, color='white')
            elif code == '304':   # 304 - Resource Not Modified
                msg = color(msg, color='cyan')
            elif code[0] == '3':  # 3xx - Redirection
                msg = color(msg, color='green')
            elif code == '404':   # 404 - Resource Not Found
                msg = color(msg, color='yellow')
            elif code[0] == '4':  # 4xx - Client Error
                msg = color(msg, color='red', attrs=['bold'])
            else:                 # 5xx, or any other response
                msg = color(msg, color='magenta', attrs=['bold'])

        self.log('info', '"%s" %s %s', msg, code, size)


问题


面经


文章

微信
公众号

扫码关注公众号