python类inlineCallbacks()的实例源码

test_p2p.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_get_block(self):
        factory = p2p.ClientFactory(networks.nets['bitcoin'])
        c = reactor.connectTCP('127.0.0.1', 8333, factory)
        try:
            h = 0x000000000000046acff93b0e76cd10490551bf871ce9ac9fad62e67a07ff1d1e
            block = yield deferral.retry()(defer.inlineCallbacks(lambda: defer.returnValue((yield (yield factory.getProtocol()).get_block(h)))))()
            assert data.merkle_hash(map(data.hash256, map(data.tx_type.pack, block['txs']))) == block['header']['merkle_root']
            assert data.hash256(data.block_header_type.pack(block['header'])) == h
        finally:
            factory.stopTrying()
            c.disconnect()
deferral.py 文件源码 项目:p2pool-cann 作者: ilsawa 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def retry(message='Error:', delay=3, max_retries=None, traceback=True):
    '''
    @retry('Error getting block:', 1)
    @defer.inlineCallbacks
    def get_block(hash):
        ...
    '''

    def retry2(func):
        @defer.inlineCallbacks
        def f(*args, **kwargs):
            for i in itertools.count():
                try:
                    result = yield func(*args, **kwargs)
                except Exception, e:
                    if i == max_retries:
                        raise
                    if not isinstance(e, RetrySilentlyException):
                        if traceback:
                            log.err(None, message)
                        else:
                            print >>sys.stderr, message, e
                    yield sleep(delay)
                else:
                    defer.returnValue(result)
        return f
    return retry2
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_success(self):
        @inlineCallbacks
        def fn():
            if False:
                # inlineCallbacks doesn't work with regular functions;
                # must have a yield even if it's unreachable.
                yield
            returnValue(42)
        f = gen.convert_yielded(fn())
        self.assertEqual(f.result(), 42)
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_failure(self):
        @inlineCallbacks
        def fn():
            if False:
                yield
            1 / 0
        f = gen.convert_yielded(fn())
        with self.assertRaises(ZeroDivisionError):
            f.result()
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_success(self):
        @inlineCallbacks
        def fn():
            if False:
                # inlineCallbacks doesn't work with regular functions;
                # must have a yield even if it's unreachable.
                yield
            returnValue(42)
        f = gen.convert_yielded(fn())
        self.assertEqual(f.result(), 42)
twisted_test.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_failure(self):
        @inlineCallbacks
        def fn():
            if False:
                yield
            1 / 0
        f = gen.convert_yielded(fn())
        with self.assertRaises(ZeroDivisionError):
            f.result()
task.py 文件源码 项目:reflector-cluster 作者: lbryio 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def connect_factory(host, port, factory, blob_storage, hash_to_process):
    from twisted.internet import reactor
    @defer.inlineCallbacks
    def on_finish(result):
        log.info("Finished sending %s", hash_to_process)
        yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
        connection.disconnect()
        reactor.fireSystemEvent("shutdown")

    @defer.inlineCallbacks
    def on_error(error):
        log.error("Error when sending %s: %s. Hashes sent %s", hash_to_process, error,
                                                            factory.p.blob_hashes_sent)
        yield update_sent_blobs(factory.p.blob_hashes_sent, host, blob_storage)
        connection.disconnect()
        reactor.fireSystemEvent("shutdown")

    def on_connection_fail(result):
        log.error("Failed to connect to %s:%s", host, port)
        reactor.fireSystemEvent("shutdown")

    def _error(failure):
        log.error("Failed on_connection_lost_d callback: %s", failure)
        reactor.fireSystemEvent("shutdown")

    factory.on_connection_lost_d.addCallbacks(on_finish, on_error)
    factory.on_connection_lost_d.addErrback(_error)

    factory.on_connection_fail_d.addCallback(on_connection_fail)
    try:
        log.debug("Connecting factory to %s:%s", host, port)
        connection = reactor.connectTCP(host, port, factory, timeout=TCP_CONNECT_TIMEOUT)
    except JobTimeoutException:
        log.error("Failed to forward %s --> %s", hash_to_process[:8], host)
        return sys.exit(0)
    except Exception as err:
        log.exception("Job (pid %s) encountered unexpected error")
        return sys.exit(1)
util.py 文件源码 项目:py-ipv8 作者: qstokkink 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def twisted_wrapper(arg):
    """
    Wrap a twisted test. Optionally supply a test timeout.

    Note that arg might either be a func or the timeout.
    """
    if isinstance(arg, (int, long)):
        return lambda x: deferred(arg)(inlineCallbacks(x))
    return deferred(timeout=1)(inlineCallbacks(arg))
wamp.py 文件源码 项目:deb-python-autobahn 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def subscribe(self, uri=None):
        """
        Decorator attaching a function as an event handler.

        The first argument of the decorator should be the URI of the topic
        to subscribe to. If no URI is given, the URI is constructed from
        the application URI prefix and the Python function name.

        If the function yield, it will be assumed that it's an asynchronous
        process and inlineCallbacks will be applied to it.

        :Example:

        .. code-block:: python

           @app.subscribe('com.myapp.topic1')
           def onevent1(x, y):
              print("got event on topic1", x, y)

        :param uri: The URI of the topic to subscribe to.
        :type uri: unicode
        """
        def decorator(func):
            if uri:
                _uri = uri
            else:
                assert(self._prefix is not None)
                _uri = "{0}.{1}".format(self._prefix, func.__name__)

            if inspect.isgeneratorfunction(func):
                func = inlineCallbacks(func)

            self._handlers.append((_uri, func))
            return func
        return decorator
wamp.py 文件源码 项目:deb-python-autobahn 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def signal(self, name):
        """
        Decorator attaching a function as handler for application signals.

        Signals are local events triggered internally and exposed to the
        developer to be able to react to the application lifecycle.

        If the function yield, it will be assumed that it's an asynchronous
        coroutine and inlineCallbacks will be applied to it.

        Current signals :

           - `onjoined`: Triggered after the application session has joined the
              realm on the router and registered/subscribed all procedures
              and event handlers that were setup via decorators.
           - `onleave`: Triggered when the application session leaves the realm.

        .. code-block:: python

           @app.signal('onjoined')
           def _():
              # do after the app has join a realm

        :param name: The name of the signal to watch.
        :type name: unicode
        """
        def decorator(func):
            if inspect.isgeneratorfunction(func):
                func = inlineCallbacks(func)
            self._signals.setdefault(name, []).append(func)
            return func
        return decorator
advanced_example.py 文件源码 项目:qualisys_python_sdk 作者: qualisys 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self):
        self.qrt = qtm.QRT("127.0.0.1", 22223)
        self.qrt.connect(on_connect=self.on_connect, on_disconnect=self.on_disconnect, on_event=self.on_event)
        self.init = False
        self.connection = None

    # Inline callbacks is a feature of the twisted framework that makes it possible to write
    # asynchronous code that looks synchronous
    # http://twistedmatrix.com/documents/current/api/twisted.internet.defer.inlineCallbacks.html
twisted_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_success(self):
        @inlineCallbacks
        def fn():
            if False:
                # inlineCallbacks doesn't work with regular functions;
                # must have a yield even if it's unreachable.
                yield
            returnValue(42)
        f = gen.convert_yielded(fn())
        self.assertEqual(f.result(), 42)
twisted_test.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_failure(self):
        @inlineCallbacks
        def fn():
            if False:
                yield
            1 / 0
        f = gen.convert_yielded(fn())
        with self.assertRaises(ZeroDivisionError):
            f.result()
test_inlinecb.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def mistakenMethod(self):
        """
        This method mistakenly invokes L{returnValue}, despite the fact that it
        is not decorated with L{inlineCallbacks}.
        """
        returnValue(1)
test_inlinecb.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_returnValueNonLocalWarning(self):
        """
        L{returnValue} will emit a non-local exit warning in the simplest case,
        where the offending function is invoked immediately.
        """
        @inlineCallbacks
        def inline():
            self.mistakenMethod()
            returnValue(2)
            yield 0
        d = inline()
        results = []
        d.addCallback(results.append)
        self.assertMistakenMethodWarning(results)
test_defgen.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testReturnNoValue(self):
        """Ensure a standard python return results in a None result."""
        def _noReturn():
            yield 5
            return
        _noReturn = inlineCallbacks(_noReturn)

        return _noReturn().addCallback(self.assertEqual, None)
test_defgen.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testReturnValue(self):
        """Ensure that returnValue works."""
        def _return():
            yield 5
            returnValue(6)
        _return = inlineCallbacks(_return)

        return _return().addCallback(self.assertEqual, 6)
test_defgen.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_nonGeneratorReturn(self):
        """
        Ensure that C{TypeError} with a message about L{inlineCallbacks} is
        raised when a non-generator returns something other than a generator.
        """
        def _noYield():
            return 5
        _noYield = inlineCallbacks(_noYield)

        self.assertIn("inlineCallbacks",
            str(self.assertRaises(TypeError, _noYield)))
test_defgen.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_nonGeneratorReturnValue(self):
        """
        Ensure that C{TypeError} with a message about L{inlineCallbacks} is
        raised when a non-generator calls L{returnValue}.
        """
        def _noYield():
            returnValue(5)
        _noYield = inlineCallbacks(_noYield)

        self.assertIn("inlineCallbacks",
            str(self.assertRaises(TypeError, _noYield)))
test_deferred.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_passInlineCallbacks(self):
        """
        The body of a L{defer.inlineCallbacks} decorated test gets run.
        """
        result = self.runTest('test_passInlineCallbacks')
        self.assertTrue(result.wasSuccessful())
        self.assertEqual(result.testsRun, 1)
        self.assertTrue(detests.DeferredTests.touched)


问题


面经


文章

微信
公众号

扫码关注公众号