def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
# Example source code for the Python class Binary()
def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
def test_dump_bytes(self):
sample = b"my dog has fleas"
self.assertEqual(sample, xmlrpclib.Binary(sample))
for type_ in bytes, bytearray, xmlrpclib.Binary:
value = type_(sample)
s = xmlrpclib.dumps((value,))
result, m = xmlrpclib.loads(s, use_builtin_types=True)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), bytes)
self.assertIsNone(m)
result, m = xmlrpclib.loads(s, use_builtin_types=False)
(newvalue,) = result
self.assertEqual(newvalue, sample)
self.assertIs(type(newvalue), xmlrpclib.Binary)
self.assertIsNone(m)
def send_back_binary(self, bin):
    """Echo a Binary argument back to the caller.

    The raw payload is read from ``bin.data``, printed for diagnostics,
    and wrapped in a fresh ``Binary`` instance which is returned.
    """
    payload = bin.data
    message = 'send_back_binary({!r})'.format(payload)
    print(message)
    return Binary(payload)
def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
    """Expose this object's control functions over XML-RPC and serve them.

    Registers ``_quit``, ``size``, ``fetch`` and ``counter`` on a
    ``WSGIXMLRPCApplication``, wraps it in a Tornado WSGI container and
    starts a dedicated IOLoop listening on ``bind:port``.  The loop and
    server are stored on ``self.xmlrpc_ioloop`` / ``self.xmlrpc_server``.

    ``logRequests`` is accepted but not used by this method.
    """
    import umsgpack
    from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
    try:
        from xmlrpc.client import Binary
    except ImportError:
        # Fall back to the module name used when xmlrpc.client is missing.
        from xmlrpclib import Binary

    def sync_fetch(task):
        # msgpack-encode the fetch result and ship it as XML-RPC binary.
        return Binary(umsgpack.packb(self.sync_fetch(task)))

    def dump_counter(_time, _type):
        counter = self._cnt[_time]
        return counter.to_dict(_type)

    application = WSGIXMLRPCApplication()
    application.register_function(self.quit, '_quit')
    application.register_function(self.size)
    application.register_function(sync_fetch, 'fetch')
    application.register_function(dump_counter, 'counter')

    import tornado.wsgi
    import tornado.ioloop
    import tornado.httpserver

    wsgi_container = tornado.wsgi.WSGIContainer(application)
    self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
    self.xmlrpc_server = tornado.httpserver.HTTPServer(
        wsgi_container, io_loop=self.xmlrpc_ioloop)
    self.xmlrpc_server.listen(port=port, address=bind)
    logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
    self.xmlrpc_ioloop.start()
def test_default(self):
t = xmlrpclib.Binary()
self.assertEqual(str(t), '')
def test_string(self):
d = b'\x01\x02\x03abc123\xff\xfe'
t = xmlrpclib.Binary(d)
self.assertEqual(str(t), str(d, "latin-1"))
def test_decode(self):
d = b'\x01\x02\x03abc123\xff\xfe'
de = base64.encodebytes(d)
t1 = xmlrpclib.Binary()
t1.decode(de)
self.assertEqual(str(t1), str(d, "latin-1"))
t2 = xmlrpclib._binary(de)
self.assertEqual(str(t2), str(d, "latin-1"))
def test_default(self):
t = xmlrpclib.Binary()
self.assertEqual(str(t), '')
def test_string(self):
d = b'\x01\x02\x03abc123\xff\xfe'
t = xmlrpclib.Binary(d)
self.assertEqual(str(t), str(d, "latin-1"))
def test_decode(self):
d = b'\x01\x02\x03abc123\xff\xfe'
de = base64.encodebytes(d)
t1 = xmlrpclib.Binary()
t1.decode(de)
self.assertEqual(str(t1), str(d, "latin-1"))
t2 = xmlrpclib._binary(de)
self.assertEqual(str(t2), str(d, "latin-1"))
def test_default(self):
t = xmlrpclib.Binary()
self.assertEqual(str(t), '')
def test_string(self):
d = b'\x01\x02\x03abc123\xff\xfe'
t = xmlrpclib.Binary(d)
self.assertEqual(str(t), str(d, "latin-1"))
def test_decode(self):
d = b'\x01\x02\x03abc123\xff\xfe'
de = base64.encodebytes(d)
t1 = xmlrpclib.Binary()
t1.decode(de)
self.assertEqual(str(t1), str(d, "latin-1"))
t2 = xmlrpclib._binary(de)
self.assertEqual(str(t2), str(d, "latin-1"))
def test_default(self):
t = xmlrpclib.Binary()
self.assertEqual(str(t), '')
def test_string(self):
d = b'\x01\x02\x03abc123\xff\xfe'
t = xmlrpclib.Binary(d)
self.assertEqual(str(t), str(d, "latin-1"))
def test_decode(self):
d = b'\x01\x02\x03abc123\xff\xfe'
de = base64.encodebytes(d)
t1 = xmlrpclib.Binary()
t1.decode(de)
self.assertEqual(str(t1), str(d, "latin-1"))
t2 = xmlrpclib._binary(de)
self.assertEqual(str(t2), str(d, "latin-1"))
def retrieve(self, stub, jobNumber, password):
    """Fetch the final results of a NEOS job and write them to ``<stub>.sol``.

    Args:
        stub: Output path, with or without a trailing ``.sol`` extension;
            the results are always written to a file ending in ``.sol``.
        jobNumber: Job identifier forwarded to ``self.neos.getFinalResults``.
        password: Job password forwarded to ``self.neos.getFinalResults``.

    The reply is written verbatim.  No attempt is made to validate it, so
    any error reported by the server ends up as an error string in the
    ``.sol`` file instead of a solution.
    """
    # NEOS should return results as uu-encoded xmlrpclib.Binary data
    results = self.neos.getFinalResults(jobNumber, password)
    if isinstance(results, xmlrpclib.Binary):
        results = results.data
    elif isinstance(results, type(u"")):
        # A text reply cannot be written to a file opened in binary mode;
        # encode it so the message still lands in the .sol file.
        # ``type(u"")`` is the text type regardless of interpreter version.
        results = results.encode("utf-8")

    if stub.endswith('.sol'):
        stub = stub[:-4]

    # The context manager closes the file even if the write fails.
    with open(stub + ".sol", "wb") as solfile:
        solfile.write(results)