def test_non_existing_multicall(self):
    """A MultiCall invoking an unknown method must come back as fault code 1.

    Transient 'unavailable' errors from the test server's non-blocking
    socket are ignored; any other protocol error fails the test and
    includes the response headers for diagnosis.
    """
    try:
        p = xmlrpclib.ServerProxy(URL)
        multicall = xmlrpclib.MultiCall(p)
        multicall.this_is_not_exists()
        result = multicall()

        # result.results contains;
        # [{'faultCode': 1, 'faultString': '<class \'exceptions.Exception\'>:'
        #   'method "this_is_not_exists" is not supported'>}]
        self.assertEqual(result.results[0]['faultCode'], 1)
        self.assertEqual(result.results[0]['faultString'],
                         '<class \'Exception\'>:method "this_is_not_exists" '
                         'is not supported')
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
# Python example source code for the ServerProxy() class
def test_two(self):
    """Multiple calls over one proxy should reuse a single request handler
    (keep-alive), and that handler should log at least two full requests.
    """
    p = xmlrpclib.ServerProxy(URL)
    # do three requests.
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    p("close")()

    # they should have all been handled by a single request handler
    self.assertEqual(len(self.RequestHandler.myRequests), 1)

    # check that we did at least two (the third may be pending append
    # due to thread scheduling)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
# test special attribute access on the serverproxy, through the __call__
# function.
def test_close(self):
    """p("close")() should force a new keep-alive connection, so two
    handler instances are created, each logging at least two requests.
    """
    p = xmlrpclib.ServerProxy(URL)
    # do some requests with close.
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    p("close")()  # this should trigger a new keep-alive request
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    self.assertEqual(p.pow(6, 8), 6**8)
    p("close")()

    # they should have all been two request handlers, each having logged
    # at least two complete requests
    self.assertEqual(len(self.RequestHandler.myRequests), 2)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2)
    self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2)
def test_fail_with_info(self):
    """With _send_traceback_header enabled, a server-side failure must
    surface exception and traceback info in the response headers.

    The `else` clause fires when no ProtocolError was raised at all,
    which is itself a failure.
    """
    # use the broken message class
    xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass

    # Check that errors in the server send back exception/traceback
    # info when flag is set
    xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True

    try:
        p = xmlrpclib.ServerProxy(URL)
        p.pow(6, 8)
    except (xmlrpclib.ProtocolError, OSError) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e) and hasattr(e, "headers"):
            # We should get error info in the response
            expected_err = "invalid literal for int() with base 10: 'I am broken'"
            self.assertEqual(e.headers.get("X-exception"), expected_err)
            self.assertTrue(e.headers.get("X-traceback") is not None)
    else:
        self.fail('ProtocolError not raised')
def __init__(self, username, password, url, use_mod_auth_kerb=False):
    """Create a cookie-authenticated XML-RPC proxy and log in.

    :param username: TCMS account name, passed to Auth.login
    :param password: TCMS account password
    :param url: server endpoint; must start with http:// or https://
    :param use_mod_auth_kerb: accepted for interface compatibility but
        not used by this constructor
    :raises TCMSError: if the URL scheme is neither http nor https
    """
    if url.startswith('https://'):
        self._transport = SafeCookieTransport()
    elif url.startswith('http://'):
        self._transport = CookieTransport()
    else:
        raise TCMSError("Unrecognized URL scheme")

    # The jar keeps the session cookie that Auth.login() sets below.
    self._transport.cookiejar = CookieJar()
    # print("COOKIES:", self._transport.cookiejar._cookies)
    self.server = xmlrpclib.ServerProxy(
        url,
        transport=self._transport,
        verbose=VERBOSE,
        allow_none=1
    )

    # Login, get a cookie into our cookie jar (login_dict):
    self.server.Auth.login(username, password)

    # Record the user ID in case the script wants this
    # self.user_id = login_dict['id']
    # print('Logged in with cookie for user %i' % self.userId)
    # print("COOKIES:", self._transport.cookiejar._cookies)
def __init__(self, url):
    """Create a Kerberos-authenticated XML-RPC proxy and log in.

    :param url: server endpoint; must be https:// — Kerberos credentials
        are never sent over plain http
    :raises TCMSError: for http:// (unencrypted) or any other scheme
    """
    if url.startswith('https://'):
        self._transport = KerbTransport()
    elif url.startswith('http://'):
        raise TCMSError("Encrypted https communication required for "
                        "Kerberos authentication.\nURL provided: {0}".format(url))
    else:
        raise TCMSError("Unrecognized URL scheme: {0}".format(url))

    # The jar keeps the session cookie that Auth.login_krbv() sets below.
    self._transport.cookiejar = CookieJar()
    # print("COOKIES:", self._transport.cookiejar._cookies)
    self.server = xmlrpclib.ServerProxy(
        url,
        transport=self._transport,
        verbose=VERBOSE,
        allow_none=1
    )

    # Login, get a cookie into our cookie jar (login_dict):
    self.server.Auth.login_krbv()
# Source file: python_popular_analyses.py
# Project: fabric8-analytics-jobs (author: fabric8-analytics)
# Site metadata: 22 views, 0 stars, 0 likes, 0 comments
def _use_pypi_xml_rpc(self):
    """Schedule analyses of packages based on PyPI index using XML-RPC.

    https://wiki.python.org/moin/PyPIXmlRpc

    Iterates the sorted package list restricted to the configured
    [self.count.min, self.count.max) window and schedules up to
    self.nversions releases per package.
    """
    client = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
    # get a list of package names
    packages = sorted(client.list_packages())
    for idx, package in enumerate(packages[self.count.min:self.count.max]):
        releases = client.package_releases(package, True)  # True for show_hidden arg
        self.log.debug("Scheduling #%d. (number versions: %d)",
                       self.count.min + idx, self.nversions)
        for version in releases[:self.nversions]:
            self.analyses_selinon_flow(package, version)
def __init__(self, uri, transport=None, encoding=None, verbose=0,
             allow_none=0):
    """Initialize a XenAPI session proxy on top of xmlrpc ServerProxy.

    Tracks the opaque session handle and the last login call so the
    session can be re-established later; starts on API version 1.1.
    """
    xmlrpcclient.ServerProxy.__init__(self, uri, transport=transport,
                                      encoding=encoding, verbose=verbose,
                                      allow_none=allow_none)
    self.transport = transport
    self._session = None            # opaque session ref, set on login
    self.last_login_method = None   # remembered for session re-login
    self.last_login_params = None
    self.API_version = API_VERSION_1_1
def __getattr__(self, name):
    """Route attribute access on the session proxy.

    - 'handle'  -> the current opaque session reference
    - 'xenapi'  -> a _Dispatcher for API calls
    - 'login*' / 'slave_local*' -> a callable that performs the login
      through self._login, binding the requested method name
    - 'logout'  -> a _Dispatcher prepared for the logout call
    - anything else falls through to ServerProxy's own __getattr__
    """
    if name == 'handle':
        return self._session
    elif name == 'xenapi':
        return _Dispatcher(self.API_version, self.xenapi_request, None)
    elif name.startswith('login') or name.startswith('slave_local'):
        # capture `name` so _login knows which login variant was asked for
        return lambda *params: self._login(name, params)
    elif name == 'logout':
        return _Dispatcher(self.API_version, self.xenapi_request, "logout")
    else:
        return xmlrpcclient.ServerProxy.__getattr__(self, name)
def search_package(name):
    """Search PyPI via XML-RPC and keep only exact name matches.

    :param str name: package name
    :rtype: list
    :return: package entries whose 'name' field equals *name* exactly
    """
    client = xmlrpc_client.ServerProxy(PYPI_URL)
    # search() matches substrings, so filter down to exact hits
    return [pkg for pkg in client.search({'name': name})
            if pkg.get('name') == name]
def __init__(self, nodes):
    """Connect to each node's RPC server and reset its qdisc state.

    :param nodes: iterable of dicts with "id", "address" and "rpcPort"
    :raises Exception: when a node's init_qdisc() reports failure

    NOTE(review): assumes ``self.nodes_rpc`` is a dict declared elsewhere
    (e.g. a class attribute) — confirm against the full class definition.
    """
    # First connect to every node, then clean/create, then init — the
    # three separate passes keep each phase completed cluster-wide
    # before the next begins.
    for n in nodes:
        self.nodes_rpc[str(n["id"])] = Server("http://{}:{}".format(n["address"], str(n["rpcPort"])))
    for n in nodes:
        self.nodes_rpc[str(n["id"])].clean_all_qdisc()
        self.nodes_rpc[str(n["id"])].create_root_qdisc()
    for n in nodes:
        if not self.nodes_rpc[str(n["id"])].init_qdisc():
            raise Exception("[{}] Error initializing qdiscs".format(NETEM_ERROR))
# modify connection from source to target using netem_command
def create_tasks(session, max_pkgs=MAX_PKGS):
    """Build one get_package_info task per top PyPI package.

    :param session: passed through to get_package_info
    :param max_pkgs: how many of the top packages to process
    :return: list of task objects, one per (package, downloads) pair
    """
    client = ServerProxy(PYPI_URL)
    return [get_package_info(session, pkg_name, downloads)
            for pkg_name, downloads in client.top_packages(max_pkgs)]
def create_tasks(session, max_pkgs=MAX_PKGS):
    """Build one get_package_info task per top PyPI package.

    :param session: passed through to get_package_info
    :param max_pkgs: how many of the top packages to process
    :return: list of task objects, one per (package, downloads) pair
    """
    client = ServerProxy(PYPI_URL)
    return [get_package_info(session, pkg_name, downloads)
            for pkg_name, downloads in client.top_packages(max_pkgs)]
def get_pkg_info(pkg_name, downloads=0):
    """Query PyPI for a package's latest release and Python 2/3 support.

    :param pkg_name: PyPI package name
    :param downloads: download count, passed through into the result
    :return: a pkg_info record; on a package with no releases, a record
        flagged with 'PyPI error!!'
    """
    # multiple asyncio jobs can not share a client
    client = ServerProxy(PYPI_URL)
    try:
        release = client.package_releases(pkg_name)[0]
    except IndexError:  # marionette-transport, ll-orasql, and similar
        print(pkg_name, 'has no releases in PyPI!!')
        return pkg_info(pkg_name, downloads, False, False, 'PyPI error!!', '')

    # Fetch release_data once (the original called it twice, costing an
    # extra network round-trip per package).
    data = client.release_data(pkg_name, release)
    troves = '\n'.join(data['classifiers'])
    py2only = py2_only_classifier in troves
    py3 = py3_classifier in troves
    url = data['package_url']
    return pkg_info(pkg_name, downloads, py2only, py3, release, url)
def async_main(max_pkgs=MAX_PKGS):  # ~ 32 secs for 200 pkgs on my MacBookPro
    """Fan out get_pkg_info over a thread pool and collect the results.

    Generator-based coroutine: drive it with
    ``loop.run_until_complete(async_main())``.

    :param max_pkgs: how many of the top packages to process
    :return: list of pkg_info records, in top_packages order
    """
    loop = asyncio.get_event_loop()
    client = ServerProxy(PYPI_URL)
    futures = [loop.run_in_executor(None, get_pkg_info, pkg_name, downloads)
               for pkg_name, downloads in client.top_packages(max_pkgs)]
    # 'yield from' inside a list comprehension is a SyntaxError on
    # Python 3.8+, so await each future with an explicit loop instead.
    results = []
    for fut in futures:
        results.append((yield from fut))
    return results
def VimRpc(address=None):
    """Build an XML-RPC proxy for the remote server.

    :param address: a (host, port) pair; when omitted, falls back to the
        module-level ``addr``
    :return: a ServerProxy (allow_none enabled), or -1 when no address is
        available (legacy error convention kept for existing callers)
    """
    if address is None:
        address = addr
    if address is None:
        print('ERROR No Valid ADDRESS')
        return -1
    # address is expected to be a 2-tuple feeding both %s slots
    _serv_add = 'http://%s:%s' % (address)
    lorris = ServerProxy(_serv_add, allow_none=True)
    return lorris
def test_ssl_presence(self):
    """An https ServerProxy call must only raise NotImplementedError
    when the ssl module is missing; with ssl present, the attempt
    should reach the socket layer instead.
    """
    try:
        import ssl
    except ImportError:
        has_ssl = False
    else:
        has_ssl = True

    try:
        xmlrpc.client.ServerProxy('https://localhost:9999').bad_function()
    except NotImplementedError:
        # only acceptable when SSL support is genuinely absent
        self.assertFalse(has_ssl, "xmlrpc client's error with SSL support")
    except socket.error:
        # connection attempt implies SSL support was available
        self.assertTrue(has_ssl)
def make_request_and_skipIf(condition, reason):
    """Conditional-skip decorator that still satisfies the test server.

    When *condition* is falsy the decorator is a no-op (returns the test
    function unchanged).  When truthy, the replacement test makes one
    throwaway request — the server created in setUp blocks waiting for
    one — and then raises unittest.SkipTest(*reason*).
    """
    # If we skip the test, we have to make a request because the
    # the server created in setUp blocks expecting one to come in.
    if not condition:
        return lambda func: func

    def decorator(func):
        def make_request_and_skip(self):
            try:
                xmlrpclib.ServerProxy(URL).my_function()
            except (xmlrpclib.ProtocolError, socket.error) as e:
                # transient 'unavailable' errors are tolerated; anything
                # else is a real failure and propagates
                if not is_unavailable_exception(e):
                    raise
            raise unittest.SkipTest(reason)
        return make_request_and_skip
    return decorator
def test_simple1(self):
    """A basic round-trip: pow(6, 8) through the proxy equals 6**8.

    Transient 'unavailable' socket errors are ignored; other protocol
    errors fail the test with their headers for diagnosis.
    """
    try:
        p = xmlrpclib.ServerProxy(URL)
        self.assertEqual(p.pow(6, 8), 6**8)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
def test_nonascii(self):
    """Non-ASCII strings must survive the XML-RPC round trip intact.

    Transient 'unavailable' socket errors are ignored; other protocol
    errors fail the test with their headers for diagnosis.
    """
    start_string = 'P\N{LATIN SMALL LETTER Y WITH CIRCUMFLEX}t'
    end_string = 'h\N{LATIN SMALL LETTER O WITH HORN}n'
    try:
        p = xmlrpclib.ServerProxy(URL)
        self.assertEqual(p.add(start_string, end_string),
                         start_string + end_string)
    except (xmlrpclib.ProtocolError, socket.error) as e:
        # ignore failures due to non-blocking socket 'unavailable' errors
        if not is_unavailable_exception(e):
            # protocol error; provide additional information in test output
            self.fail("%s\n%s" % (e, getattr(e, "headers", "")))
# [ch] The test 404 is causing lots of false alarms.