python类ServerProxy()的实例源码

test_xmlrpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_non_existing_multicall(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            multicall = xmlrpclib.MultiCall(p)
            multicall.this_is_not_exists()
            result = multicall()

            # result.results contains;
            # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
            #   'method "this_is_not_exists" is not supported'>}]

            self.assertEqual(result.results[0]['faultCode'], 1)
            self.assertEqual(result.results[0]['faultString'],
                '<class \'Exception\'>:method "this_is_not_exists" '
                'is not supported')
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_two(self):
        p = xmlrpclib.ServerProxy(URL)
        #do three requests.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been handled by a single request handler
        self.assertEqual(len(self.RequestHandler.myRequests), 1)

        #check that we did at least two (the third may be pending append
        #due to thread scheduling)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)


#test special attribute access on the serverproxy, through the __call__
#function.
test_xmlrpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_close(self):
        p = xmlrpclib.ServerProxy(URL)
        #do some requests with close.
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")() #this should trigger a new keep-alive request
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        self.assertEqual(p.pow(6,8), 6**8)
        p("close")()

        #they should have all been two request handlers, each having logged at least
        #two complete requests
        self.assertEqual(len(self.RequestHandler.myRequests), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
test_xmlrpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_fail_with_info(self):
        # use the broken message class
        xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

        # Check that errors in the server send back exception/traceback
        # info when flag is set
        xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

        try:
            p = xmlrpclib.ServerProxy(URL)
            p.pow(6,8)
        except (xmlrpclib.ProtocolError, OSError) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e) and hasattr(e, "headers"):
                # We should get error info in the response
                expected_err = "invalid literal for int() with base 10: 'I am broken'"
                self.assertEqual(e.headers.get("X-exception"), expected_err)
                self.assertTrue(e.headers.get("X-traceback") is not None)
        else:
            self.fail('ProtocolError not raised')
xmlrpc.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, username, password, url, use_mod_auth_kerb=False):
        if url.startswith('https://'):
            self._transport = SafeCookieTransport()
        elif url.startswith('http://'):
            self._transport = CookieTransport()
        else:
            raise TCMSError("Unrecognized URL scheme")

        self._transport.cookiejar = CookieJar()
        # print("COOKIES:", self._transport.cookiejar._cookies)
        self.server = xmlrpclib.ServerProxy(
            url,
            transport=self._transport,
            verbose=VERBOSE,
            allow_none=1
        )

        # Login, get a cookie into our cookie jar (login_dict):
        self.server.Auth.login(username, password)

        # Record the user ID in case the script wants this
        # self.user_id = login_dict['id']
        # print('Logged in with cookie for user %i' % self.userId)
        # print("COOKIES:", self._transport.cookiejar._cookies)
xmlrpc.py 文件源码 项目:Kiwi 作者: kiwitcms 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, url):
        if url.startswith('https://'):
            self._transport = KerbTransport()
        elif url.startswith('http://'):
            raise TCMSError("Encrypted https communication required for "
                            "Kerberos authentication.\nURL provided: {0}".format(url))
        else:
            raise TCMSError("Unrecognized URL scheme: {0}".format(url))

        self._transport.cookiejar = CookieJar()
        # print("COOKIES:", self._transport.cookiejar._cookies)
        self.server = xmlrpclib.ServerProxy(
            url,
            transport=self._transport,
            verbose=VERBOSE,
            allow_none=1
        )

        # Login, get a cookie into our cookie jar (login_dict):
        self.server.Auth.login_krbv()
python_popular_analyses.py 文件源码 项目:fabric8-analytics-jobs 作者: fabric8-analytics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _use_pypi_xml_rpc(self):
        """Schedule analyses of packages based on PyPI index using XML-RPC.

        https://wiki.python.org/moin/PyPIXmlRpc
        """
        client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
        # get a list of package names
        packages = sorted(client.list_packages())

        for idx, package in enumerate(packages[self.count.min:self.count.max]):
            releases = client.package_releases(package, True)  # True for show_hidden arg

            self.log.debug("Scheduling #%d. (number versions: %d)",
                           self.count.min + idx, self.nversions)
            for version in releases[:self.nversions]:
                self.analyses_selinon_flow(package, version)
XenAPI.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, uri, transport=None, encoding=None, verbose=0,
                 allow_none=0):
        xmlrpcclient.ServerProxy.__init__(self, uri, transport=transport,
                                          encoding=encoding, verbose=verbose,
                                          allow_none=allow_none)
        self.transport = transport
        self._session = None
        self.last_login_method = None
        self.last_login_params = None
        self.API_version = API_VERSION_1_1
XenAPI.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __getattr__(self, name):
        if name == 'handle':
            return self._session
        elif name == 'xenapi':
            return _Dispatcher(self.API_version, self.xenapi_request, None)
        elif name.startswith('login') or name.startswith('slave_local'):
            return lambda *params: self._login(name, params)
        elif name == 'logout':
            return _Dispatcher(self.API_version, self.xenapi_request, "logout")
        else:
            return xmlrpcclient.ServerProxy.__getattr__(self, name)
pypi.py 文件源码 项目:bootstrap-py 作者: mkouhei 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def search_package(name):
    """search package.

    :param str name: package name

    :rtype: list
    :return: package name list
    """
    client = xmlrpc_client.ServerProxy(PYPI_URL)
    return [pkg for pkg in client.search({'name': name})
            if pkg.get('name') == name]
test_executor.py 文件源码 项目:krafters 作者: GianlucaBortoli 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, nodes):
        for n in nodes:
            self.nodes_rpc[str(n["id"])] = Server("http://{}:{}".format(n["address"], str(n["rpcPort"])))
        for n in nodes:
            self.nodes_rpc[str(n["id"])].clean_all_qdisc()
            self.nodes_rpc[str(n["id"])].create_root_qdisc()
        for n in nodes:
            if not self.nodes_rpc[str(n["id"])].init_qdisc():
                raise Exception("[{}] Error initializing qdiscs".format(NETEM_ERROR))

    # modify connection from source to target using netem_command
pypi_top_packages_async.py 文件源码 项目:python3wos_asyncio 作者: cclauss 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_tasks(session, max_pkgs=MAX_PKGS):
    client = ServerProxy(PYPI_URL)
    return [get_package_info(session, pkg_name, downloads)
            for pkg_name, downloads in client.top_packages(max_pkgs)]
pypi_top200_async_save.py 文件源码 项目:python3wos_asyncio 作者: cclauss 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_tasks(session, max_pkgs=MAX_PKGS):
    client = ServerProxy(PYPI_URL)
    return [get_package_info(session, pkg_name, downloads)
            for pkg_name, downloads in client.top_packages(max_pkgs)]
pypi_top200_async_old.py 文件源码 项目:python3wos_asyncio 作者: cclauss 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_pkg_info(pkg_name, downloads=0):
    # multiple asyncio jobs can not share a client
    client = ServerProxy(PYPI_URL)
    try:
        release = client.package_releases(pkg_name)[0]
    except IndexError:  # marionette-transport, ll-orasql, and similar
        print(pkg_name, 'has no releases in PyPI!!')
        return pkg_info(pkg_name, downloads, False, False, 'PyPI error!!', '')
    troves = '\n'.join(client.release_data(pkg_name, release)['classifiers'])
    py2only = py2_only_classifier in troves
    py3 = py3_classifier in troves
    url = client.release_data(pkg_name, release)['package_url']
    return pkg_info(pkg_name, downloads, py2only, py3, release, url)
pypi_top200_async_old.py 文件源码 项目:python3wos_asyncio 作者: cclauss 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def async_main(max_pkgs=MAX_PKGS):  # ~ 32 secs for 200 pkgs on my MacBookPro
    loop = asyncio.get_event_loop()
    client = ServerProxy(PYPI_URL)
    futures = [loop.run_in_executor(None, get_pkg_info, pkg_name, downloads)
               for pkg_name, downloads in client.top_packages(max_pkgs)]
    return [(yield from fut) for fut in futures]
Vimclientxml.py 文件源码 项目:VimRpc 作者: da-emzy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def VimRpc(address=None):
    if address is None:
        address = addr
    if address is None:
        print('ERROR No Valid ADDRESS')
        return -1
    _serv_add = 'http://%s:%s' % (address)
    lorris = ServerProxy(_serv_add, allow_none=True)
    return lorris
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ssl_presence(self):
        try:
            import ssl
        except ImportError:
            has_ssl = False
        else:
            has_ssl = True
        try:
            xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
        except NotImplementedError:
            self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
        except socket.error:
            self.assertTrue(has_ssl)
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def make_request_and_skipIf(condition, reason):
    # If we skip the test, we have to make a request because the
    # the server created in setUp blocks expecting one to come in.
    if not condition:
        return lambda func: func
    def decorator(func):
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, socket.error) as e:
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_simple1(self):
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.pow(6,8), 6**8)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
test_xmlrpc.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_nonascii(self):
        start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
        end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
        try:
            p = xmlrpclib.ServerProxy(URL)
            self.assertEqual(p.add(start_string, end_string),
                             start_string + end_string)
        except (xmlrpclib.ProtocolError, socket.error) as e:
            # ignore failures due to non-blocking socket 'unavailable' errors
            if not is_unavailable_exception(e):
                # protocol error; provide additional information in test output
                self.fail("%s\n%s" % (e, getattr(e, "headers", "")))

    # [ch] The test 404 is causing lots of false alarms.


问题


面经


文章

微信
公众号

扫码关注公众号