test_util.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:py-ipv8 作者: qstokkink 项目源码 文件源码
def test_blocking_call(self):
        """
        Check if the reactor thread is properly blocked by a function marked as such.

        Two local helpers, both decorated with ``blocking_call_on_reactor_thread``,
        share a counter stored as the function attribute ``waiter.variable``
        (initialised to 1). ``waiter()`` is invoked first and ``quicker()``
        second; the assertions require them to yield 2 and 3 respectively,
        i.e. that the two increments happened in call order.

        NOTE(review): this method body uses ``yield``, so it is a generator
        function. Presumably it is wrapped by ``@inlineCallbacks`` (or an
        equivalent) on a line that is not visible in this snippet -- confirm,
        otherwise the body would never be driven to the assertions.
        """
        @blocking_call_on_reactor_thread
        @inlineCallbacks
        def waiter():
            # 'Release' our claim on the reactor thread by waiting on a
            # deferLater with a 0.01 delay and a no-op callback.
            # blocking_call_on_reactor_thread should prevent anything else being scheduled though.
            yield deferLater(reactor, 0.01, lambda: None)
            # Only increment after the wait has completed, then hand back the new value.
            waiter.variable += 1
            returnValue(waiter.variable)

        @blocking_call_on_reactor_thread
        def quicker():
            # Immediately use the reactor thread and return:
            # increment without waiting and pass the new value to succeed().
            waiter.variable += 1
            return succeed(waiter.variable)

        # Shared counter, kept as an attribute on the waiter function object so
        # that both helpers can mutate it.
        waiter.variable = 1

        # 'Release' the reactor thread and increment waiter.variable
        # If the release didn't allow anything else to be scheduled, waiter.variable is now 2
        # If quicker() came first, waiter.variable is now 3 (bad)
        value = yield waiter()
        # Claim reactor thread and increment waiter.variable
        # If waiter() came first, waiter.variable is now 3
        # If quicker() managed to sneak in before this, waiter.variable is now 2 (bad)
        value2 = yield quicker()

        # waiter() must have seen the counter go 1 -> 2, quicker() 2 -> 3.
        self.assertEqual(value, 2)
        self.assertEqual(value2, 3)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号