python类ServerProxy()的实例源码

gandi.py 文件源码 项目:lexicon 作者: AnalogJ 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, options, provider_options=None):
        """Initialize Gandi DNS provider."""

        super(Provider, self).__init__(options)

        if provider_options is None:
            provider_options = {}

        api_endpoint = provider_options.get('api_endpoint') or 'https://rpc.gandi.net/xmlrpc/'

        self.apikey = self.options['auth_token']
        self.api = xmlrpclib.ServerProxy(api_endpoint, allow_none=True)

        self.default_ttl = 3600

        # self.domain_id is required by test suite
        self.domain_id = None
        self.zone_id = None

        self.domain = self.options['domain'].lower()

    # Authenicate against provider,
    # Make any requests required to get the domain's id for this provider,
    # so it can be used in subsequent calls. Should throw an error if
    # authentication fails for any reason, or if the domain does not exist.
test_daemon.py 文件源码 项目:krafters 作者: GianlucaBortoli 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, algorithm_host, algorithm, algorithm_port):
        self.algorithm = algorithm
        self.algorithm_port = algorithm_port

        if algorithm == "rethinkdb":
            self.rdb_connection = r.connect('localhost', self.algorithm_port)
            logging.info("Connection with RethinkDB successful")
            rethinkdb_setup(self.rdb_connection)
            self.appendFunction = partial(rethinkdb_append_entry, self.rdb_connection)
        elif algorithm == "paxos" or algorithm == "pso":
            if algorithm_host == '\'\'' or algorithm_host == '':
                algorithm_host = '127.0.0.1'
            self.cluster_rpc_client = ServerProxy("http://{}:{}".format(algorithm_host, CLUSTER_APPEND_PORT),
                                                  allow_none=True)
            self.appendFunction = partial(cluster_append_entry, self.cluster_rpc_client)
        elif algorithm == "datastore":
            self.appendFunction = partial(datastore_append_entry)

    # wrapper used to execute multiple operations and register times
provisioner.py 文件源码 项目:krafters 作者: GianlucaBortoli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def configure_rethinkdb_gce(cluster, configure_daemons):
    # RethinkDB requires multiple nodes to join the master
    # RethinkDB requires every node to provide a set of ports
    # One port is used to interact with the node running the algorithm (driver_port), the others are framework's ones
    # On GCE we can always use the same set of ports, because they will be surely available (VM just spun up)
    print("Trying to contact remote master configure daemon...")
    # configure_daemon = rpcClient('http://{}:{}'.format(cluster[0]['address'], CONFIGURE_DAEMON_PORT))
    print(configure_daemons[0].configure_rethinkdb_master(GCE_RETHINKDB_PORTS['cluster_port'],
                                                          GCE_RETHINKDB_PORTS['driver_port'],
                                                          GCE_RETHINKDB_PORTS['http_port'], cluster[0]['address']))
    print("Trying to contact remote followers configure daemons...")
    for (i, node) in enumerate(cluster[1:]):
        print(configure_daemons[i + 1].configure_rethinkdb_follower(node['id'], cluster[0]['address'],
                                                                    GCE_RETHINKDB_PORTS['cluster_port'],
                                                                    GCE_RETHINKDB_PORTS['cluster_port'],
                                                                    GCE_RETHINKDB_PORTS['driver_port'],
                                                                    GCE_RETHINKDB_PORTS['http_port']))


# main
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        #do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


#test special attribute access on the serverproxy, through the __call__
#function.
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
test_xmlrpc_net.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_current_time(self):
        # Get the current time from xmlrpc.com.  This code exercises
        # the minimal HTTP functionality in xmlrpclib.
        self.skipTest("time.xmlrpc.com is unreliable")
        server = xmlrpclib.ServerProxy("http://time.xmlrpc.com/RPC2")
        try:
            t0 = server.currentTime.getCurrentTime()
        except socket.error as e:
            self.skipTest("network error: %s" % e)
            return

        # Perform a minimal sanity check on the result, just to be sure
        # the request means what we think it means.
        t1 = xmlrpclib.DateTime()

        dt0 = xmlrpclib._datetime_type(t0.value)
        dt1 = xmlrpclib._datetime_type(t1.value)
        if dt0 > dt1:
            delta = dt0 - dt1
        else:
            delta = dt1 - dt0
        # The difference between the system time here and the system
        # time on the server should not be too big.
        self.assertTrue(delta.days <= 1)
name_server.py 文件源码 项目:yadfs 作者: osteotek 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delete_from_chunk_servers(self, file):
        if file.type == NodeType.directory:
            for c in list(file.children):
                self.delete_from_chunk_servers(c)
        else:
            print('Start delete file', file.get_full_path())
            for f_path, servers in file.chunks.items():
                for cs in servers:
                    try:
                        cl = ServerProxy(cs)
                        print('Send delete', f_path, 'to', cs)
                        cl.delete_chunk(f_path)
                    except:
                        print('Failed to delete', f_path, 'from', cs)

    # get file\directory info by given path
    # path format: /my_dir/index/some.file
    # response format:
    # { 'status': Status.ok
    #   'type': NodeType.type
    #   'path': '/my_dir/index/some.file' - full path for directory
    #   'size': 2014 - size in bytes
    #   'chunks': {
    #       '/my_dir/index/some.file_0': cs-2
    #   }
client.py 文件源码 项目:yadfs 作者: osteotek 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_file_content(self, path):
        info = self.ns.get_file_info(path)
        if info['status'] != Status.ok:
            return info['status'], None

        chunks = info['chunks']
        content = ""
        data = {}
        for chunk, addr in chunks.items():
            cs = ServerProxy(addr)
            chunk_data = cs.get_chunk(chunk)
            index = int(chunk.split("_")[-1])
            data[index] = chunk_data

        i = 0
        while i < len(data):
            content += data[i]
            i += 1

        return Status.ok, content
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        #do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


#test special attribute access on the serverproxy, through the __call__
#function.
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
test_xmlrpc_net.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_current_time(self):
        # Get the current time from xmlrpc.com.  This code exercises
        # the minimal HTTP functionality in xmlrpclib.
        self.skipTest("time.xmlrpc.com is unreliable")
        server = xmlrpclib.ServerProxy("http://time.xmlrpc.com/RPC2")
        try:
            t0 = server.currentTime.getCurrentTime()
        except socket.error as e:
            self.skipTest("network error: %s" % e)
            return

        # Perform a minimal sanity check on the result, just to be sure
        # the request means what we think it means.
        t1 = xmlrpclib.DateTime()

        dt0 = xmlrpclib._datetime_type(t0.value)
        dt1 = xmlrpclib._datetime_type(t1.value)
        if dt0 > dt1:
            delta = dt0 - dt1
        else:
            delta = dt1 - dt0
        # The difference between the system time here and the system
        # time on the server should not be too big.
        self.assertTrue(delta.days <= 1)
easy_update.py 文件源码 项目:easy_update 作者: fizwit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_pypi_info(pkg_name):
    """get version information from pypi.  If <pkg_name> is not found seach
    pypi. if <pkg_name> matches search results case; use the new value of
    pkg_name""" 
    client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
    ver_list = client.package_releases(pkg_name)
    if len(ver_list) == 0:
        search_list = client.search({'name': pkg_name})
        for info in search_list:
            if pkg_name.lower() == info['name'].lower():
                 pkg_name = info['name']
                 break 
        ver_list = client.package_releases(pkg_name)
        if len(ver_list) == 0:
            return pkg_name, 'not found', {} 
    version = ver_list[0]
    xml_info = client.release_data(pkg_name, version)
    return pkg_name, version, xml_info
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        #do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


#test special attribute access on the serverproxy, through the __call__
#function.
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_gzip_decode_limit(self):
        max_gzip_decode = 20 * 1024 * 1024
        data = b'\0' * max_gzip_decode
        encoded = xmlrpclib.gzip_encode(data)
        decoded = xmlrpclib.gzip_decode(encoded)
        self.assertEqual(len(decoded), max_gzip_decode)

        data = b'\0' * (max_gzip_decode + 1)
        encoded = xmlrpclib.gzip_encode(data)

        with self.assertRaisesRegex(ValueError,
                                    "max gzipped payload length exceeded"):
            xmlrpclib.gzip_decode(encoded)

        xmlrpclib.gzip_decode(encoded, max_decode=-1)


#Test special attributes of the ServerProxy object
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_basic(self):
        # check that flag is false by default
        flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
        self.assertEqual(flagval, False)

        # enable traceback reporting
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        # test a call that shouldn't fail just as a smoke test
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
pypi_repository.py 文件源码 项目:poet 作者: sdispater 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def find_packages(self, name, constraint=None):
        packages = []

        if constraint is not None:
            version_parser = VersionParser()
            constraint = version_parser.parse_constraints(constraint)

        with ServerProxy(self._url) as client:
            versions = client.package_releases(name, True)

        if constraint:
            versions = constraint.select([Version.coerce(v) for v in versions])

        for version in versions:
            try:
                packages.append(Package(name, version))
            except ValueError:
                continue

        return packages
pypi_repository.py 文件源码 项目:poet 作者: sdispater 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def search(self, query, mode=0):
        results = []

        search = {
            'name': query
        }

        if mode == self.SEARCH_FULLTEXT:
            search['summary'] = query

        client = ServerProxy(self._url)
        hits = client.search(search, 'or')

        for hit in hits:
            results.append({
                'name': hit['name'],
                'description': hit['summary'],
                'version': hit['version']
            })

        return results
test_xmlrpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.add(2,3)
            multicall.pow(6,8)
            multicall.div(127,42)
            add_result, pow_result, div_result = multicall()
            self.assertEqual(add_result, 2+3)
            self.assertEqual(pow_result, 6**8)
            self.assertEqual(div_result, 127//42)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))


问题


面经


文章

微信
公众号

扫码关注公众号