def parse_request(self):
    """Parse the request line and authenticate the caller with HTTP Basic auth.

    The database name is the request path with its first character
    removed.  When it is empty the request is accepted with an anonymous
    ``self.tryton`` context.  Otherwise the ``Authorization`` header must
    carry Basic credentials that ``security.login`` accepts; on success
    ``self.tryton`` holds the resulting user id and session.

    Returns the base handler's ``parse_request`` result on success, or
    ``False`` after replying with a 401 challenge.
    """
    res = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.parse_request(self)
    if not res:
        return res
    database_name = self.path[1:]
    if not database_name:
        self.tryton = {'user': None, 'session': None}
        return res
    try:
        method, up64 = self.headers['Authorization'].split(None, 1)
        if method.strip().lower() == 'basic':
            user, password = base64.decodestring(up64).split(':', 1)
            user_id, session = security.login(database_name, user,
                                              password)
            self.tryton = {'user': user_id, 'session': session}
            return res
    except Exception:
        # Deliberate best effort: a missing or malformed header, or a failed
        # login, all fall through to the 401 challenge below.
        pass
    # send_error() would finish the headers and write the body itself, so a
    # header added afterwards lands in the body.  Build the response by hand
    # so that the challenge header is really part of the header section.
    self.send_response(401, 'Unauthorized')
    self.send_header("WWW-Authenticate", 'Basic realm="Tryton"')
    self.send_header('Content-Length', '0')
    # As send_error() did, ask for the connection to be closed.
    self.send_header('Connection', 'close')
    self.end_headers()
    return False
python类SimpleXMLRPCServer()的实例源码
def rpcserver(self):
    """
    Run the XML-RPC server; intended to be launched as a separate thread.

    Binds a SimpleXMLRPCServer to localhost:20758, exposes ``self.connect``
    under the name "connect" and serves requests until the server stops.
    Any error raised while setting up or serving is logged, not propagated.
    """
    try:
        self.log.info("plugin.http.ObjectEditor: starting XML RPC Server")
        server = SimpleXMLRPCServer(addr=("localhost", 20758), logRequests=False, allow_none=1)
        server.register_function(self.connect, "connect")
        server.serve_forever()
    except Exception:
        # Only ordinary errors are logged here; SystemExit and
        # KeyboardInterrupt are allowed to propagate.
        self.log.error("plugin.http.ObjectEditor: rpcserver: error "
                       "connecting to remote")
        self.log.error(sys.exc_info())
xml_rpc_server.py 文件源码
项目:creator-system-test-framework
作者: CreatorDev
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def startXMLRPCServer(rpcInstance, address, port, logFilename):
    """Attach a logger to ``rpcInstance`` and serve it over XML-RPC.

    Requests are handled one at a time until an exception ends the loop.
    KeyboardInterrupt stops the loop quietly; any other exception
    propagates to the caller.  In every case "Exiting" is printed and the
    server socket is closed.
    """
    rpcInstance.logger = createLogger(logFilename)
    endpoint = (address, port)
    rpc_server = VerboseFaultXMLRPCServer(
        endpoint, LoggingSimpleXMLRPCRequestHandler, allow_none=True)
    print("SimpleXMLRPCServer Started. Listening on %s:%d..." % endpoint)
    # NOTE(review): the second argument is presumably allow_dotted_names,
    # as in the standard library's register_instance - confirm.
    rpc_server.register_instance(rpcInstance, True)
    try:
        while 1:
            rpc_server.handle_request()
    except KeyboardInterrupt:
        pass
    finally:
        print("Exiting")
        rpc_server.server_close()
def test_basic(self):
    """Smoke-test a successful call with traceback reporting switched on."""
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    # the flag is expected to start out disabled
    self.assertEqual(server_cls._send_traceback_header, False)
    # switch traceback reporting on
    server_cls._send_traceback_header = True
    # a call that is expected to succeed
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6**8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' errors from the non-blocking socket are ignored
        if is_unavailable_exception(err):
            return
        # anything else is a failure; show the error and any headers
        self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
def test_fail_with_info(self):
    """Check that server errors carry exception/traceback headers when enabled."""
    # install the broken message class on the request handler
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    handler_cls.MessageClass = FailingMessageClass
    # with the flag set, server-side errors should be reported back
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' socket errors and errors without headers are ignored
        if is_unavailable_exception(err) or not hasattr(err, "headers"):
            return
        # the response headers should describe the server-side error
        reply_headers = err.headers
        self.assertEqual(
            reply_headers.get("x-exception"),
            "invalid literal for int() with base 10: 'I am broken'")
        self.assertTrue(reply_headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def test_basic(self):
    """Smoke-test a successful call with traceback reporting switched on."""
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    # the flag is expected to start out disabled
    self.assertEqual(server_cls._send_traceback_header, False)
    # switch traceback reporting on
    server_cls._send_traceback_header = True
    # a call that is expected to succeed
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6**8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' errors from the non-blocking socket are ignored
        if is_unavailable_exception(err):
            return
        # anything else is a failure; show the error and any headers
        self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
def test_fail_with_info(self):
    """Check that server errors carry exception/traceback headers when enabled."""
    # install the broken message class on the request handler
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    handler_cls.MessageClass = FailingMessageClass
    # with the flag set, server-side errors should be reported back
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' socket errors and errors without headers are ignored
        if is_unavailable_exception(err) or not hasattr(err, "headers"):
            return
        # the response headers should describe the server-side error
        reply_headers = err.headers
        self.assertEqual(
            reply_headers.get("x-exception"),
            "invalid literal for int() with base 10: 'I am broken'")
        self.assertTrue(reply_headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def main():
    """Expose the wrapper functions over XML-RPC on localhost:9000.

    Runs ``autoWait()`` and ``ScreenEA()`` first, registers the functions
    and an ``IDAWrapper`` instance, serves until the server stops, then
    calls ``qexit(0)``.
    """
    autoWait()
    ea = ScreenEA()  # value is not used further in this function
    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)
    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")
    rpc.serve_forever()
    qexit(0)
def main():
    """Expose the wrapper functions over XML-RPC on localhost:9000.

    Runs ``autoWait()`` and ``ScreenEA()`` first, registers the functions
    and an ``IDAWrapper`` instance, serves until the server stops, then
    calls ``qexit(0)``.
    """
    autoWait()
    ea = ScreenEA()  # value is not used further in this function
    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)
    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")
    rpc.serve_forever()
    qexit(0)
def __init__(self, addr):
    """Create the server on ``addr`` and register its RPC methods.

    ``addr`` is unpacked into ``self.address`` and ``self.port``.  The
    start time is recorded, introspection functions and the listed
    methods are registered, ``register_package()`` is called and a
    start-up message is printed.
    """
    SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(
        self, addr=addr, logRequests=False, allow_none=True,
        encoding='UTF-8')
    self.address, self.port = addr
    self._start_work_time = datetime.datetime.fromtimestamp(time.time())
    self._last_interactive_time = None
    self.register_introspection_functions()
    # Each method is exported under its own attribute name.
    for rpc_name in (
            'get_start_work_time',
            'get_last_interactive_time',
            'get_pid',
            'get_log',
            'get_binary_data',
            'send_binary_data',
            'is_working',
            'shutdown_driver'):
        self.register_function(getattr(self, rpc_name), rpc_name)
    self.register_package()
    print_msg('Server(%s:%s) - Started' % (self.address, self.port))
def _dispatch(self, method, params):
    """Dispatch an RPC call, logging the request, the reply and any error.

    Errors raised by the base dispatcher are logged with their traceback
    and re-raised.  For methods listed in
    ``XMLRPCServer.LOG_FILTERED_METHODS`` the reply is logged as
    "Binary Data" instead of its value.  The time of the last successful
    call is stored in ``self._last_interactive_time``.
    """
    print_msg('--- --- --- --- --- ---')
    print_msg('%s <<< %s' % (method, params))
    try:
        reply = SimpleXMLRPCServer.SimpleXMLRPCServer._dispatch(
            self, method, params)
    except Exception as exc:
        print_err('%s - Error: %s' % (method, exc))
        print_err('%s - Traceback:\n%s' % (method, traceback.format_exc()))
        raise
    if method not in XMLRPCServer.LOG_FILTERED_METHODS:
        print_msg('%s >>> %s' % (method, reply))
    else:
        print_msg('%s >>> Binary Data' % method)
    self._last_interactive_time = datetime.datetime.fromtimestamp(time.time())
    return reply
def test_basic(self):
    """Smoke-test a successful call with traceback reporting switched on."""
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    # the flag is expected to start out disabled
    self.assertEqual(server_cls._send_traceback_header, False)
    # switch traceback reporting on
    server_cls._send_traceback_header = True
    # a call that is expected to succeed
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6**8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' errors from the non-blocking socket are ignored
        if is_unavailable_exception(err):
            return
        # anything else is a failure; show the error and any headers
        self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
def test_fail_with_info(self):
    """Check that server errors carry exception/traceback headers when enabled."""
    # install the broken message class on the request handler
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    handler_cls.MessageClass = FailingMessageClass
    # with the flag set, server-side errors should be reported back
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' socket errors and errors without headers are ignored
        if is_unavailable_exception(err) or not hasattr(err, "headers"):
            return
        # the response headers should describe the server-side error
        reply_headers = err.headers
        self.assertEqual(
            reply_headers.get("x-exception"),
            "invalid literal for int() with base 10: 'I am broken'")
        self.assertTrue(reply_headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def test_basic(self):
    """Smoke-test a successful call with traceback reporting switched on."""
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    # the flag is expected to start out disabled
    self.assertEqual(server_cls._send_traceback_header, False)
    # switch traceback reporting on
    server_cls._send_traceback_header = True
    # a call that is expected to succeed
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        self.assertEqual(proxy.pow(6, 8), 6**8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' errors from the non-blocking socket are ignored
        if is_unavailable_exception(err):
            return
        # anything else is a failure; show the error and any headers
        self.fail("%s\n%s" % (err, getattr(err, "headers", "")))
def test_fail_with_info(self):
    """Check that server errors carry exception/traceback headers when enabled."""
    # install the broken message class on the request handler
    handler_cls = SimpleXMLRPCServer.SimpleXMLRPCRequestHandler
    handler_cls.MessageClass = FailingMessageClass
    # with the flag set, server-side errors should be reported back
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    try:
        proxy = xmlrpclib.ServerProxy(URL)
        proxy.pow(6, 8)
    except (xmlrpclib.ProtocolError, socket.error) as err:
        # 'unavailable' socket errors and errors without headers are ignored
        if is_unavailable_exception(err) or not hasattr(err, "headers"):
            return
        # the response headers should describe the server-side error
        reply_headers = err.headers
        self.assertEqual(
            reply_headers.get("x-exception"),
            "invalid literal for int() with base 10: 'I am broken'")
        self.assertTrue(reply_headers.get("x-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def main():
    """Expose the wrapper functions over XML-RPC on localhost:9000.

    Runs ``autoWait()`` and ``ScreenEA()`` first, registers the functions
    and an ``IDAWrapper`` instance, serves until the server stops, then
    calls ``qexit(0)``.
    """
    autoWait()
    ea = ScreenEA()  # value is not used further in this function
    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)
    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")
    rpc.serve_forever()
    qexit(0)
def main():
    """Expose the wrapper functions over XML-RPC on localhost:9000.

    Runs ``autoWait()`` and ``ScreenEA()`` first, registers the functions
    and an ``IDAWrapper`` instance, serves until the server stops, then
    calls ``qexit(0)``.
    """
    autoWait()
    ea = ScreenEA()  # value is not used further in this function
    rpc = SimpleXMLRPCServer(("localhost", 9000))
    exported = (
        (is_connected, "is_connected"),
        (wrapper_get_raw, "get_raw"),
        (wrapper_get_function, "get_function"),
        (wrapper_Heads, "Heads"),
        (wrapper_Functions, "Functions"),
    )
    for func, public_name in exported:
        rpc.register_function(func, public_name)
    rpc.register_instance(IDAWrapper())
    rpc.register_function(wrapper_quit, "quit")
    rpc.serve_forever()
    qexit(0)
def setupXMLRPCSocket(self):
    """
    Listen for XML-RPC requests on a socket.

    Does nothing when ``self.XMLRPCport`` is 0.  Otherwise a server is
    bound to 0.0.0.0 on that port, a ``listener_instance`` is registered
    with it and a ``listener_thread`` is started; both are kept on
    ``self.listener`` and ``self.listener_thread``.
    """
    import SimpleXMLRPCServer
    import threading
    bind_host = "0.0.0.0"
    if self.XMLRPCport == 0:
        return
    address = (bind_host, self.XMLRPCport)
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    try:
        rpc = server_cls(address, allow_none=True)
    except TypeError:
        # fall back to a constructor call without the allow_none keyword
        print("2.4 Python did not allow allow_none=True!")
        rpc = server_cls(address)
    self.log("Set up XMLRPC Socket on %s port %d" % (bind_host, self.XMLRPCport))
    handler = listener_instance(self)
    rpc.register_instance(handler)
    # the listener runs in its own thread
    worker = listener_thread(rpc, handler)
    worker.start()
    self.listener_thread = worker
    self.listener = handler
def setupXMLRPCSocket(self):
    """
    Listen for XML-RPC requests on a socket.

    Does nothing when ``self.XMLRPCport`` is 0.  Otherwise a server is
    bound to 0.0.0.0 on that port, a ``listener_instance`` is registered
    with it and a ``listener_thread`` is started; both are kept on
    ``self.listener`` and ``self.listener_thread``.
    """
    import SimpleXMLRPCServer
    import threading
    bind_host = "0.0.0.0"
    if self.XMLRPCport == 0:
        return
    address = (bind_host, self.XMLRPCport)
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    try:
        rpc = server_cls(address, allow_none=True)
    except TypeError:
        # fall back to a constructor call without the allow_none keyword
        print("2.4 Python did not allow allow_none=True!")
        rpc = server_cls(address)
    self.log("Set up XMLRPC Socket on %s port %d" % (bind_host, self.XMLRPCport))
    handler = listener_instance(self)
    rpc.register_instance(handler)
    # the listener runs in its own thread
    worker = listener_thread(rpc, handler)
    worker.start()
    self.listener_thread = worker
    self.listener = handler
def __init__(self, host, port, consumer=None):
    """Create the RPC server and expose ``_client_send`` as "client_send".

    host: the hostname or IP address of the RPC server
    port: the port for the RPC server
    consumer: the consuming object for received messages
    """
    self.host, self.port, self.consumer = host, port, consumer
    rpc = SimpleXMLRPCServer((host, port), logRequests=False,
                             allow_none=True)
    self.server = rpc
    rpc.register_function(self._client_send, "client_send")
    self.msg = None
def __init__(self, server_address, HandlerClass, logRequests=1):
    """Start with an empty ``handlers`` set, then run the base initialiser."""
    self.handlers = set()
    base_init = SimpleXMLRPCServer.SimpleXMLRPCServer.__init__
    base_init(self, server_address, HandlerClass, logRequests)
def server_bind(self):
    """Set SO_REUSEADDR and SO_KEEPALIVE on the socket, then bind as usual."""
    for option in (socket.SO_REUSEADDR, socket.SO_KEEPALIVE):
        self.socket.setsockopt(socket.SOL_SOCKET, option, 1)
    SimpleXMLRPCServer.SimpleXMLRPCServer.server_bind(self)
def server_close(self):
    """Close the server, then shut down the request of every known handler."""
    SimpleXMLRPCServer.SimpleXMLRPCServer.server_close(self)
    # iterate over a copy of the handler collection
    remaining = self.handlers.copy()
    for active in remaining:
        self.shutdown_request(active.request)
def run(self):
    """Serve ``self.pso_append`` as "append_entry" on 127.0.0.1:PSO_APPEND_PORT."""
    endpoint = ('127.0.0.1', PSO_APPEND_PORT)
    rpc = SimpleXMLRPCServer(endpoint, allow_none=True)
    rpc.register_function(self.pso_append, "append_entry")
    rpc.serve_forever()
def run(self):
    """Serve ``self.messenger.paxos_append`` as "append_entry" on 127.0.0.1:PAXOS_APPEND_PORT."""
    endpoint = ('127.0.0.1', PAXOS_APPEND_PORT)
    rpc = SimpleXMLRPCServer(endpoint, allow_none=True)
    rpc.register_function(self.messenger.paxos_append, "append_entry")
    rpc.serve_forever()
def rpc_server():
    """Serve ``announce`` over XML-RPC on localhost:8004 until stopped."""
    endpoint = ('localhost', 8004)
    listener = SimpleXMLRPCServer.SimpleXMLRPCServer(endpoint, logRequests=False)
    listener.register_function(announce, 'announce')
    print('Starting xml rpc server...')
    listener.serve_forever()
def __init__(self, addr,
             requestHandler=SecureXMLRPCRequestHandler,
             logRequests=1):
    """
    Initialise the dispatcher state, then the secure TCP server.

    This mirrors an early SimpleXMLRPCServer.__init__ except that it
    calls SecureTCPServer.__init__ instead of plain old
    TCPServer.__init__.

    addr: the (host, port) pair handed to SecureTCPServer
    requestHandler: the request handler class handed to SecureTCPServer
    logRequests: stored on ``self.logRequests``
    """
    self.funcs = {}
    self.logRequests = logRequests
    self.instance = None
    # Later versions of the standard dispatcher also read ``allow_none``
    # and ``encoding`` when marshalling replies.  Supply the library
    # defaults only if nothing (e.g. a class attribute) defines them yet.
    if not hasattr(self, 'allow_none'):
        self.allow_none = False
    if not hasattr(self, 'encoding'):
        self.encoding = None
    SecureTCPServer.__init__(self, addr, requestHandler)
def run(self):
    """Serve ``self.simulator`` over XML-RPC on localhost:SIMULATOR_PORT."""
    endpoint = ('localhost', SIMULATOR_PORT)
    self.server = SimpleXMLRPCServer(endpoint, allow_none=True)
    self.server.register_instance(self.simulator)
    log.info("Listening on %s" % SIMULATOR_PORT)
    self.server.serve_forever()
def setUp(self):
    """Enable traceback headers and start the server thread for a test."""
    # switch traceback reporting on
    SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
    ready = threading.Event()
    self.evt = ready
    # the server handles requests in its own thread
    server_thread = threading.Thread(
        target=self.threadFunc,
        args=(ready, self.request_count, self.requestHandler))
    server_thread.start()
    # give the server up to 10 seconds to signal readiness, then reset
    # the event for later use
    ready.wait(10)
    ready.clear()
def tearDown(self):
    """Wait for the server thread's event, then disable traceback headers."""
    # give the server thread up to 10 seconds to signal termination
    self.evt.wait(10)
    # switch traceback reporting back off
    server_cls = SimpleXMLRPCServer.SimpleXMLRPCServer
    server_cls._send_traceback_header = False
# NOTE: The tests in SimpleServerTestCase will ignore failures caused by
# "temporarily unavailable" exceptions raised in SimpleXMLRPCServer. This
# condition occurs infrequently on some platforms, frequently on others, and
# is apparently caused by using SimpleXMLRPCServer with a non-blocking socket.
# If the server class is updated at some point in the future to handle this
# situation more gracefully, these tests should be modified appropriately.