def test_datetime_before_1900(self):
# same as before but with a date before 1900
dt = datetime.datetime(1, 2, 10, 11, 41, 23)
self.assertEqual(dt, xmlrpclib.DateTime('00010210T11:41:23'))
s = xmlrpclib.dumps((dt,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), datetime.datetime)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), xmlrpclib.DateTime)
self.assertIsNone(m)
python类dumps()的实例源码
def test_dump_big_int(self):
if sys.maxsize > 2**31-1:
self.assertRaises(OverflowError, xmlrpclib.dumps,
(int(2**34),))
xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MAXINT+1,))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MININT-1,))
def dummy_write(s):
pass
m = xmlrpclib.Marshaller()
m.dump_int(xmlrpclib.MAXINT, dummy_write)
m.dump_int(xmlrpclib.MININT, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MAXINT+1, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MININT-1, dummy_write)
def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
def test_use_builtin_types(self):
# SimpleXMLRPCDispatcher.__init__ accepts use_builtin_types, which
# makes all dispatch of binary data as bytes instances, and all
# dispatch of datetime argument as datetime.datetime instances.
self.log = []
expected_bytes = b"my dog has fleas"
expected_date = datetime.datetime(2008, 5, 26, 18, 25, 12)
marshaled = xmlrpclib.dumps((expected_bytes, expected_date), 'foobar')
def foobar(*args):
self.log.extend(args)
handler = xmlrpc.server.SimpleXMLRPCDispatcher(
allow_none=True, encoding=None, use_builtin_types=True)
handler.register_function(foobar)
handler._marshaled_dispatch(marshaled)
self.assertEqual(len(self.log), 2)
mybytes, mydate = self.log
self.assertEqual(self.log, [expected_bytes, expected_date])
self.assertIs(type(mydate), datetime.datetime)
self.assertIs(type(mybytes), bytes)
def test_dump_big_int(self):
if sys.maxsize > 2**31-1:
self.assertRaises(OverflowError, xmlrpclib.dumps,
(int(2**34),))
xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MAXINT+1,))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MININT-1,))
def dummy_write(s):
pass
m = xmlrpclib.Marshaller()
m.dump_int(xmlrpclib.MAXINT, dummy_write)
m.dump_int(xmlrpclib.MININT, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MAXINT+1, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MININT-1, dummy_write)
def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
def test_dump_big_int(self):
if sys.maxsize > 2**31-1:
self.assertRaises(OverflowError, xmlrpclib.dumps,
(int(2**34),))
xmlrpclib.dumps((xmlrpclib.MAXINT, xmlrpclib.MININT))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MAXINT+1,))
self.assertRaises(OverflowError, xmlrpclib.dumps,
(xmlrpclib.MININT-1,))
def dummy_write(s):
pass
m = xmlrpclib.Marshaller()
m.dump_int(xmlrpclib.MAXINT, dummy_write)
m.dump_int(xmlrpclib.MININT, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MAXINT+1, dummy_write)
self.assertRaises(OverflowError, m.dump_int,
xmlrpclib.MININT-1, dummy_write)
def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
def prepare(self):
"""
Prepare the query, marshall the payload, create binary data and calculate length (size).
"""
self.packet = dumps(self.args, methodname=self.method, allow_none=True).encode()
self.length = len(self.packet)
if (self.length + 8) > self._client.MAX_REQUEST_SIZE:
raise TransportException('The prepared query is larger than the maximum request size, we will not send this query!')
def __init__(self, client, method, *args, timeout=15, encode_json=True, response_id=True):
"""
Initiate a Scripted Query.
:param client: Client instance
:param method: Method name
:param args: Arguments
:param timeout: Timeout to wait for future result.
:param encode_json: Is body json? True by default.
:param response_id: Is request requiring response_id?
:type client: pyplanet.core.gbx.client.GbxClient
"""
# Make sure we call the script stuff with TriggerModeScriptEventArray.
gbx_method = 'TriggerModeScriptEventArray'
gbx_args = list()
# Make sure we generate a response_id.
if response_id is True:
self.response_id = uuid.uuid4().hex
else:
self.response_id = None
# Encode to json if args are given, and encode_json is true (default).
if encode_json and len(args) > 0:
gbx_args.append(json.dumps(args))
elif not encode_json and len(args) > 0:
gbx_args.extend(args)
# Add the response_id to the end of the argument list.
if self.response_id:
gbx_args.append(str(self.response_id))
super().__init__(client, gbx_method, method, gbx_args, timeout=timeout)
def execute(self, method, *args, timeout=45.0):
"""
Query the dedicated server and return the results. This method is a coroutine and should be awaited on.
The result you get will be a tuple with data inside (the response payload).
:param method: Server method.
:param args: Arguments.
:param timeout: Wait for x seconds until future is returned. Default is 45 seconds.
:type method: str
:type args: any
:return: Tuple with response data (after awaiting).
:rtype: Future<tuple>
"""
request_bytes = dumps(args, methodname=method, allow_none=True).encode()
length_bytes = len(request_bytes).to_bytes(4, byteorder='little')
handler = self.get_next_handler()
handler_bytes = handler.to_bytes(4, byteorder='little')
# Create new future to be returned.
self.handlers[handler] = future = asyncio.Future()
# Send to server.
self.writer.write(length_bytes + handler_bytes + request_bytes)
return await asyncio.wait_for(future, timeout)
def serialize (self):
self.__xmlrpc_serialized = True
if self.uri [-1] != "/":
self.uri += "/"
self.path += "/"
data = xmlrpclib.dumps (self.params, self.method, allow_none = 1).encode ("utf8")
self.headers ["Content-Type"] = "text/xml; charset=utf-8"
cl = len (data)
self.headers ["Content-Length"] = cl
self.set_content_length (cl)
return data
def serialize (self):
if not self.params:
if self.get_method () in ("POST", "PUT", "PATCH"):
self.headers ["Content-Length"] = 0
return b""
# formdata type can be string, dict, boolean
if self.get_method () in ("GET", "DELETE"):
if type (self.params) is dict:
params = self.urlencode (to_bytes = False)
else:
params = self.params
self.uri += "?" + params
self.path += "?" + params
self.params = None
return b""
data = self.params
header_name, content_type = self.get_header ("content-type", True)
if not content_type:
content_type = "application/x-www-form-urlencoded"
if type (self.params) is dict:
if content_type.startswith ("application/json"):
data = json.dumps (self.params).encode ("utf8")
content_type = "application/json; charset=utf-8"
elif content_type.startswith ("application/x-www-form-urlencoded"):
data = self.urlencode ()
content_type = "application/x-www-form-urlencoded; charset=utf-8"
elif content_type.startswith ("text/namevalue"):
data = self.nvpencode ()
content_type = "text/namevalue; charset=utf-8"
self.headers ["Content-Type"] = content_type
return self.to_bytes (data)
def __init__(self, conn, dumps, loads):
self._conn = conn
self._dumps = dumps
self._loads = loads
for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
obj = getattr(conn, attr)
setattr(self, attr, obj)
def _xml_dumps(obj):
return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
def test_dump_load(self):
dump = xmlrpclib.dumps((alist,))
load = xmlrpclib.loads(dump)
self.assertEqual(alist, load[0][0])
def test_dump_bare_datetime(self):
# This checks that an unwrapped datetime.date object can be handled
# by the marshalling code. This can't be done via test_dump_load()
# since with use_datetime set to 1 the unmarshaller would create
# datetime objects for the 'datetime[123]' keys as well
dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
s = xmlrpclib.dumps((dt,))
(newdt,), m = xmlrpclib.loads(s, use_datetime=1)
self.assertEqual(newdt, dt)
self.assertEqual(m, None)
(newdt,), m = xmlrpclib.loads(s, use_datetime=0)
self.assertEqual(newdt, xmlrpclib.DateTime('20050210T11:41:23'))
def test_datetime_before_1900(self):
# same as before but with a date before 1900
dt = datetime.datetime(1, 2, 10, 11, 41, 23)
s = xmlrpclib.dumps((dt,))
(newdt,), m = xmlrpclib.loads(s, use_datetime=1)
self.assertEqual(newdt, dt)
self.assertEqual(m, None)
(newdt,), m = xmlrpclib.loads(s, use_datetime=0)
self.assertEqual(newdt, xmlrpclib.DateTime('00010210T11:41:23'))
def test_newstyle_class(self):
class T(object):
pass
t = T()
t.x = 100
t.y = "Hello"
((t2,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((t,)))
self.assertEqual(t2, t.__dict__)
def test_dump_big_long(self):
self.assertRaises(OverflowError, xmlrpclib.dumps, (2**99,))
def test_dump_bad_dict(self):
self.assertRaises(TypeError, xmlrpclib.dumps, ({(1,2,3): 1},))
def test_dump_recursive_seq(self):
l = [1,2,3]
t = [3,4,5,l]
l.append(t)
self.assertRaises(TypeError, xmlrpclib.dumps, (l,))
def test_dump_recursive_dict(self):
d = {'1':1, '2':1}
t = {'3':3, 'd':d}
d['t'] = t
self.assertRaises(TypeError, xmlrpclib.dumps, (d,))
def test_dump_none(self):
value = alist + [None]
arg1 = (alist + [None],)
strg = xmlrpclib.dumps(arg1, allow_none=True)
self.assertEqual(value,
xmlrpclib.loads(strg)[0][0])
self.assertRaises(TypeError, xmlrpclib.dumps, (arg1,))
def test_dump_bytes(self):
self.assertRaises(TypeError, xmlrpclib.dumps, (b"my dog has fleas",))
def test_dump_fault(self):
f = xmlrpclib.Fault(42, 'Test Fault')
s = xmlrpclib.dumps((f,))
(newf,), m = xmlrpclib.loads(s)
self.assertEqual(newf, {'faultCode': 42, 'faultString': 'Test Fault'})
self.assertEqual(m, None)
s = xmlrpclib.Marshaller().dumps(f)
self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, s)
def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
"""Dispatches an XML-RPC method from marshalled (XML) data.
XML-RPC methods are dispatched from the marshalled (XML) data
using the _dispatch method and the result is returned as
marshalled data. For backwards compatibility, a dispatch
function can be provided as an argument (see comment in
SimpleXMLRPCRequestHandler.do_POST) but overriding the
existing method through subclassing is the preferred means
of changing method dispatch behavior.
"""
try:
params, method = loads(data)
# generate response
if dispatch_method is not None:
response = dispatch_method(method, params)
else:
response = self._dispatch(method, params)
# wrap response in a singleton tuple
response = (response,)
response = dumps(response, methodresponse=1,
allow_none=self.allow_none, encoding=self.encoding)
except Fault as fault:
response = dumps(fault, allow_none=self.allow_none,
encoding=self.encoding)
except:
# report exception back to server
exc_type, exc_value, exc_tb = sys.exc_info()
response = dumps(
Fault(1, "%s:%s" % (exc_type, exc_value)),
encoding=self.encoding, allow_none=self.allow_none,
)
return response.encode(self.encoding)
def _marshaled_dispatch(self, data, dispatch_method = None, path = None):
try:
response = self.dispatchers[path]._marshaled_dispatch(
data, dispatch_method, path)
except:
# report low level exception back to server
# (each dispatcher should have handled their own
# exceptions)
exc_type, exc_value = sys.exc_info()[:2]
response = dumps(
Fault(1, "%s:%s" % (exc_type, exc_value)),
encoding=self.encoding, allow_none=self.allow_none)
response = response.encode(self.encoding)
return response
def test_dump_load(self):
dump = xmlrpclib.dumps((alist,))
load = xmlrpclib.loads(dump)
self.assertEqual(alist, load[0][0])
def test_dump_bare_datetime(self):
# This checks that an unwrapped datetime.date object can be handled
# by the marshalling code. This can't be done via test_dump_load()
# since with use_builtin_types set to 1 the unmarshaller would create
# datetime objects for the 'datetime[123]' keys as well
dt = datetime.datetime(2005, 2, 10, 11, 41, 23)
self.assertEqual(dt, xmlrpclib.DateTime('20050210T11:41:23'))
s = xmlrpclib.dumps((dt,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), datetime.datetime)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), xmlrpclib.DateTime)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_datetime=True)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), datetime.datetime)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_datetime=False)
(newdt,) = result
self.assertEqual(newdt, dt)
self.assertIs(type(newdt), xmlrpclib.DateTime)
self.assertIsNone(m)