def __init__(self, privateKeyFileName, certificateFileName,
             sslmethod=SSL.SSLv23_METHOD, _contextFactory=SSL.Context):
    """
    Record the key/certificate locations and SSL settings, then build the
    context straight away.

    @param privateKeyFileName: Name of a file containing a private key
    @param certificateFileName: Name of a file containing a certificate
    @param sslmethod: The SSL method to use
    @param _contextFactory: Object stored on the instance as
        C{_contextFactory}; defaults to C{SSL.Context}.
    """
    self._contextFactory = _contextFactory
    self.sslmethod = sslmethod
    self.certificateFileName = certificateFileName
    self.privateKeyFileName = privateKeyFileName

    # Building the context eagerly validates the supplied parameters, so a
    # bad key or certificate is reported at construction time instead of
    # at some later point.
    self.cacheContext()
python类Context()的实例源码
def __init__(self, server_address, HandlerClass, logRequests=True):
    """Secure XML-RPC server.

    It is very similar to SimpleXMLRPCServer but it uses HTTPS for
    transporting XML data.
    """
    self.logRequests = logRequests

    # Initialise both base classes explicitly.
    SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
    SocketServer.BaseServer.__init__(self, server_address, HandlerClass)

    # Build the SSL context from the module-level key/certificate paths.
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.use_privatekey_file(KEYFILE)
    context.use_certificate_file(CERTFILE)

    # Wrap a plain socket in an SSL connection before binding.
    plain_socket = socket.socket(self.address_family, self.socket_type)
    self.socket = SSL.Connection(context, plain_socket)

    self.server_bind()
    self.server_activate()
def test03_ssl_verification_of_peer_fails(self):
    """A request must raise SSL.Error when no CA certs are available."""

    def log_and_pass_through(conn, x509, errnum, errdepth, preverify_ok):
        # Log the subject and hand OpenSSL's own verdict straight back.
        log.debug('SSL peer certificate verification failed for %r',
                  x509.get_subject())
        return preverify_ok

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, log_and_pass_through)
    context.set_verify_depth(9)

    # Deliberately point at the unit test directory, which holds no CA
    # certificates to verify against.
    context.load_verify_locations(None, Constants.UNITTEST_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    self.assertRaises(SSL.Error, connection.request, 'GET', '/')
def test04_ssl_verification_with_subj_alt_name(self):
    """Fetch '/' using ServerSSLCertVerification with default matching."""
    verifier = ServerSSLCertVerification(hostname='localhost')

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Use the real CA certificate directory this time.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def test04_ssl_verification_with_subj_common_name(self):
    """Fetch '/' with subject-alt-name matching switched off."""
    # Explicitly request hostname verification against the peer
    # certificate's subject common name.
    verifier = ServerSSLCertVerification(hostname='localhost',
                                         subj_alt_name_match=False)

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Use the real CA certificate directory.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def connect(self):
    """Open a TCP connection to the peer and wrap it in an SSL socket.

    Uses the instance's ``ssl_context`` attribute when it is set (it must
    be an ``SSL.Context``, otherwise ``TypeError`` is raised); falls back
    to a new context built from the class's ``default_ssl_method``.
    """
    context = getattr(self, 'ssl_context', None)
    if not context:
        context = SSL.Context(self.__class__.default_ssl_method)
    elif not isinstance(context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" attribute; got %r instead' %
                        context)

    plain_sock = socket.create_connection((self.host, self.port),
                                          self.timeout)

    # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
    if getattr(self, '_tunnel_host', None):
        self.sock = plain_sock
        self._tunnel()

    self.sock = SSLSocket(context, plain_sock)

    # Put the SSL socket into client mode.
    self.sock.set_connect_state()
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
    """Check whether the server accepts one cipher suite over DTLS.

    Intended to be run from worker threads. A UDP socket is wrapped in an
    SSL connection restricted to ``cipher`` and a handshake is attempted.

    Args:
        server_connectivity_info (ServerConnectivityInfo): contains information for connecting to the server
        dtls_version (str): SSL/TLS protocol version used to connect (passed to ``SSL.Context``)
        cipher (str): OpenSSL short name identifying the cipher suite
        port (int): port number to connect to.

    Returns:
        ``AcceptCipher`` when the handshake completes, ``RejectCipher``
        (carrying the error message) when it raises ``SSL.Error``.
    """
    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list(cipher)
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.Error as e:
        # Exceptions are not subscriptable on Python 3, so go through
        # ``args``. The usual shape is a list of (lib, func, reason)
        # tuples, but subclasses may carry something else entirely.
        try:
            error_msg = e.args[0][0][2]
        except (IndexError, TypeError):
            error_msg = str(e)
        cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
    else:
        cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
    finally:
        # Shutting down a connection whose handshake never completed can
        # itself raise; that must not replace the result computed above.
        try:
            conn.shutdown()
        except SSL.Error:
            pass
        conn.close()
    return cipher_result
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
    """Test whether the server supports a DTLS protocol version.

    Args:
        server_connectivity_info (ServerConnectivityInfo): contains information for connecting to the server
        dtls_version (str): version of the DTLS protocol to test (passed to ``SSL.Context``)
        port (int): port number to connect to.

    Returns:
        True if the handshake completes, False if it fails with an
        ``SSL.SysCallError`` other than "connection refused".

    Raises:
        ValueError: the connection was refused, i.e. nothing is listening
            for DTLS on ``port``.
    """
    import errno

    cnx = SSL.Context(dtls_version)
    cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
    conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
    try:
        conn.connect((server_connectivity_info.ip_address, port))
        conn.do_handshake()
    except SSL.SysCallError as ex:
        # Exceptions are not subscriptable on Python 3; use ``args``.
        # ECONNREFUSED is 111 on Linux but differs on other platforms.
        if ex.args and ex.args[0] == errno.ECONNREFUSED:
            raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
        support = False
    else:
        support = True
    finally:
        # Shutting down after a failed handshake can raise; do not let
        # that mask the ValueError or the computed result.
        try:
            conn.shutdown()
        except SSL.Error:
            pass
        conn.close()
    return support
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return an ``SSLObject``.

    The certificate and private key files are loaded only when given.
    The wrapper inherits the timeout of *sock*, and the handshake is run
    immediately if the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    # VERIFY_NONE with a callback that accepts everything.
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    saved_timeout = sock.gettimeout()
    # Use the underlying ``_sock`` object when the socket exposes one.
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer yet: leave the handshake for later.
        return wrapped

    # Already connected: perform the handshake now.
    wrapped.do_handshake()
    return wrapped
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return an ``SSLObject``.

    The certificate and private key files are loaded only when given.
    The wrapper inherits the timeout of *sock*, and the handshake is run
    immediately if the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    # VERIFY_NONE with a callback that accepts everything.
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    saved_timeout = sock.gettimeout()
    # Use the underlying ``_sock`` object when the socket exposes one.
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer yet: leave the handshake for later.
        return wrapped

    # Already connected: perform the handshake now.
    wrapped.do_handshake()
    return wrapped
def main():
    """
    Connect to an SNI-enabled server and request a specific hostname, specified
    by argv[1], of it.

    Returns 1 when no hostname argument was supplied; otherwise returns
    None after printing the subject of the server's certificate.
    """
    if len(argv) < 2:
        print('Usage: %s <hostname>' % (argv[0],))
        return 1

    client = socket()

    # Written without a newline so that "connected ..." continues the line.
    stdout.write('Connecting... ')
    stdout.flush()
    client.connect(('127.0.0.1', 8443))
    print('connected %s' % (client.getpeername(),))

    client_ssl = Connection(Context(TLSv1_METHOD), client)
    try:
        client_ssl.set_connect_state()
        # Ask the server for the certificate matching this hostname (SNI).
        client_ssl.set_tlsext_host_name(argv[1])
        client_ssl.do_handshake()
        print('Server subject is %s'
              % (client_ssl.get_peer_certificate().get_subject(),))
    finally:
        # Close the connection even when the handshake fails.
        client_ssl.close()
def main():
    """
    Run an SNI-enabled server which selects between a few certificates in a
    C{dict} based on the handshake request it receives from a client.

    Accepts a single connection on port 8443, performs the TLS handshake
    and then closes both the accepted and the listening socket.
    """
    port = socket()
    try:
        port.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        port.bind(('', 8443))
        port.listen(3)

        # Written without a newline so that "accepted ..." continues the line.
        stdout.write('Accepting... ')
        stdout.flush()
        server, addr = port.accept()
        try:
            print('accepted %s' % (addr,))

            server_context = Context(TLSv1_METHOD)
            # pick_certificate runs during the handshake, when the client
            # announces the server name it wants.
            server_context.set_tlsext_servername_callback(pick_certificate)

            server_ssl = Connection(server_context, server)
            server_ssl.set_accept_state()
            server_ssl.do_handshake()
        finally:
            # Release the accepted socket even when the handshake fails.
            server.close()
    finally:
        port.close()
def __init__(self, server_address, RequestHandlerClass):
    """Set up the server with an SSL-wrapped listening socket.

    The private key and certificate are read from ``server.pkey`` and
    ``server.cert`` in the current directory.
    """
    SocketServer.BaseServer.__init__(self, server_address,
                                     RequestHandlerClass)

    # Same as the plain server from here on, but secured with SSL.
    context = SSL.Context(SSL.SSLv23_METHOD)
    context.set_options(SSL.OP_NO_SSLv2)

    base_dir = os.curdir
    context.use_privatekey_file(os.path.join(base_dir, 'server.pkey'))
    context.use_certificate_file(os.path.join(base_dir, 'server.cert'))

    raw_socket = socket.socket(self.address_family, self.socket_type)
    self.socket = SSLWrapper(SSL.Connection(context, raw_socket))

    self.server_bind()
    self.server_activate()
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return an ``SSLObject``.

    The certificate and private key files are loaded only when given.
    The wrapper inherits the timeout of *sock*, and the handshake is run
    immediately if the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    # VERIFY_NONE with a callback that accepts everything.
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    saved_timeout = sock.gettimeout()
    # Use the underlying ``_sock`` object when the socket exposes one.
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer yet: leave the handshake for later.
        return wrapped

    # Already connected: perform the handshake now.
    wrapped.do_handshake()
    return wrapped
def connect(self):
    """Open a TCP connection to the peer and wrap it in an SSL socket.

    Uses the instance's ``ssl_context`` attribute when it is set (it must
    be an ``SSL.Context``, otherwise ``TypeError`` is raised); falls back
    to a new context built from the class's ``default_ssl_method``.
    """
    context = getattr(self, 'ssl_context', None)
    if not context:
        context = SSL.Context(self.__class__.default_ssl_method)
    elif not isinstance(context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" attribute; got %r instead' %
                        context)

    plain_sock = socket.create_connection((self.host, self.port),
                                          self.timeout)

    # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
    if getattr(self, '_tunnel_host', None):
        self.sock = plain_sock
        self._tunnel()

    self.sock = SSLSocket(context, plain_sock)

    # Put the SSL socket into client mode.
    self.sock.set_connect_state()
def __init__(self, ssl_context, debuglevel=0):
    """
    Store the SSL context this handler will use.

    @param ssl_context: SSL context; when None a new TLSv1 context is
    created instead
    @type ssl_context: OpenSSL.SSL.Context
    @param debuglevel: debug level for HTTPSHandler
    @type debuglevel: int
    @raise TypeError: ssl_context is neither None nor an SSL.Context
    """
    AbstractHTTPHandler.__init__(self, debuglevel)

    if ssl_context is None:
        # Nothing supplied: fall back to a fresh TLSv1 context.
        self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
        return

    if not isinstance(ssl_context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" keyword; got %r instead' %
                        ssl_context)

    self.ssl_context = ssl_context
def connect(self):
    """Open a TCP connection to the peer and wrap it in an SSL socket.

    Uses the instance's ``ssl_context`` attribute when it is set (it must
    be an ``SSL.Context``, otherwise ``TypeError`` is raised); falls back
    to a new context built from the class's ``default_ssl_method``.
    """
    context = getattr(self, 'ssl_context', None)
    if not context:
        context = SSL.Context(self.__class__.default_ssl_method)
    elif not isinstance(context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" attribute; got %r instead' %
                        context)

    plain_sock = socket.create_connection((self.host, self.port),
                                          self.timeout)

    # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
    if getattr(self, '_tunnel_host', None):
        self.sock = plain_sock
        self._tunnel()

    self.sock = SSLSocket(context, plain_sock)

    # Put the SSL socket into client mode.
    self.sock.set_connect_state()
def __init__(self, ssl_context, debuglevel=0):
    """
    Store the SSL context this handler will use.

    @param ssl_context: SSL context; when None a new TLSv1 context is
    created instead
    @type ssl_context: OpenSSL.SSL.Context
    @param debuglevel: debug level for HTTPSHandler
    @type debuglevel: int
    @raise TypeError: ssl_context is neither None nor an SSL.Context
    """
    AbstractHTTPHandler.__init__(self, debuglevel)

    if ssl_context is None:
        # Nothing supplied: fall back to a fresh TLSv1 context.
        self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
        return

    if not isinstance(ssl_context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" keyword; got %r instead' %
                        ssl_context)

    self.ssl_context = ssl_context
def bind(self, family, type, proto=0):
    """Create (or recreate) the listening socket and bind it.

    When both ``ssl_certificate`` and ``ssl_private_key`` are set, the
    socket is wrapped in an ``SSLConnection`` before binding.
    """
    self.socket = socket.socket(family, type, proto)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # NOTE: setting TCP_NODELAY here was left disabled by the original
    # author.

    wants_ssl = self.ssl_certificate and self.ssl_private_key
    if wants_ssl:
        if SSL is None:
            raise ImportError("You must install pyOpenSSL to use HTTPS.")

        # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(self.ssl_private_key)
        context.use_certificate_file(self.ssl_certificate)
        self.socket = SSLConnection(context, self.socket)
        self.populate_ssl_environ()

    self.socket.bind(self.bind_addr)
def bind(self, family, type, proto=0):
    """Create (or recreate) the listening socket and bind it.

    When both ``ssl_certificate`` and ``ssl_private_key`` are set, the
    socket is wrapped in an ``SSLConnection`` before binding.
    """
    self.socket = socket.socket(family, type, proto)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # NOTE: setting TCP_NODELAY here was left disabled by the original
    # author.

    wants_ssl = self.ssl_certificate and self.ssl_private_key
    if wants_ssl:
        if SSL is None:
            raise ImportError("You must install pyOpenSSL to use HTTPS.")

        # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
        context = SSL.Context(SSL.SSLv23_METHOD)
        context.use_privatekey_file(self.ssl_private_key)
        context.use_certificate_file(self.ssl_certificate)
        self.socket = SSLConnection(context, self.socket)
        self.populate_ssl_environ()

    self.socket.bind(self.bind_addr)
def test03_ssl_verification_of_peer_fails(self):
    """A request must raise SSL.Error when no CA certs are available."""

    def log_and_pass_through(conn, x509, errnum, errdepth, preverify_ok):
        # Log the subject and hand OpenSSL's own verdict straight back.
        log.debug('SSL peer certificate verification failed for %r',
                  x509.get_subject())
        return preverify_ok

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER, log_and_pass_through)
    context.set_verify_depth(9)

    # Deliberately point at the unit test directory, which holds no CA
    # certificates to verify against.
    context.load_verify_locations(None, Constants.UNITTEST_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    self.assertRaises(SSL.Error, connection.request, 'GET', '/')
def test04_ssl_verification_with_subj_alt_name(self):
    """Fetch '/' using ServerSSLCertVerification with default matching."""
    verifier = ServerSSLCertVerification(hostname='localhost')

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Use the real CA certificate directory this time.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def test04_ssl_verification_with_subj_common_name(self):
    """Fetch '/' with subject-alt-name matching switched off."""
    # Explicitly request hostname verification against the peer
    # certificate's subject common name.
    verifier = ServerSSLCertVerification(hostname='localhost',
                                         subj_alt_name_match=False)

    context = SSL.Context(SSL.TLSv1_METHOD)
    context.set_verify(SSL.VERIFY_PEER,
                       verifier.get_verify_server_cert_func())
    context.set_verify_depth(9)

    # Use the real CA certificate directory.
    context.load_verify_locations(None, Constants.CACERT_DIR)

    connection = HTTPSConnection(Constants.HOSTNAME,
                                 port=Constants.PORT,
                                 ssl_context=context)
    connection.connect()
    connection.request('GET', '/')
    response = connection.getresponse()
    print('Response = %s' % response.read())
def connect(self):
    """Open a TCP connection to the peer and wrap it in an SSL socket.

    Uses the instance's ``ssl_context`` attribute when it is set (it must
    be an ``SSL.Context``, otherwise ``TypeError`` is raised); falls back
    to a new context built from the class's ``default_ssl_method``.
    """
    context = getattr(self, 'ssl_context', None)
    if not context:
        context = SSL.Context(self.__class__.default_ssl_method)
    elif not isinstance(context, SSL.Context):
        raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                        'ssl_context" attribute; got %r instead' %
                        context)

    plain_sock = socket.create_connection((self.host, self.port),
                                          self.timeout)

    # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above
    if getattr(self, '_tunnel_host', None):
        self.sock = plain_sock
        self._tunnel()

    self.sock = SSLSocket(context, plain_sock)

    # Put the SSL socket into client mode.
    self.sock.set_connect_state()
def __init__(self, hostname, ctx):
    """
    Initialize L{ClientTLSOptions}.

    The hostname is kept in three forms (as given, as IDNA bytes and as
    the ASCII decoding of those bytes) and an info callback is installed
    on C{ctx}.

    @param hostname: The hostname to verify as input by a human.
    @type hostname: L{unicode}

    @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
    @type ctx: L{OpenSSL.SSL.Context}.
    """
    self._ctx = ctx
    self._hostname = hostname

    encodedHostname = _idnaBytes(hostname)
    self._hostnameBytes = encodedHostname
    self._hostnameASCII = encodedHostname.decode("ascii")

    infoCallback = _tolerateErrors(self._identityVerifyingInfoCallback)
    ctx.set_info_callback(infoCallback)
def _identityVerifyingInfoCallback(self, connection, where, ret):
    """
    U{info_callback
    <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
    } for pyOpenSSL that verifies the hostname in the presented certificate
    matches the one passed to this L{ClientTLSOptions}.

    At handshake start the hostname is set on the connection; at
    handshake completion the hostname is verified and a failure is
    reported to the connection's application data object.

    @param connection: the connection which is handshaking.
    @type connection: L{OpenSSL.SSL.Connection}

    @param where: flags indicating progress through a TLS handshake.
    @type where: L{int}

    @param ret: ignored
    @type ret: ignored
    """
    if where & SSL.SSL_CB_HANDSHAKE_START:
        connection.set_tlsext_host_name(self._hostnameBytes)
        return

    if not where & SSL.SSL_CB_HANDSHAKE_DONE:
        return

    try:
        verifyHostname(connection, self._hostnameASCII)
    except VerificationError:
        # Build the Failure inside the handler so that it captures the
        # exception currently being handled.
        failure = Failure()
        connection.get_app_data().failVerification(failure)
def test_extraChainFilesAreAddedIfSupplied(self):
    """
    When C{extraCertChain} is supplied together with a private key and
    certificate, the C{Context} returned by C{getContext} carries the
    key, the certificate and the extra chain certificates.
    """
    options = sslverify.OpenSSLCertificateOptions(
        privateKey=self.sKey,
        certificate=self.sCert,
        extraCertChain=self.extraCertChain,
    )
    # Substitute the fake so that what gets set can be inspected.
    options._contextFactory = FakeContext

    context = options.getContext()

    self.assertEqual(self.sKey, context._privateKey)
    self.assertEqual(self.sCert, context._certificate)
    self.assertEqual(self.extraCertChain, context._extraCertChain)
def ssl(sock, keyfile=None, certfile=None):
    """Wrap *sock* in a pyOpenSSL connection and return an ``SSLObject``.

    The certificate and private key files are loaded only when given.
    The wrapper inherits the timeout of *sock*, and the handshake is run
    immediately if the socket already has a peer.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    # VERIFY_NONE with a callback that accepts everything.
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    saved_timeout = sock.gettimeout()
    # Use the underlying ``_sock`` object when the socket exposes one.
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer yet: leave the handshake for later.
        return wrapped

    # Already connected: perform the handshake now.
    wrapped.do_handshake()
    return wrapped
def main():
    """Start the DNS (UDP) and HTTPS listeners, then run the reactor.

    Ports and TLS file paths are read from ``SERVER_CONFIG``.
    """
    resolver = DNSResolver()

    # DNS side: a datagram protocol driven by a factory that uses the
    # resolver as its client.
    dns_factory = server.DNSServerFactory(clients=[resolver])
    dns_protocol = dns.DNSDatagramProtocol(controller=dns_factory)

    # HTTP side: a site that shares the same resolver.
    site = webserver.Site(HTTPServer(resolver))

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file(SERVER_CONFIG["ssl_crt"])
    tls_context.use_privatekey_file(SERVER_CONFIG["ssl_key"])

    reactor.listenUDP(SERVER_CONFIG["dns_port"], dns_protocol)
    reactor.listenSSL(SERVER_CONFIG["http_port"], site,
                      ContextFactory(tls_context))
    reactor.run()
def __enter__(self):
    """Build the TLS context, connect to ``(host, port)`` and return self.

    When ``self.verify`` is set it is used as the CA file: peer
    verification is enabled with ``self.validate_cert`` as callback and
    the same file is parsed into ``self.trusted_cert``.

    Any exception raised while connecting is re-raised after the socket
    has been closed.
    """
    self.ctx = SSL.Context(SSL.TLSv1_METHOD)
    if self.verify:
        self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
        self.ctx.load_verify_locations(self.verify)
        # ``file()`` only exists on Python 2 and left the handle open;
        # read the PEM data through a context manager instead.
        with open(self.verify, 'rb') as pem_file:
            pem_data = pem_file.read()
        self.trusted_cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
    self.ctx.use_privatekey(self.pkey)
    self.ctx.use_certificate(self.cert)
    self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    try:
        self.sock.connect((self.host, self.port))
    except Exception:
        # __exit__ is not invoked when __enter__ raises, so the socket
        # has to be released here.
        self.sock.close()
        raise
    return self