def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
python类readBody()的实例源码
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def test_key_not_found_is_raised_if_key_search_responds_404(self):
    """
    Test if key search request comes back with a 404 response then
    KeyNotFound is raised, with corresponding error message.
    """
    # Local import: only needed to build the expected-message pattern.
    import re

    km = self._key_manager(url=NICKSERVER_URI)
    # NOTE(review): this rebinds ``client.readBody`` for the rest of the
    # process; nothing in this test restores it.
    client.readBody = mock.Mock(return_value=defer.succeed(None))
    km._nicknym._async_client_pinned.request = mock.Mock(
        return_value=defer.succeed(None))
    url = NICKSERVER_URI + '?address=' + INVALID_MAIL_ADDRESS

    d = km._nicknym._fetch_and_handle_404_from_nicknym(url)

    def check_key_not_found_is_raised_if_404(_):
        # Pull out the ``callback`` keyword the code under test handed to
        # the mocked request and drive it with a fake 404 response.
        used_kwargs = km._nicknym._async_client_pinned.request.call_args[1]
        check_404_callback = used_kwargs['callback']
        fake_response = mock.Mock()
        fake_response.code = NOT_FOUND
        # Escape the whole expected text so that every regexp
        # metacharacter in the URL ('?', '.', '+', ...) is matched
        # literally, instead of hand-escaping only '?'.
        expected = re.escape('404: Key not found. Request: %s' % url)
        with self.assertRaisesRegexp(errors.KeyNotFound, expected):
            check_404_callback(fake_response)

    d.addCallback(check_key_not_found_is_raised_if_404)
    return d
def _fetch_key_with_address(self, km, address, key):
    """
    Mock the remote key response for ``address`` and look the key up.

    :returns: a Deferred that will fire with the OpenPGPKey
    """
    payload = json.dumps({'address': address, 'openpgp': key})
    client.readBody = mock.Mock(return_value=defer.succeed(payload))

    # Stub the pinned client's request; the body itself comes from the
    # mocked readBody above.
    km._nicknym._async_client_pinned.request = mock.Mock(
        return_value=defer.succeed(None))
    km.ca_cert_path = 'cacertpath'

    # A lookup that is not allowed to fetch remotely must fail with
    # KeyNotFound ...
    local_lookup = km.get_key(address, fetch_remote=False)
    d = self.assertFailure(local_lookup, errors.KeyNotFound)

    # ... after which the key is requested again with the default
    # fetching behaviour.
    def fetch_from_server(_):
        return km.get_key(address)

    d.addCallback(fetch_from_server)
    return d
def checkResponse(self, response, exp_result):
    """
    Read the full body of ``response`` and check it against ``exp_result``.

    :param response: the response object handed to ``readBody``.
    :param exp_result: passed to ``self.assertResponse`` as the extra
        argument after the body.
    :returns: the Deferred from ``readBody`` with ``self.assertResponse``
        attached as a callback.
    """
    # TODO: enable response logging (replaces the old print-based dump)
    # once a ``LOG`` object is confirmed to be available here:
    # LOG.debug("Response Body %s", str(response.version))
    # LOG.debug("Response Body %s", str(response.code))
    # LOG.debug("Response Body %s", str(response.phrase))
    # LOG.debug("Response Body %s",
    #           str(list(response.headers.getAllRawHeaders())))
    # LOG.debug("Expected Results %s", str(exp_result))
    d = readBody(response)
    d.addCallback(self.assertResponse, exp_result)
    return d
def getResponse(self, response):
    """
    Read the full body of ``response``.

    :param response: the response object handed to ``readBody``.
    :returns: the Deferred returned by ``readBody``.
    """
    # TODO: enable response logging (replaces the old print-based dump)
    # once a ``LOG`` object is confirmed to be available here:
    # LOG.debug("Response Body %s", str(response.version))
    # LOG.debug("Response Body %s", str(response.code))
    # LOG.debug("Response Body %s", str(response.phrase))
    # LOG.debug("Response Body %s",
    #           str(list(response.headers.getAllRawHeaders())))
    d = readBody(response)
    return d
def http_request(self, path="/", encode='zlib'):
    """
    Do an http request on the broker

    :type path str
    :param encode: when equal to "zlib" the body is run through
        ``.encode(encode)`` and then base64-encoded; any other value
        leaves the body untouched.
    :returns: the request Deferred, with body reading and the optional
        encoding step attached.
    """
    # Resulting URL: http://localhost:<broker http port><path>
    port = Broker.get_instance().http_port
    url = "http://localhost:" + str(port) + path
    print(url)

    agent_headers = Headers({'User-Agent': ['Twisted Web Client']})
    request = self._http_agent.request('GET', url, agent_headers, None)

    def maybe_encode(html):
        if encode != "zlib":
            return html
        return base64.b64encode(html.encode(encode))

    request.addCallback(readBody)
    request.addCallback(maybe_encode)
    return request
twisted_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def fetch(path, url, state, timeout=10):
    """
    GET ``url`` through an ``OnionRoutedAgent`` and extract the reported IP.

    :param path: sequence whose last element exposes an ``ip`` attribute
        (read as ``path[-1].ip``); also passed to ``OnionRoutedAgent``.
    :param url: URL to request.
    :param state: passed through to ``OnionRoutedAgent``.
    :param timeout: seconds after which the request is cancelled
        (defaults to the previously hard-coded 10).
    :returns: a Deferred firing with ``(exit_ip, checked_ip)``, where
        ``checked_ip`` is ``None`` if no ``<strong>...</strong>`` span was
        found in the body. If one of the trapped failures occurs it is
        logged and the Deferred fires with ``None`` instead.
    """
    agent = OnionRoutedAgent(reactor, path=path, state=state)
    request = agent.request("GET", url)
    timeout_call = reactor.callLater(timeout, request.cancel)
    request.addCallback(readBody)

    def parse_ip(body):
        exit_ip = path[-1].ip
        match = re.search(r"<strong>(.*)</strong>", body)
        checked_ip = match.group(1) if match is not None else None
        return exit_ip, checked_ip

    request.addCallback(parse_ip)

    def err(failure):
        # Anything not listed here is re-raised by trap() and keeps
        # propagating down the errback chain.
        failure.trap(defer.CancelledError, ResponseNeverReceived,
                     ResponseFailed, HostUnreachable, TTLExpired)
        log.err(failure)

    request.addErrback(err)

    def cancel_timeout(result):
        # Drop the pending timeout once the chain has finished so it does
        # not linger in the reactor for the rest of the timeout window.
        if timeout_call.active():
            timeout_call.cancel()
        return result

    request.addBoth(cancel_timeout)
    return request
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def test_withPotentialDataLoss(self):
    """
    If the full body of the L{IResponse} passed to L{client.readBody} is
    not definitely received, the L{Deferred} returned by L{client.readBody}
    fires with a L{Failure} wrapping L{client.PartialDownloadError} with
    the content that was received.
    """
    response = DummyResponse()
    d = client.readBody(response)

    # Deliver two chunks, then end the connection with PotentialDataLoss.
    for chunk in (b"first", b"second"):
        response.protocol.dataReceived(chunk)
    response.protocol.connectionLost(Failure(PotentialDataLoss()))

    failure = self.failureResultOf(d)
    failure.trap(client.PartialDownloadError)

    error = failure.value
    observed = {
        "status": error.status,
        "message": error.message,
        "body": error.response,
    }
    expected = {
        "status": b"200",
        "message": b"OK",
        "body": b"firstsecond",
    }
    self.assertEqual(observed, expected)
def test_deprecatedTransport(self):
    """
    Calling L{client.readBody} with a transport that does not implement
    L{twisted.internet.interfaces.ITCPTransport} produces a deprecation
    warning, but no exception when cancelling.
    """
    response = DummyResponse(transportFactory=StringTransport)
    response.transport.abortConnection = None

    expectedMessage = (
        'Using readBody with a transport that does not have an '
        'abortConnection method')

    def readIt():
        return client.readBody(response)

    d = self.assertWarns(
        DeprecationWarning, expectedMessage, __file__, readIt)
    d.cancel()
    self.failureResultOf(d, defer.CancelledError)
def test_noProxyPassthrough(self):
    """
    The CGI script is never called with the Proxy header passed through.
    """
    scriptPath = self.writeCGI(HEADER_OUTPUT_CGI)
    port = self.startServer(scriptPath)
    target = "http://localhost:%d/cgi" % (port,)

    requestHeaders = http_headers.Headers({"Proxy": ["foo"],
                                           "X-Innocent-Header": ["bar"]})
    d = client.Agent(reactor).request(b"GET", target,
                                      headers=requestHeaders)

    def checkResponse(response):
        # The body is parsed as JSON and only its key set is compared;
        # there must be no key for the Proxy header.
        seen = json.loads(response)
        self.assertEqual(
            set(seen.keys()),
            {"HTTP_HOST", "HTTP_CONNECTION", "HTTP_X_INNOCENT_HEADER"})

    d.addCallback(client.readBody)
    d.addCallback(checkResponse)
    return d
def testReadInput(self):
    """
    POST a short body to a CGI script written from C{READINPUT_CGI} and
    hand the response body to C{self._testReadInput_1}.
    """
    scriptPath = os.path.abspath(self.mktemp())
    with open(scriptPath, 'wt') as script:
        script.write(READINPUT_CGI)

    port = self.startServer(scriptPath)
    agent = client.Agent(reactor)
    target = "http://localhost:%d/cgi" % (port,)
    producer = client.FileBodyProducer(BytesIO(b"Here is your stdin"))

    d = agent.request(uri=target, method=b"POST", bodyProducer=producer)
    d.addCallback(client.readBody)
    d.addCallback(self._testReadInput_1)
    return d
def testDistrib(self):
    """
    Publish a resource through distrib, subscribe to it from a second
    site, and check that an HTTP GET via the second site yields 'root'.
    """
    # Publisher side: a site with a single "there" child.
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    publisherSite = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(publisherSite))
    self.port1 = reactor.listenTCP(0, self.f1)

    # Subscriber side: "here" points at the publisher's listening port.
    publisherPort = self.port1.getHost().port
    self.sub = distrib.ResourceSubscription("127.0.0.1", publisherPort)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    subscriberSite = MySite(subscriberRoot)
    self.port2 = reactor.listenTCP(0, subscriberSite)

    target = "http://127.0.0.1:%d/here/there" % (
        self.port2.getHost().port,)
    d = client.Agent(reactor).request(b"GET", target)
    d.addCallback(client.readBody)
    d.addCallback(self.assertEqual, 'root')
    return d
def _requestTest(self, child, **kwargs):
    """
    Set up a resource on a distrib site using L{ResourcePublisher} and
    then retrieve it from a L{ResourceSubscription} via an HTTP client.

    @param child: The resource to publish using distrib.
    @param **kwargs: Extra keyword arguments to pass to L{Agent.request}
        when requesting the resource.

    @return: A L{Deferred} which fires with the result of the request.
    """
    # Only the address half of the pair is needed to build the URL.
    _, address = self._setupDistribServer(child)
    target = "http://%s:%s/child" % (address.host, address.port)
    d = client.Agent(reactor).request(b"GET", target, **kwargs)
    d.addCallback(client.readBody)
    return d
def version(self):
    """
    Issue a I{GET} for the Kubernetes server version.

    @return: The result of C{addActionFinish} on the callback chain, whose
        last step is C{self.model.version_type.create}.
    """
    ctx = start_action(action_type=u"network-client:version")
    with ctx.context():
        endpoint = self.kubernetes.base_url.child(u"version")
        pending = DeferredContext(self._get(endpoint))
        # Status check first, then read the body, pass it through
        # ``loads``, log it against the action, and build the result.
        pending.addCallback(check_status, (OK,), self.model)
        pending.addCallback(readBody)
        pending.addCallback(loads)
        pending.addCallback(log_response_object, ctx)
        pending.addCallback(self.model.version_type.create)
        return pending.addActionFinish()
def create(self, obj):
    """
    Issue a I{POST} to create the given object.

    @param obj: The object to create; converted with
        C{self.model.iobject_to_raw} before being posted.

    @return: The result of C{addActionFinish} on the callback chain, whose
        last step is C{self.model.iobject_from_raw}.
    """
    ctx = start_action(action_type=u"network-client:create")
    with ctx.context():
        endpoint = self.kubernetes.base_url.child(*collection_location(obj))
        raw = self.model.iobject_to_raw(obj)
        Message.log(submitted_object=raw)
        pending = DeferredContext(self._post(endpoint, raw))
        # Only a CREATED status is accepted for a successful POST.
        pending.addCallback(check_status, (CREATED,), self.model)
        pending.addCallback(readBody)
        pending.addCallback(loads)
        pending.addCallback(log_response_object, ctx)
        pending.addCallback(self.model.iobject_from_raw)
        return pending.addActionFinish()
def replace(self, obj):
    """
    Issue a I{PUT} to replace an existing object with a new one.

    @param obj: The replacement object; converted with
        C{self.model.iobject_to_raw} before being sent.

    @return: The result of C{addActionFinish} on the callback chain, whose
        last step is C{self.model.iobject_from_raw}.
    """
    ctx = start_action(action_type=u"network-client:replace")
    with ctx.context():
        endpoint = self.kubernetes.base_url.child(*object_location(obj))
        raw = self.model.iobject_to_raw(obj)
        Message.log(submitted_object=raw)
        pending = DeferredContext(self._put(endpoint, raw))
        pending.addCallback(check_status, (OK,), self.model)
        pending.addCallback(readBody)
        pending.addCallback(loads)
        pending.addCallback(log_response_object, ctx)
        pending.addCallback(self.model.iobject_from_raw)
        return pending.addActionFinish()
def get(self, obj):
    """
    Issue a I{GET} to retrieve the given object.

    The object must have identifying metadata such as a namespace and a
    name but other fields are ignored.

    @return: The result of C{addActionFinish} on the callback chain, whose
        last step is C{self.model.iobject_from_raw}.
    """
    # Not every object carries a namespace; record None when absent.
    namespace = getattr(obj.metadata, "namespace", None)
    ctx = start_action(
        action_type=u"network-client:get",
        kind=obj.kind,
        name=obj.metadata.name,
        namespace=namespace,
    )
    with ctx.context():
        endpoint = self.kubernetes.base_url.child(*object_location(obj))
        pending = DeferredContext(self._get(endpoint))
        pending.addCallback(check_status, (OK,), self.model)
        pending.addCallback(readBody)
        pending.addCallback(loads)
        pending.addCallback(log_response_object, ctx)
        pending.addCallback(self.model.iobject_from_raw)
        return pending.addActionFinish()
def list(self, kind):
    """
    Issue a I{GET} to retrieve objects of a given kind.

    @param kind: Supplies the C{kind} and C{apiVersion} recorded on the
        action and is passed to C{collection_location} to build the URL.

    @return: The result of C{addActionFinish} on the callback chain, whose
        last step converts the body with C{self.model.iobject_from_raw}.
    """
    ctx = start_action(
        action_type=u"network-client:list",
        kind=kind.kind,
        apiVersion=kind.apiVersion,
    )

    def to_object(body):
        return self.model.iobject_from_raw(loads(body))

    with ctx.context():
        endpoint = self.kubernetes.base_url.child(*collection_location(kind))
        pending = DeferredContext(self._get(endpoint))
        pending.addCallback(check_status, (OK,), self.model)
        pending.addCallback(readBody)
        pending.addCallback(to_object)
        return pending.addActionFinish()
def from_model_and_response(cls, model, response):
    """
    Create a ``KubernetesError`` for the given error response from a
    Kubernetes server.

    :param model: The Kubernetes data model to use to convert the server
        response into a Python object.

    :param twisted.web.iweb.IResponse response: The response to inspect
        for the error details.

    :return Deferred(KubernetesError): The error with details attached.
    """
    def build_error(body):
        # Pair the response's status code with the body converted
        # through the model.
        details = model.iobject_from_raw(loads(body))
        return cls(response.code, details)

    d = readBody(response)
    d.addCallback(build_error)
    return d
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with an ``Agent`` driven from a ``gen.coroutine``.

    The coroutine is queued on ``self.io_loop``, ``runner`` is invoked,
    and whatever the coroutine stored as the response body is returned
    (``None`` if it never stored one).
    """
    # One-element list so the nested coroutine can hand its result back
    # to the enclosing scope.
    result = [None]

    @gen.coroutine
    def fetch_body():
        # Simpler than the non-coroutine variant, but it cheats: the body
        # is read as a single blob rather than streamed through a Protocol.
        agent = Agent(self.reactor)
        reply = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # Twisted 15.0's readBody emits a buggy DeprecationWarning:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            result[0] = yield readBody(reply)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return result[0]
def getPage(self, url):
    """
    GET ``url`` through ``self.http_agent`` and return the decoded body.

    Written as a generator that finishes with ``returnValue``; presumably
    wrapped by an ``inlineCallbacks``-style decorator outside this view
    (TODO confirm).
    """
    response = yield self.http_agent.request(b'GET', url.encode())
    raw_body = yield web_client.readBody(response)
    returnValue(raw_body.decode())
# before-end calls
def _fetch_key_with_fingerprint(self, km, fingerprint, key):
    """
    Mock the remote key response for ``fingerprint`` and fetch the key.

    :returns: a Deferred that will fire with the OpenPGPKey
    """
    payload = json.dumps({'fingerprint': fingerprint, 'openpgp': key})
    client.readBody = mock.Mock(return_value=defer.succeed(payload))

    # Stub the pinned client's request; the body itself comes from the
    # mocked readBody above.
    km._nicknym._async_client_pinned.request = mock.Mock(
        return_value=defer.succeed(None))
    km.ca_cert_path = 'cacertpath'

    return km._nicknym.fetch_key_with_fingerprint(fingerprint)
def _fetch_and_handle_404_from_nicknym(self, uri):
    """
    Send a GET request to C{uri}, turning error statuses into KeyNotFound.

    :param uri: The URI of the request.
    :type uri: str

    :return: A deferred that will be fired with the result of
             C{client.readBody} on the response. The deferred fails with
             KeyNotFound if the response code is NOT_FOUND,
             SERVICE_UNAVAILABLE or BAD_GATEWAY.
    :rtype: Deferred
    """
    def check_code(response):
        # Build the message for the statuses treated as "no key"; any
        # other response is passed through untouched.
        if response.code == NOT_FOUND:
            message = ' %s: Key not found. Request: %s' \
                      % (response.code, uri)
        elif response.code == SERVICE_UNAVAILABLE:
            message = ' %s: Service unavailable (maybe in maintenance). ' \
                      'Request: %s' % (response.code, uri)
        elif response.code == BAD_GATEWAY:
            message = ' %s: Bad gateway. Request: %s. Response: %s' \
                      % (response.code, uri, response)
        else:
            return response
        self.log.warn(message)
        # A plain raise: no exception is being handled at this point, so
        # there is no traceback worth re-attaching.
        raise KeyNotFound(message)

    d = self._async_client_pinned.request(str(uri), 'GET',
                                          callback=check_code)
    d.addCallback(client.readBody)
    return d
def _handle_request(self, response):
    """
    Flag this object as responding and process the body of ``response``.

    :returns: the Deferred from ``readBody`` with ``self._handle_body``
        attached as a callback.
    """
    self.responding = True
    body_deferred = readBody(response)
    body_deferred.addCallback(self._handle_body)
    return body_deferred