def writeSomeData(self, data):
try:
return Connection.writeSomeData(self, data)
except SSL.WantWriteError:
return 0
except SSL.WantReadError:
self.writeBlockedOnRead = 1
Connection.stopWriting(self)
Connection.startReading(self)
return 0
except SSL.ZeroReturnError:
return main.CONNECTION_LOST
except SSL.SysCallError, e:
if e[0] == -1 and data == "":
# errors when writing empty strings are expected
# and can be ignored
return 0
else:
return main.CONNECTION_LOST
except SSL.Error, e:
return e
python类Error()的实例源码
def test_doesNotSwallowOtherSSLErrors(self):
"""
Only no cipher matches get swallowed, every other SSL error gets
propagated.
"""
def raiser(_):
# Unfortunately, there seems to be no way to trigger a real SSL
# error artificially.
raise SSL.Error([['', '', '']])
ctx = FakeContext(SSL.SSLv23_METHOD)
ctx.set_cipher_list = raiser
self.patch(sslverify.SSL, 'Context', lambda _: ctx)
self.assertRaises(
SSL.Error,
sslverify._expandCipherString, u'ALL', SSL.SSLv23_METHOD, 0
)
def __init__(self, host, port, verify, cert_path, pkey_path, pkey_passphrase=''):
self.host = host
self.port = port
try:
self.pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(pkey_path, 'rb').read(), pkey_passphrase)
except IOError:
raise eStreamerKeyError("Unable to locate key file {}".format(pkey_path))
except crypto.Error:
raise eStreamerKeyError("Invalid key file or bad passphrase {}".format(pkey_path))
try:
self.cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_path, 'rb').read())
except IOError:
raise eStreamerCertError("Unable to locate cert file {}".format(cert_path))
except crypto.Error:
raise eStreamerCertError("Invalid certificate {}".format(cert_path))
self.verify = verify
self.ctx = None
self.sock = None
self._bytes = None
def test03_ssl_verification_of_peer_fails(self):
ctx = SSL.Context(SSL.TLSv1_METHOD)
def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
log.debug('SSL peer certificate verification failed for %r',
x509.get_subject())
return preverify_ok
ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
ctx.set_verify_depth(9)
# Set bad location - unit test dir has no CA certs to verify with
ctx.load_verify_locations(None, Constants.UNITTEST_DIR)
conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
ssl_context=ctx)
conn.connect()
self.assertRaises(SSL.Error, conn.request, 'GET', '/')
def doRead(self):
if self.writeBlockedOnRead:
self.writeBlockedOnRead = 0
self._resetReadWrite()
try:
return Connection.doRead(self)
except SSL.ZeroReturnError:
return main.CONNECTION_DONE
except SSL.WantReadError:
return
except SSL.WantWriteError:
self.readBlockedOnWrite = 1
Connection.startWriting(self)
Connection.stopReading(self)
return
except SSL.SysCallError, (retval, desc):
if ((retval == -1 and desc == 'Unexpected EOF')
or retval > 0):
return main.CONNECTION_LOST
log.err()
return main.CONNECTION_LOST
except SSL.Error, e:
return e
def _cbLostConns(self, results):
(sSuccess, sResult), (cSuccess, cResult) = results
self.failIf(sSuccess)
self.failIf(cSuccess)
acceptableErrors = [SSL.Error]
# Rather than getting a verification failure on Windows, we are getting
# a connection failure. Without something like sslverify proxying
# in-between we can't fix up the platform's errors, so let's just
# specifically say it is only OK in this one case to keep the tests
# passing. Normally we'd like to be as strict as possible here, so
# we're not going to allow this to report errors incorrectly on any
# other platforms.
if platform.isWindows():
from twisted.internet.error import ConnectionLost
acceptableErrors.append(ConnectionLost)
sResult.trap(*acceptableErrors)
cResult.trap(*acceptableErrors)
return self.serverPort.stopListening()
def testFailedCertificateVerification(self):
onServerLost = defer.Deferred()
onClientLost = defer.Deferred()
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, certificate=self.sCert, verify=False, requireCertificate=False),
sslverify.OpenSSLCertificateOptions(verify=True, requireCertificate=False, caCerts=[self.cCert]),
onServerLost=onServerLost,
onClientLost=onClientLost)
d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
self.failIf(cSuccess)
self.failIf(sSuccess)
# Twisted trunk will do the correct thing here, and not log any
# errors. Twisted 2.1 will do the wrong thing. We're flushing
# errors until the buildbot is updated to a reasonable facsimilie
# of 2.2.
log.flushErrors(SSL.Error)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
"""This function is used by threads to it investigates with support the cipher suite on server, when DTLS protocol(s) is/are tested. Returns instance of class AcceptCipher or RejectCipher.
Args:
server_connectivity_info (ServerConnectivityInfo): contains information for connection on server
dtls_version (str): contains SSL/TLS protocol version, which is used to connect
cipher (str): contains OpenSSL shortcut for identification cipher suite
port (int): contains port number for connecting comunication.
"""
cnx = SSL.Context(dtls_version)
cnx.set_cipher_list(cipher)
conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
try:
conn.connect((server_connectivity_info.ip_address, port))
conn.do_handshake()
except SSL.Error as e:
error_msg = ((e[0])[0])[2]
cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
else:
cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
finally:
conn.shutdown()
conn.close()
return cipher_result
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def _safe_ssl_call(self, suppress_ragged_eofs, call, *args, **kwargs):
"""Wrap the given call with SSL error-trapping."""
start = time.time()
while True:
try:
return call(*args, **kwargs)
except (ossl.WantReadError, ossl.WantWriteError):
# Sleep and try again. This is dangerous, because it means
# the rest of the stack has no way of differentiating
# between a "new handshake" error and "client dropped".
# Note this isn't an endless loop: there's a timeout below.
time.sleep(self.SSL_RETRY)
except ossl.Error as e:
if suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'):
return b''
raise socket.error(e.args[0])
if time.time() - start > self.SSL_TIMEOUT:
raise socket.timeout('timed out')
def send(self, data, flags=0, timeout=timeout_default):
if timeout is timeout_default:
timeout = self.timeout
while True:
try:
return self._sock.send(data, flags)
except SSL.WantWriteError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_write(self.fileno(), timeout=timeout)
except SSL.WantReadError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=timeout)
except SSL.SysCallError, ex:
if ex[0] == -1 and data == "":
# errors when writing empty strings are expected and can be ignored
return 0
raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
except SSL.Error, ex:
raise sslerror(str(ex))
def recv(self, buflen):
pending = self._sock.pending()
if pending:
return self._sock.recv(min(pending, buflen))
while True:
try:
return self._sock.recv(buflen)
except SSL.WantReadError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=self.timeout)
except SSL.WantWriteError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=self.timeout)
except SSL.ZeroReturnError:
return ''
except SSL.SysCallError, ex:
raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
except SSL.Error, ex:
raise sslerror(str(ex))
def _safe_ssl_call(self, suppress_ragged_eofs, call, *args, **kwargs):
"""Wrap the given call with SSL error-trapping."""
start = time.time()
while True:
try:
return call(*args, **kwargs)
except (ossl.WantReadError, ossl.WantWriteError):
# Sleep and try again. This is dangerous, because it means
# the rest of the stack has no way of differentiating
# between a "new handshake" error and "client dropped".
# Note this isn't an endless loop: there's a timeout below.
time.sleep(self.SSL_RETRY)
except ossl.Error as e:
if suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'):
return b''
raise socket.error(e.args[0])
if time.time() - start > self.SSL_TIMEOUT:
raise socket.timeout('timed out')
def send(self, data, flags=0, timeout=timeout_default):
if timeout is timeout_default:
timeout = self.timeout
while True:
try:
return self._sock.send(data, flags)
except SSL.WantWriteError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_write(self.fileno(), timeout=timeout)
except SSL.WantReadError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=timeout)
except SSL.SysCallError, ex:
if ex[0] == -1 and data == "":
# errors when writing empty strings are expected and can be ignored
return 0
raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
except SSL.Error, ex:
raise sslerror(str(ex))
def recv(self, buflen):
pending = self._sock.pending()
if pending:
return self._sock.recv(min(pending, buflen))
while True:
try:
return self._sock.recv(buflen)
except SSL.WantReadError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=self.timeout)
except SSL.WantWriteError, ex:
if self.timeout == 0.0:
raise timeout(str(ex))
else:
sys.exc_clear()
wait_read(self.fileno(), timeout=self.timeout)
except SSL.ZeroReturnError:
return ''
except SSL.SysCallError, ex:
raise sslerror(SysCallError_code_mapping.get(ex.args[0], ex.args[0]), ex.args[1])
except SSL.Error, ex:
raise sslerror(str(ex))
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def doRead(self):
if self.writeBlockedOnRead:
self.writeBlockedOnRead = 0
self._resetReadWrite()
try:
return Connection.doRead(self)
except SSL.ZeroReturnError:
return main.CONNECTION_DONE
except SSL.WantReadError:
return
except SSL.WantWriteError:
self.readBlockedOnWrite = 1
Connection.startWriting(self)
Connection.stopReading(self)
return
except SSL.SysCallError, (retval, desc):
if ((retval == -1 and desc == 'Unexpected EOF')
or retval > 0):
return main.CONNECTION_LOST
log.err()
return main.CONNECTION_LOST
except SSL.Error, e:
return e
def writeSomeData(self, data):
try:
return Connection.writeSomeData(self, data)
except SSL.WantWriteError:
return 0
except SSL.WantReadError:
self.writeBlockedOnRead = 1
Connection.stopWriting(self)
Connection.startReading(self)
return 0
except SSL.ZeroReturnError:
return main.CONNECTION_LOST
except SSL.SysCallError, e:
if e[0] == -1 and data == "":
# errors when writing empty strings are expected
# and can be ignored
return 0
else:
return main.CONNECTION_LOST
except SSL.Error, e:
return e
def _cbLostConns(self, results):
(sSuccess, sResult), (cSuccess, cResult) = results
self.failIf(sSuccess)
self.failIf(cSuccess)
acceptableErrors = [SSL.Error]
# Rather than getting a verification failure on Windows, we are getting
# a connection failure. Without something like sslverify proxying
# in-between we can't fix up the platform's errors, so let's just
# specifically say it is only OK in this one case to keep the tests
# passing. Normally we'd like to be as strict as possible here, so
# we're not going to allow this to report errors incorrectly on any
# other platforms.
if platform.isWindows():
from twisted.internet.error import ConnectionLost
acceptableErrors.append(ConnectionLost)
sResult.trap(*acceptableErrors)
cResult.trap(*acceptableErrors)
return self.serverPort.stopListening()
def testFailedCertificateVerification(self):
onServerLost = defer.Deferred()
onClientLost = defer.Deferred()
self.loopback(sslverify.OpenSSLCertificateOptions(privateKey=self.sKey, certificate=self.sCert, verify=False, requireCertificate=False),
sslverify.OpenSSLCertificateOptions(verify=True, requireCertificate=False, caCerts=[self.cCert]),
onServerLost=onServerLost,
onClientLost=onClientLost)
d = defer.DeferredList([onClientLost, onServerLost], consumeErrors=True)
def afterLost(((cSuccess, cResult), (sSuccess, sResult))):
self.failIf(cSuccess)
self.failIf(sSuccess)
# Twisted trunk will do the correct thing here, and not log any
# errors. Twisted 2.1 will do the wrong thing. We're flushing
# errors until the buildbot is updated to a reasonable facsimilie
# of 2.2.
log.flushErrors(SSL.Error)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)
def do_handshake(self):
self.update_flags()
if not isinstance(self.sock, SSL.Connection):
return
self.handshaking = True
self.ssl_write = None
try:
self.sock.do_handshake()
except SSL.WantWriteError:
self.ssl_write = True
except SSL.WantReadError:
pass
except SSL.Error:
self.close()
else:
self.handshaking = False
def log_request(self, code='-', size='-'):
msg = self.requestline
code = str(code)
if termcolor:
color = termcolor.colored
if code[0] == '1': # 1xx - Informational
msg = color(msg, attrs=['bold'])
if code[0] == '2': # 2xx - Success
msg = color(msg, color='white')
elif code == '304': # 304 - Resource Not Modified
msg = color(msg, color='cyan')
elif code[0] == '3': # 3xx - Redirection
msg = color(msg, color='green')
elif code == '404': # 404 - Resource Not Found
msg = color(msg, color='yellow')
elif code[0] == '4': # 4xx - Client Error
msg = color(msg, color='red', attrs=['bold'])
else: # 5xx, or any other response
msg = color(msg, color='magenta', attrs=['bold'])
self.log('info', '"%s" %s %s', msg, code, size)