def __init__(self, server_address, HandlerClass, logRequests=True):
"""Secure XML-RPC server.
It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
"""
self.logRequests = logRequests
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file (KEYFILE)
ctx.use_certificate_file(CERTFILE)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
self.socket_type))
self.server_bind()
self.server_activate()
python类SimpleXMLRPCDispatcher()的实例源码
def __init__(self, encoding=None):
# todo: use super
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding)
def do_POST(self):
"""Handles the HTTPS POST request.
It was copied out from SimpleXMLRPCServer.py and modified to shutdown the socket cleanly.
"""
try:
# get arguments
data = self.rfile.read(int(self.headers["content-length"]))
# In previous versions of SimpleXMLRPCServer, _dispatch
# could be overridden in this class, instead of in
# SimpleXMLRPCDispatcher. To maintain backwards compatibility,
# check to see if a subclass implements _dispatch and dispatch
# using that method if present.
response = self.server._marshaled_dispatch(
data, getattr(self, '_dispatch', None)
)
except: # This should only happen if the module is buggy
# internal error, report as HTTP server error
self.send_response(500)
self.end_headers()
else:
# got a valid XML RPC response
self.send_response(200)
self.send_header("Content-type", "text/xml")
self.send_header("Content-length", str(len(response)))
self.end_headers()
self.wfile.write(response)
# shut down the connection
self.wfile.flush()
self.connection.shutdown() # Modified here!
def __init__(self, encoding=None):
# todo: use super
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, allow_none=True, encoding=encoding)
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)
def __init__(self, encoding=None, config=jsonrpclib.config.DEFAULT):
"""
Sets up the dispatcher with the given encoding.
None values are allowed.
"""
xmlrpcserver.SimpleXMLRPCDispatcher.__init__(
self, allow_none=True, encoding=encoding or "UTF-8")
self.json_config = config
# Notification thread pool
self.__notification_pool = None
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)
def __init__(self, encoding=None):
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(
self, allow_none=True, encoding=encoding)
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)
def handler(request, response, methods):
response.session_id = None # no sessions for xmlrpc
dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
for method in methods:
dispatcher.register_function(method)
dispatcher.register_introspection_functions()
response.headers['Content-Type'] = 'text/xml'
dispatch = getattr(dispatcher, '_dispatch', None)
return dispatcher._marshaled_dispatch(request.body.read(), dispatch)