def parse_request(self):
    """Parse the incoming request and authenticate it with HTTP Basic auth.

    Parsing itself is delegated to
    ``SimpleXMLRPCRequestHandler.parse_request``. The request path without
    its first character is used as the database name:

    * empty database name: ``self.tryton`` is set to a context with no
      user and no session, and the parent's result is returned;
    * otherwise the ``Authorization`` header must carry Basic credentials
      that ``security.login`` accepts; on success ``self.tryton`` holds
      the returned user id and session.

    Returns the parent's result when parsing failed, no database was
    requested, or the login succeeded. Returns ``False`` after answering
    ``401 Unauthorized`` in every other case.
    """
    res = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self)
    if not res:
        return res

    database_name = self.path[1:]
    if not database_name:
        self.tryton = {'user': None, 'session': None}
        return res

    try:
        method, up64 = self.headers['Authorization'].split(None, 1)
        if method.strip().lower() == 'basic':
            user, password = base64.decodestring(up64).split(':', 1)
            user_id, session = security.login(database_name, user,
                password)
            self.tryton = {'user': user_id, 'session': session}
            return res
    except Exception:
        # Deliberately broad: a missing/malformed header, undecodable
        # credentials or any error raised during login are all answered
        # with the same 401 below.
        pass

    # send_error() would finish the headers and emit the body itself, so a
    # header added afterwards would never reach the client as a header.
    # Build the response by hand so the Basic challenge is really sent.
    self.send_response(401, 'Unauthorized')
    self.send_header("WWW-Authenticate", 'Basic realm="Tryton"')
    self.send_header('Connection', 'close')
    self.end_headers()
    return False
python类SimpleXMLRPCRequestHandler()的实例源码
def test_fail_with_info(self):
    """Server errors must come back with exception/traceback headers.

    The broken message class is installed and the traceback-header flag is
    switched on before the call, so the failed call is expected to carry
    the ``x-exception`` and ``x-traceback`` headers.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Turn on the flag under test.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # The error details are expected in the response headers.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(exc.headers.get("x-exception"), expected_err)
        self.assertTrue(exc.headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def test_fail_with_info(self):
    """Server errors must come back with exception/traceback headers.

    The broken message class is installed and the traceback-header flag is
    switched on before the call, so the failed call is expected to carry
    the ``x-exception`` and ``x-traceback`` headers.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Turn on the flag under test.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # The error details are expected in the response headers.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(exc.headers.get("x-exception"), expected_err)
        self.assertTrue(exc.headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def test_fail_with_info(self):
    """Server errors must come back with exception/traceback headers.

    The broken message class is installed and the traceback-header flag is
    switched on before the call, so the failed call is expected to carry
    the ``x-exception`` and ``x-traceback`` headers.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Turn on the flag under test.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # The error details are expected in the response headers.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(exc.headers.get("x-exception"), expected_err)
        self.assertTrue(exc.headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def test_fail_with_info(self):
    """Server errors must come back with exception/traceback headers.

    The broken message class is installed and the traceback-header flag is
    switched on before the call, so the failed call is expected to carry
    the ``x-exception`` and ``x-traceback`` headers.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
    # Turn on the flag under test.
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # The error details are expected in the response headers.
        expected_err = "invalid literal for int() with base 10: 'I am broken'"
        self.assertEqual(exc.headers.get("x-exception"), expected_err)
        self.assertTrue(exc.headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def setup(self):
    """Wrap the request in an ``SSLSocket`` and run the base handler setup."""
    # The wrapped object replaces self.request before the base setup runs.
    wrapped_request = SSLSocket(self.request)
    self.request = wrapped_request
    SimpleXMLRPCRequestHandler.setup(self)
def __init__(self, interface, port, secure=False):
    """Initialise the daemon and create the matching XML-RPC server.

    :param interface: address passed to the daemon base class and used as
        the host part of the server address
    :param port: port passed to the daemon base class and used as the port
        part of the server address
    :param secure: forwarded to ``daemon.__init__``; the handler/server
        classes are then chosen from ``self.secure`` and ``self.ipv6``
    """
    daemon.__init__(self, interface, port, secure, name='XMLRPCDaemon')

    # Pick the secure or plain flavour, then its IPv6 variant if needed.
    if self.secure:
        handler_class = SecureXMLRPCRequestHandler
        server_class = (SecureThreadedXMLRPCServer6 if self.ipv6
                        else SecureThreadedXMLRPCServer)
    else:
        handler_class = SimpleXMLRPCRequestHandler
        server_class = (SimpleThreadedXMLRPCServer6 if self.ipv6
                        else SimpleThreadedXMLRPCServer)

    # NOTE(review): the meaning of the trailing 0 depends on the server
    # classes' signature, which is not visible here.
    self.server = server_class((interface, port), handler_class, 0)
def send_header(self, keyword, value):
    """Send a header, rewriting the XML content type to JSON-RPC.

    A ``Content-type: text/xml`` header is sent as
    ``Content-type: application/json-rpc``; every other header is passed
    to the base handler unchanged.
    """
    is_xml_content_type = (keyword == 'Content-type' and value == 'text/xml')
    header_value = 'application/json-rpc' if is_xml_content_type else value
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.send_header(
        self, keyword, header_value)
def tearDown(self):
    """Wait for the server thread, then undo the class-level patches."""
    # Block until the server thread signals that it has terminated.
    self.evt.wait()

    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    # Restore the traceback-header flag.
    server_cls._send_traceback_header = False
    # Restore the message class.
    handler_cls.MessageClass = mimetools.Message
def test_fail_no_info(self):
    """Without the flag, server errors must not leak exception headers.

    The broken message class is installed but the traceback-header flag is
    left alone, so neither ``X-exception`` nor ``X-traceback`` may be
    present on the failed call.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # Neither server-side error header should be sent back here.
        self.assertTrue(exc.headers.get("X-exception") is None)
        self.assertTrue(exc.headers.get("X-traceback") is None)
    else:
        self.fail('ProtocolError not raised')
def tearDown(self):
    """Wait for the server thread, then undo the class-level patches."""
    # Block until the server thread signals that it has terminated.
    self.evt.wait()

    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    # Restore the traceback-header flag.
    server_cls._send_traceback_header = False
    # Restore the message class.
    handler_cls.MessageClass = mimetools.Message
def test_fail_no_info(self):
    """Without the flag, server errors must not leak exception headers.

    The broken message class is installed but the traceback-header flag is
    left alone, so neither ``X-exception`` nor ``X-traceback`` may be
    present on the failed call.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # Neither server-side error header should be sent back here.
        self.assertTrue(exc.headers.get("X-exception") is None)
        self.assertTrue(exc.headers.get("X-traceback") is None)
    else:
        self.fail('ProtocolError not raised')
def tearDown(self):
    """Wait for the server thread, then undo the class-level patches."""
    # Block until the server thread signals that it has terminated.
    self.evt.wait()

    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    # Restore the traceback-header flag.
    server_cls._send_traceback_header = False
    # Restore the message class.
    handler_cls.MessageClass = mimetools.Message
def test_fail_no_info(self):
    """Without the flag, server errors must not leak exception headers.

    The broken message class is installed but the traceback-header flag is
    left alone, so neither ``X-exception`` nor ``X-traceback`` may be
    present on the failed call.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # Neither server-side error header should be sent back here.
        self.assertTrue(exc.headers.get("X-exception") is None)
        self.assertTrue(exc.headers.get("X-traceback") is None)
    else:
        self.fail('ProtocolError not raised')
def tearDown(self):
    """Wait for the server thread, then undo the class-level patches."""
    # Block until the server thread signals that it has terminated.
    self.evt.wait()

    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    # Restore the traceback-header flag.
    server_cls._send_traceback_header = False
    # Restore the message class.
    handler_cls.MessageClass = mimetools.Message
def test_fail_no_info(self):
    """Without the flag, server errors must not leak exception headers.

    The broken message class is installed but the traceback-header flag is
    left alone, so neither ``X-exception`` nor ``X-traceback`` may be
    present on the failed call.
    """
    # Install the broken message class on the request handler.
    SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as exc:
        # Non-blocking socket 'unavailable' errors, and errors that carry
        # no headers, are ignored rather than treated as failures.
        if is_unavailable_exception(exc) or not hasattr(exc, "headers"):
            return
        # Neither server-side error header should be sent back here.
        self.assertTrue(exc.headers.get("X-exception") is None)
        self.assertTrue(exc.headers.get("X-traceback") is None)
    else:
        self.fail('ProtocolError not raised')
def serve_forever(self):
    """Start the console XML-RPC server and serve requests.

    Creates a ``ConsoleHandler``, optionally feeds it ``self.code``, then
    exposes its ``runsource`` method as the ``runsource`` RPC on an XML-RPC
    server bound to ``(self.addr, self.port)`` under the ``/rConsole`` path.
    The final ``serve_forever()`` call on that server runs the request loop.
    """
    logger.info(
        ('action', 'console_server_running'),
        ('tid', get_thread_number()),
    )

    # Create the console handler and run any start-up code through it.
    console = ConsoleHandler()
    self._handler = console
    if self.code:
        console.runsource(self.code)

    from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer

    # Restrict the RPC endpoint to the console path.
    class RequestHandler(SimpleXMLRPCRequestHandler):
        rpc_paths = ('/rConsole',)

    # Build the server without per-request logging.
    bind_address = (self.addr, self.port)
    self._server = SimpleXMLRPCServer(
        bind_address,
        requestHandler=RequestHandler,
        logRequests=False,
    )

    # NOTE(review): this exposes the handler's runsource over RPC;
    # presumably that means remote code execution -- confirm access to
    # the address/port is restricted.
    self._server.register_function(console.runsource, "runsource")

    logger.info(
        ('action', 'xmlrpc_server_serving'),
        ('tid', get_thread_number()),
    )

    # Enter the RPC request loop.
    self._server.serve_forever()
def __init__(self, addr, requestHandler=SimpleJSONRPCRequestHandler,
             logRequests=True, encoding=None, bind_and_activate=True,
             address_family=socket.AF_INET,
             config=jsonrpclib.config.DEFAULT):
    """
    Sets up the server and the dispatcher

    :param addr: The server listening address
    :param requestHandler: Custom request handler
    :param logRequests: Flag to (de)activate requests logging
    :param encoding: The dispatcher request encoding
    :param bind_and_activate: If True, starts the server immediately
    :param address_family: The server listening address family
    :param config: A JSONRPClib Config instance
    """
    # Set up the dispatcher fields
    SimpleJSONRPCDispatcher.__init__(self, encoding, config)

    # Prepare the server configuration
    # logRequests is used by SimpleXMLRPCRequestHandler
    self.logRequests = logRequests
    self.address_family = address_family
    self.json_config = config

    # Work on the request handler
    class RequestHandlerWrapper(requestHandler, object):
        """
        Wraps the request handle to have access to the configuration
        """
        def __init__(self, *args, **kwargs):
            """
            Constructs the wrapper after having stored the configuration
            """
            # Store the configuration first: the base handler's
            # constructor is called right after and may already use it.
            self.config = config
            super(RequestHandlerWrapper, self).__init__(*args, **kwargs)

    # Set up the server with the wrapper, so that handler instances really
    # carry the configuration (the wrapper was previously never used).
    socketserver.TCPServer.__init__(self, addr, RequestHandlerWrapper,
                                    bind_and_activate)

    # fcntl may be None (presumably when the module is unavailable, e.g.
    # on Windows -- confirm at the import site); when it offers
    # FD_CLOEXEC, set that flag on the server's file descriptor.
    if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
        flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
        flags |= fcntl.FD_CLOEXEC
        fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
# ------------------------------------------------------------------------------
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run a temporary XML-RPC server handling up to ``numrequests`` requests.

    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ``ADDR``, ``PORT`` and ``URL``.
    ``evt`` is set once the server is ready and set again on exit, at which
    point ``PORT`` is reset to ``None``.

    :param evt: event object used to signal readiness and termination
    :param numrequests: maximum number of requests to handle
    :param requestHandler: handler class; defaults to
        ``SimpleXMLRPCServer.SimpleXMLRPCRequestHandler`` when falsy
    :param encoding: forwarded to the server constructor
    """
    global ADDR, PORT, URL

    # Instance whose public methods are exposed via register_instance().
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking. On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            conn, client_addr = self.socket.accept()
            conn.setblocking(True)
            return conn, client_addr

    handler = requestHandler or SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly. This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6. The server listens on AF_INET only.
        URL = "http://%s:%d" % (ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x, y: x + y, 'add')
        serv.register_function(lambda x: x, test_support.u(r't\xea\u0161t'))
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # Handle up to 'numrequests' requests.
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
def http_server(evt, numrequests, requestHandler=None, encoding=None):
    """Run a temporary XML-RPC server handling up to ``numrequests`` requests.

    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ``ADDR``, ``PORT`` and ``URL``.
    ``evt`` is set once the server is ready and set again on exit, at which
    point ``PORT`` is reset to ``None``.

    :param evt: event object used to signal readiness and termination
    :param numrequests: maximum number of requests to handle
    :param requestHandler: handler class; defaults to
        ``SimpleXMLRPCServer.SimpleXMLRPCRequestHandler`` when falsy
    :param encoding: forwarded to the server constructor
    """
    global ADDR, PORT, URL

    # Instance whose public methods are exposed via register_instance().
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking. On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            conn, client_addr = self.socket.accept()
            conn.setblocking(True)
            return conn, client_addr

    handler = requestHandler or SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler,
                          encoding=encoding,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly. This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6. The server listens on AF_INET only.
        URL = "http://%s:%d" % (ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x, y: x + y, 'add')
        serv.register_function(lambda x: x, test_support.u(r't\xea\u0161t'))
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # Handle up to 'numrequests' requests.
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
def http_server(evt, numrequests, requestHandler=None):
    """Run a temporary XML-RPC server handling up to ``numrequests`` requests.

    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ``ADDR``, ``PORT`` and ``URL``.
    ``evt`` is set once the server is ready and set again on exit, at which
    point ``PORT`` is reset to ``None``.

    :param evt: event object used to signal readiness and termination
    :param numrequests: maximum number of requests to handle
    :param requestHandler: handler class; defaults to
        ``SimpleXMLRPCServer.SimpleXMLRPCRequestHandler`` when falsy
    """
    global ADDR, PORT, URL

    # Instance whose public methods are exposed via register_instance().
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking. On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            conn, client_addr = self.socket.accept()
            conn.setblocking(True)
            return conn, client_addr

    handler = requestHandler or SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly. This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6. The server listens on AF_INET only.
        URL = "http://%s:%d" % (ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x, y: x + y, 'add')
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # Handle up to 'numrequests' requests.
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()
def http_server(evt, numrequests, requestHandler=None):
    """Run a temporary XML-RPC server handling up to ``numrequests`` requests.

    The server binds to an ephemeral port on localhost and publishes its
    location through the module globals ``ADDR``, ``PORT`` and ``URL``.
    ``evt`` is set once the server is ready and set again on exit, at which
    point ``PORT`` is reset to ``None``.

    :param evt: event object used to signal readiness and termination
    :param numrequests: maximum number of requests to handle
    :param requestHandler: handler class; defaults to
        ``SimpleXMLRPCServer.SimpleXMLRPCRequestHandler`` when falsy
    """
    global ADDR, PORT, URL

    # Instance whose public methods are exposed via register_instance().
    class TestInstanceClass:
        def div(self, x, y):
            return x // y

        def _methodHelp(self, name):
            if name == 'div':
                return 'This is the div function'

    def my_function():
        '''This is my function'''
        return True

    class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
        def get_request(self):
            # Ensure the accepted socket is always blocking. On Linux,
            # socket attributes are not inherited like they are on *BSD
            # and Windows.
            conn, client_addr = self.socket.accept()
            conn.setblocking(True)
            return conn, client_addr

    handler = requestHandler or SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    serv = MyXMLRPCServer(("localhost", 0), handler,
                          logRequests=False, bind_and_activate=False)
    try:
        serv.socket.settimeout(3)
        serv.server_bind()
        ADDR, PORT = serv.socket.getsockname()
        # Connect to the IP address directly. This avoids
        # socket.create_connection() trying to connect to "localhost" using
        # all address families, which causes slowdown e.g. on vista which
        # supports AF_INET6. The server listens on AF_INET only.
        URL = "http://%s:%d" % (ADDR, PORT)
        serv.server_activate()

        serv.register_introspection_functions()
        serv.register_multicall_functions()
        serv.register_function(pow)
        serv.register_function(lambda x, y: x + y, 'add')
        serv.register_function(my_function)
        serv.register_instance(TestInstanceClass())
        evt.set()

        # Handle up to 'numrequests' requests.
        remaining = numrequests
        while remaining > 0:
            serv.handle_request()
            remaining -= 1
    except socket.timeout:
        pass
    finally:
        serv.socket.close()
        PORT = None
        evt.set()