def test_get_block(self):
factory = p2p.ClientFactory(networks.nets['bitcoin'])
c = reactor.connectTCP('127.0.0.1', 8333, factory)
try:
h = 0x000000000000046acff93b0e76cd10490551bf871ce9ac9fad62e67a07ff1d1e
block = yield deferral.retry()(defer.inlineCallbacks(lambda: defer.returnValue((yield (yield factory.getProtocol()).get_block(h)))))()
assert data.merkle_hash(map(data.hash256, map(data.tx_type.pack, block['txs']))) == block['header']['merkle_root']
assert data.hash256(data.block_header_type.pack(block['header'])) == h
finally:
factory.stopTrying()
c.disconnect()
python类inlineCallbacks()的实例源码
def retry(message='Error:', delay=3, max_retries=None, traceback=True):
'''
@retry('Error getting block:', 1)
@defer.inlineCallbacks
def get_block(hash):
...
'''
def retry2(func):
@defer.inlineCallbacks
def f(*args, **kwargs):
for i in itertools.count():
try:
result = yield func(*args, **kwargs)
except Exception, e:
if i == max_retries:
raise
if not isinstance(e, RetrySilentlyException):
if traceback:
log.err(None, message)
else:
print >>sys.stderr, message, e
yield sleep(delay)
else:
defer.returnValue(result)
return f
return retry2
def test_success(self):
@inlineCallbacks
def fn():
if False:
# inlineCallbacks doesn't work with regular functions;
# must have a yield even if it's unreachable.
yield
returnValue(42)
f = gen.convert_yielded(fn())
self.assertEqual(f.result(), 42)
def test_failure(self):
@inlineCallbacks
def fn():
if False:
yield
1 / 0
f = gen.convert_yielded(fn())
with self.assertRaises(ZeroDivisionError):
f.result()
def test_success(self):
@inlineCallbacks
def fn():
if False:
# inlineCallbacks doesn't work with regular functions;
# must have a yield even if it's unreachable.
yield
returnValue(42)
f = gen.convert_yielded(fn())
self.assertEqual(f.result(), 42)
def test_failure(self):
@inlineCallbacks
def fn():
if False:
yield
1 / 0
f = gen.convert_yielded(fn())
with self.assertRaises(ZeroDivisionError):
f.result()
def connect_factory(host, port, factory, blob_storage, hash_to_process):
from twisted.internet import reactor
@defer.inlineCallbacks
def on_finish(result):
log.info("Finished sending %s", hash_to_process)
yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
connection.disconnect()
reactor.fireSystemEvent("shutdown")
@defer.inlineCallbacks
def on_error(error):
log.error("Error when sending %s: %s. Hashes sent %s", hash_to_process, error,
factory.p.blob_hashes_sent)
yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
connection.disconnect()
reactor.fireSystemEvent("shutdown")
def on_connection_fail(result):
log.error("Failed to connect to %s:%s", host, port)
reactor.fireSystemEvent("shutdown")
def _error(failure):
log.error("Failed on_connection_lost_d callback: %s", failure)
reactor.fireSystemEvent("shutdown")
factory.on_connection_lost_d.addCallbacks(on_finish, on_error)
factory.on_connection_lost_d.addErrback(_error)
factory.on_connection_fail_d.addCallback(on_connection_fail)
try:
log.debug("Connecting factory to %s:%s", host, port)
connection = reactor.connectTCP(host, port, factory, timeout=TCP_CONNECT_TIMEOUT)
except JobTimeoutException:
log.error("Failed to forward %s --> %s", hash_to_process[:8], host)
return sys.exit(0)
except Exception as err:
log.exception("Job (pid %s) encountered unexpected error")
return sys.exit(1)
def twisted_wrapper(arg):
"""
Wrap a twisted test. Optionally supply a test timeout.
Note that arg might either be a func or the timeout.
"""
if isinstance(arg, (int, long)):
return lambda x: deferred(arg)(inlineCallbacks(x))
return deferred(timeout=1)(inlineCallbacks(arg))
def subscribe(self, uri=None):
"""
Decorator attaching a function as an event handler.
The first argument of the decorator should be the URI of the topic
to subscribe to. If no URI is given, the URI is constructed from
the application URI prefix and the Python function name.
If the function yield, it will be assumed that it's an asynchronous
process and inlineCallbacks will be applied to it.
:Example:
.. code-block:: python
@app.subscribe('com.myapp.topic1')
def onevent1(x, y):
print("got event on topic1", x, y)
:param uri: The URI of the topic to subscribe to.
:type uri: unicode
"""
def decorator(func):
if uri:
_uri = uri
else:
assert(self._prefix is not None)
_uri = "{0}.{1}".format(self._prefix, func.__name__)
if inspect.isgeneratorfunction(func):
func = inlineCallbacks(func)
self._handlers.append((_uri, func))
return func
return decorator
def signal(self, name):
"""
Decorator attaching a function as handler for application signals.
Signals are local events triggered internally and exposed to the
developer to be able to react to the application lifecycle.
If the function yield, it will be assumed that it's an asynchronous
coroutine and inlineCallbacks will be applied to it.
Current signals :
- `onjoined`: Triggered after the application session has joined the
realm on the router and registered/subscribed all procedures
and event handlers that were setup via decorators.
- `onleave`: Triggered when the application session leaves the realm.
.. code-block:: python
@app.signal('onjoined')
def _():
# do after the app has join a realm
:param name: The name of the signal to watch.
:type name: unicode
"""
def decorator(func):
if inspect.isgeneratorfunction(func):
func = inlineCallbacks(func)
self._signals.setdefault(name, []).append(func)
return func
return decorator
def __init__(self):
self.qrt = qtm.QRT("127.0.0.1", 22223)
self.qrt.connect(on_connect=self.on_connect, on_disconnect=self.on_disconnect, on_event=self.on_event)
self.init = False
self.connection = None
# Inline callbacks is a feature of the twisted framework that makes it possible to write
# asynchronous code that looks synchronous
# http://twistedmatrix.com/documents/current/api/twisted.internet.defer.inlineCallbacks.html
def test_success(self):
@inlineCallbacks
def fn():
if False:
# inlineCallbacks doesn't work with regular functions;
# must have a yield even if it's unreachable.
yield
returnValue(42)
f = gen.convert_yielded(fn())
self.assertEqual(f.result(), 42)
def test_failure(self):
@inlineCallbacks
def fn():
if False:
yield
1 / 0
f = gen.convert_yielded(fn())
with self.assertRaises(ZeroDivisionError):
f.result()
def mistakenMethod(self):
"""
This method mistakenly invokes L{returnValue}, despite the fact that it
is not decorated with L{inlineCallbacks}.
"""
returnValue(1)
def test_returnValueNonLocalWarning(self):
"""
L{returnValue} will emit a non-local exit warning in the simplest case,
where the offending function is invoked immediately.
"""
@inlineCallbacks
def inline():
self.mistakenMethod()
returnValue(2)
yield 0
d = inline()
results = []
d.addCallback(results.append)
self.assertMistakenMethodWarning(results)
def testReturnNoValue(self):
"""Ensure a standard python return results in a None result."""
def _noReturn():
yield 5
return
_noReturn = inlineCallbacks(_noReturn)
return _noReturn().addCallback(self.assertEqual, None)
def testReturnValue(self):
"""Ensure that returnValue works."""
def _return():
yield 5
returnValue(6)
_return = inlineCallbacks(_return)
return _return().addCallback(self.assertEqual, 6)
def test_nonGeneratorReturn(self):
"""
Ensure that C{TypeError} with a message about L{inlineCallbacks} is
raised when a non-generator returns something other than a generator.
"""
def _noYield():
return 5
_noYield = inlineCallbacks(_noYield)
self.assertIn("inlineCallbacks",
str(self.assertRaises(TypeError, _noYield)))
def test_nonGeneratorReturnValue(self):
"""
Ensure that C{TypeError} with a message about L{inlineCallbacks} is
raised when a non-generator calls L{returnValue}.
"""
def _noYield():
returnValue(5)
_noYield = inlineCallbacks(_noYield)
self.assertIn("inlineCallbacks",
str(self.assertRaises(TypeError, _noYield)))
def test_passInlineCallbacks(self):
"""
The body of a L{defer.inlineCallbacks} decorated test gets run.
"""
result = self.runTest('test_passInlineCallbacks')
self.assertTrue(result.wasSuccessful())
self.assertEqual(result.testsRun, 1)
self.assertTrue(detests.DeferredTests.touched)