test_threads.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:hostapd-mana 作者: adde88 项目源码 文件源码
def testWakerOverflow(self):
        self.failure = None
        waiter = threading.Event()
        def threadedFunction():
            # Hopefully a hundred thousand queued calls is enough to
            # trigger the error condition
            for i in xrange(100000):
                try:
                    reactor.callFromThread(lambda: None)
                except:
                    self.failure = failure.Failure()
                    break
            waiter.set()
        reactor.callInThread(threadedFunction)
        waiter.wait(120)
        if not waiter.isSet():
            self.fail("Timed out waiting for event")
        if self.failure is not None:
            return defer.fail(self.failure)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号