def __init__(self, registerInstance, server_address, keyFile=DEFAULTKEYFILE, certFile=DEFAULTCERTFILE, logRequests=True):
"""Secure Documenting XML-RPC server.
It it very similar to DocXMLRPCServer but it uses HTTPS for transporting XML data.
"""
DocXMLRPCServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler, logRequests)
self.logRequests = logRequests
# stuff for doc server
try: self.set_server_title(registerInstance.title)
except AttributeError: self.set_server_title('default title')
try: self.set_server_name(registerInstance.name)
except AttributeError: self.set_server_name('default name')
if registerInstance.__doc__: self.set_server_documentation(registerInstance.__doc__)
else: self.set_server_documentation('default documentation')
self.register_introspection_functions()
# init stuff, handle different versions:
try:
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
except TypeError:
# An exception is raised in Python 2.5 as the prototype of the __init__
# method has changed and now has 3 arguments (self, allow_none, encoding)
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None)
SocketServer.BaseServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler)
self.register_instance(registerInstance) # for some reason, have to register instance down here!
# SSL socket stuff
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file(keyFile)
ctx.use_certificate_file(certFile)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
self.server_bind()
self.server_activate()
# requests count and condition, to allow for keyboard quit via CTL-C
self.requests = 0
self.rCondition = Condition()
python类Context()的实例源码
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
from OpenSSL import SSL
SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file(keyfile)
ctx.use_certificate_file(certfile)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
if bind_and_activate:
self.server_bind()
self.server_activate()
def test01_configuration(self):
config = Configuration(SSL.Context(SSL.TLSv1_METHOD), True)
self.assertTrue(config.ssl_context)
self.assertEqual(config.debug, True)
def test02_fetch_from_url(self):
config = Configuration(SSL.Context(SSL.TLSv1_METHOD), True)
res = fetch_from_url(Constants.TEST_URI, config)
self.assertTrue(res)
def test03_open_url(self):
config = Configuration(SSL.Context(SSL.TLSv1_METHOD), True)
res = open_url(Constants.TEST_URI, config)
self.assertEqual(res[0], 200,
'open_url for %r failed' % Constants.TEST_URI)
def test04_open_peer_cert_verification_fails(self):
# Explicitly set empty CA directory to make verification fail
ctx = SSL.Context(SSL.TLSv1_METHOD)
verify_callback = lambda conn, x509, errnum, errdepth, preverify_ok: \
preverify_ok
ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
ctx.load_verify_locations(None, './')
opener = build_opener(ssl_context=ctx)
self.assertRaises(SSL.Error, opener.open, Constants.TEST_URI)
def __init__(self, host, port=None, strict=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, ssl_context=None):
HTTPConnection.__init__(self, host, port, strict, timeout)
if not hasattr(self, 'ssl_context'):
self.ssl_context = None
if ssl_context is not None:
if not isinstance(ssl_context, SSL.Context):
raise TypeError('Expecting OpenSSL.SSL.Context type for "'
'ssl_context" keyword; got %r instead' %
ssl_context)
self.ssl_context = ssl_context
def connect (self, host, port):
if self.state == 1:
print "Already has an active connection"
elif self.type == 0: # TCP
if self.ssl == 1:
ctx = SSL.Context (SSL.SSLv23_METHOD)
s = SSL.Connection (ctx, socket(AF_INET, SOCK_STREAM))
try:
err = s.connect_ex ((host, port))
except:
print "Couldn't connect SSL socket"
return
if err == 0:
self.skt = s
self.state = 1
else:
s = socket (AF_INET, SOCK_STREAM)
try:
err = s.connect_ex ((host, port))
except:
print "Couldn't connect TCP socket"
return
if err == 0:
self.skt = s
self.state = 1
elif self.type == 1: # UDP
s = socket (AF_INET, SOCK_DGRAM)
try:
err = s.connect_ex ((host, port))
except:
print "Couldn't create UDP socket"
return
if err == 0:
self.skt = s
self.state = 1
else:
print "RAW sockets not implemented yet"
if self.state == 1:
return "OK"
def getContext(self):
"""Return a SSL.Context object.
"""
if self._context is None:
self._context = self._makeContext()
return self._context
def getContext(self):
"""Return a SSL.Context object. override in subclasses."""
raise NotImplementedError
def cacheContext(self):
ctx = SSL.Context(self.sslmethod)
ctx.use_certificate_file(self.certificateFileName)
ctx.use_privatekey_file(self.privateKeyFileName)
self._context = ctx
def getContext(self):
return SSL.Context(self.method)
def getContext(self):
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.use_certificate_file(self.filename)
ctx.use_privatekey_file(self.filename)
return ctx
def getContext(self):
"""Create an SSL context."""
from OpenSSL import SSL
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_certificate_file(self.filename)
ctx.use_privatekey_file(self.filename)
return ctx
def ssl():
from OpenSSL import SSL
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file('ssl.key')
ctx.use_certificate_file('ssl.cert')
app.run(ssl_context=ctx)
def run_app():
from subprocess import Popen
import sys
os.chdir(os.path.abspath(os.path.dirname(__file__)))
path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "manage.py")
args = [sys.executable, path, "db"]
if not os.path.exists(os.path.join(config.DATA_DIR, "plexivity.db")):
from app import db
db.create_all()
args.append("stamp")
args.append("head")
else:
args.append("upgrade")
Popen(args)
helper.startScheduler()
if config.USE_SSL:
helper.generateSSLCert()
try:
from OpenSSL import SSL
context = SSL.Context(SSL.SSLv23_METHOD)
context.use_privatekey_file(os.path.join(config.DATA_DIR, "plexivity.key"))
context.use_certificate_file(os.path.join(config.DATA_DIR, "plexivity.crt"))
app.run(host="0.0.0.0", port=config.PORT, debug=False, ssl_context=context)
except:
logger.error("plexivity should use SSL but OpenSSL was not found, starting without SSL")
app.run(host="0.0.0.0", port=config.PORT, debug=False)
else:
app.run(host="0.0.0.0", port=config.PORT, debug=False)
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def get_support_dtls_protocols_by_client(self):
"""Returns array which contains all DTLS protocols which are supported by client.
"""
dtls_ary = []
for dtls_version in self.DTLS_MODULE:
try:
SSL.Context._methods[dtls_version]=getattr(_lib, self.DTLS_MODULE[dtls_version])
except Exception as e:
pass
else:
dtls_ary.append(dtls_version)
return dtls_ary
LuckyThirteen_vulnerability_tester_plugin.py 文件源码
项目:midip-sslyze
作者: soukupa5
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def get_dtls_cipher_list(self, dtls_version):
"""Returns list of cipher suites available for protocol version, saves in parameter dtls_protocol.
Args:
dtls_protocol (str):.
"""
cnx = SSL.Context(dtls_version)
cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
conn = SSL.Connection(cnx, socket.socket(socket.AF_INET,socket.SOCK_DGRAM))
return conn.get_cipher_list()
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
from OpenSSL import SSL
SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file(keyfile)
ctx.use_certificate_file(certfile)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
if bind_and_activate:
self.server_bind()
self.server_activate()
def generate_adhoc_ssl_context():
"""Generates an adhoc SSL context for the development server."""
from OpenSSL import SSL
cert, pkey = generate_adhoc_ssl_pair()
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey(pkey)
ctx.use_certificate(cert)
return ctx