test_threads.py 文件源码

python
阅读 35 收藏 0 点赞 0 评论 0

项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码
def test_callFromThread(self):
        """
        Test callFromThread functionality: from the main thread, and from
        another thread.
        """
        def cb(ign):
            firedByReactorThread = defer.Deferred()
            firedByOtherThread = defer.Deferred()

            def threadedFunc():
                reactor.callFromThread(firedByOtherThread.callback, None)

            reactor.callInThread(threadedFunc)
            reactor.callFromThread(firedByReactorThread.callback, None)

            return defer.DeferredList(
                [firedByReactorThread, firedByOtherThread],
                fireOnOneErrback=True)
        return self._waitForThread().addCallback(cb)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号