def successResultOf(self, deferred: Deferred):
    """
    Return the current success result of ``deferred``.

    Overrides
    :meth:`twisted.trial.unittest.SynchronousTestCase.successResultOf`
    so that coroutines are accepted in addition to
    :class:`twisted.internet.defer.Deferred` instances: the argument is
    passed through ``ensureDeferred`` before being handed to the base
    implementation.
    """
    wrapped = ensureDeferred(deferred)
    return unittest.TestCase.successResultOf(self, wrapped)
# def failureResultOf(self, deferred: Deferred, *expectedExceptionTypes):
# """
# Override
# :meth:`twisted.trial.unittest.SynchronousTestCase.failureResultOf` to
# enable handling of coroutines as well as
# :class:`twisted.internet.defer.Deferred` s.
# """
# deferred = ensureDeferred(deferred)
# return unittest.TestCase.failureResultOf(self, deferred)
# Example source code using Python's ensureDeferred()
def test__request(self):
    """
    Issue a raw GET against the GitHub rate-limit endpoint and check that
    the deciphered response body contains a ``"rate"`` entry.
    """
    def check_response(response):
        # decipher_response yields three values; only the first is checked.
        data, _rate_limit, _ = sansio.decipher_response(*response)
        self.assertIn("rate", data)

    request_headers = sansio.create_headers("gidgethub")
    gh = gh_treq.GitHubAPI("gidgethub")
    pending = ensureDeferred(
        gh._request("GET", "https://api.github.com/rate_limit", request_headers)
    )
    pending.addCallback(check_response)
    # Run the connection cleanup after the assertion callback.
    pending.addCallback(self.create_cleanup(gh))
    return pending
def render_GET(self, request):
    # type: (Request) -> int
    """
    Obtain JSON data from ``json_GET`` and render it for the client.

    ``json_GET`` may be a coroutine function or a regular callable; either
    way its result is turned into a Deferred whose outcome is routed to
    ``send_json_response`` (success) or ``handle_failure`` (error).

    Do not override in sub classes ...

    :param request: Twisted request
    :return: ``NOT_DONE_YET``; the response is produced by the callbacks.
    """
    if not iscoroutinefunction(self.json_GET):
        # Plain callable: maybeDeferred does the calling and the wrapping.
        json_def = maybeDeferred(self.json_GET, request)
    else:
        # Coroutine function: call it, then wrap the coroutine object.
        json_def = ensureDeferred(self.json_GET(request))  # type: Deferred

    json_def.addCallback(self.send_json_response, request)
    json_def.addErrback(self.handle_failure, request)

    # handle connection failures: the pending Deferred is passed along so
    # on_connection_closed can act on it.
    finished = request.notifyFinish()
    finished.addErrback(self.on_connection_closed, json_def)
    return NOT_DONE_YET
def main(reactor):
    """
    Run the Slack ``AUTH_TEST`` query and pretty-print its result.

    :param reactor: the reactor argument supplied by the caller (not used
        in the body).
    :return: the Deferred wrapping the query, with ``pprint.pprint``
        attached as its callback.
    """
    api = SlackAPI(token=TOKEN)
    query = api.query(slack.methods.AUTH_TEST)
    # addCallback returns the Deferred itself, so the chain can be returned.
    return defer.ensureDeferred(query).addCallback(pprint.pprint)
def test_passesThroughDeferreds(self):
    """
    Handing a L{defer.Deferred} to L{defer.ensureDeferred} gives back that
    very same object.
    """
    original = defer.Deferred()
    self.assertIs(original, defer.ensureDeferred(original))
def test_willNotAllowNonDeferredOrCoroutine(self):
    """
    L{defer.ensureDeferred} raises L{ValueError} when given something that
    is neither a coroutine nor a Deferred.
    """
    self.assertRaises(ValueError, defer.ensureDeferred, "something")
def create_cleanup(gh):
    """
    Build a Deferred callback that tidies up after a network test.

    The returned callback ignores its argument, closes the cached
    connections of treq's global pool, and returns a Deferred for
    ``gh.sleep(0.5)``.

    NOTE(review): callers in this file invoke this as
    ``self.create_cleanup(gh)``; presumably it is declared as a
    ``staticmethod`` on the test class -- confirm the decorator is present.
    """
    def cleanup(_):
        # We do this just to shut up Twisted.
        treq._utils.get_global_pool().closeCachedConnections()
        # We need to sleep to let the connections hang up.
        pause = gh.sleep(0.5)
        return ensureDeferred(pause)

    return cleanup
def test_sleep(self):
    """
    ``gh.sleep(delay)`` must not complete before ``delay`` seconds of
    wall-clock time have elapsed.
    """
    delay = 1

    def check_elapsed(ignored):
        elapsed = datetime.datetime.now() - start
        self.assertTrue(elapsed > datetime.timedelta(seconds=delay))

    # The clock starts before the client is constructed.
    start = datetime.datetime.now()
    gh = gh_treq.GitHubAPI("gidgethub")
    pending = ensureDeferred(gh.sleep(delay))
    pending.addCallback(check_elapsed)
    return pending
def test_get(self):
    """
    ``getitem("/rate_limit")`` yields a response containing a ``"rate"``
    entry.
    """
    def check_response(response):
        self.assertIn("rate", response)

    gh = gh_treq.GitHubAPI("gidgethub")
    pending = ensureDeferred(gh.getitem("/rate_limit"))
    pending.addCallback(check_response)
    # Run the connection cleanup after the assertion callback.
    pending.addCallback(self.create_cleanup(gh))
    return pending
def ensureDeferred(coro):
    """
    Schedule the execution of a coroutine that awaits/yields from
    L{Deferred}s, wrapping it in a L{Deferred} that will fire on
    success/failure of the coroutine. If a Deferred is passed to this
    function, it will be returned directly (mimicking C{asyncio}'s
    C{ensure_future} function).

    Coroutine functions return a coroutine object, similar to how
    generators work. This function turns that coroutine into a Deferred,
    meaning that it can be used in regular Twisted code. For example::

        import treq
        from twisted.internet.defer import ensureDeferred
        from twisted.internet.task import react

        async def crawl(pages):
            results = {}
            for page in pages:
                results[page] = await treq.content(await treq.get(page))
            return results

        def main(reactor):
            pages = [
                "http://localhost:8080"
            ]
            d = ensureDeferred(crawl(pages))
            d.addCallback(print)
            return d

        react(main)

    @param coro: The coroutine object to schedule, or a L{Deferred}.
    @type coro: A Python 3.5+ C{async def} C{coroutine}, a Python 3.3+
        C{yield from} using L{types.GeneratorType}, or a L{Deferred}.

    @rtype: L{Deferred}

    @raise ValueError: If C{coro} is neither schedulable nor a L{Deferred}.
    """
    from types import GeneratorType

    isGenerator = isinstance(coro, GeneratorType)

    # Decide whether the argument is something that can be driven.
    if version_info >= (3, 4, 0):
        # asyncio is only imported on interpreters new enough to take this
        # branch.
        from asyncio import iscoroutine
        schedulable = iscoroutine(coro) or isGenerator
    elif version_info >= (3, 3, 0):
        schedulable = isGenerator
    else:
        schedulable = False

    if schedulable:
        return _inlineCallbacks(None, coro, Deferred())

    if isinstance(coro, Deferred):
        # Already a Deferred: hand it back untouched.
        return coro

    raise ValueError("%r is not a coroutine or a Deferred" % (coro,))