test_threadpool.py source file

python

Project: zenchmarks  Author: squeaky-pl
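def test_callbackThread(self):
    """
    L{ThreadPool.callInThreadWithCallback} calls the function it is
    given and the C{onResult} callback in the same thread.
    """
    # Thread identifiers recorded by func and by onResult, in call order.
    threadIds = []

    # Set by onResult so the test knows when the callback has run.
    event = threading.Event()

    def onResult(success, result):
        threadIds.append(threading.current_thread().ident)
        event.set()

    def func():
        threadIds.append(threading.current_thread().ident)

    # A pool with no minimum threads and at most one worker thread.
    tp = threadpool.ThreadPool(0, 1)
    # The work is queued before the pool is started.
    tp.callInThreadWithCallback(onResult, func)
    tp.start()
    self.addCleanup(tp.stop)

    # Wait for the callback, then check both calls ran in the same thread.
    event.wait(self.getTimeout())
    self.assertEqual(len(threadIds), 2)
    self.assertEqual(threadIds[0], threadIds[1])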
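
The property this test checks can be illustrated without Twisted. The following is a minimal standard-library sketch, not Twisted's implementation: `call_in_thread_with_callback` is a hypothetical helper written for this illustration, and it assumes the callback takes the same `(success, result)` arguments as `onResult` above. It runs the function and then the callback inside one worker thread, so both record the same thread identifier.

```python
import threading


def call_in_thread_with_callback(on_result, func, *args, **kwargs):
    """Hypothetical helper: run func, then on_result, in one worker thread."""

    def worker():
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            # Report the failure to the callback from this same worker thread.
            on_result(False, exc)
        else:
            on_result(True, result)

    thread = threading.Thread(target=worker)
    thread.start()
    return thread


thread_ids = []
done = threading.Event()


def on_result(success, result):
    # Runs in the worker thread, right after func returns.
    thread_ids.append(threading.current_thread().ident)
    done.set()


def func():
    thread_ids.append(threading.current_thread().ident)
    return "ok"


worker_thread = call_in_thread_with_callback(on_result, func)
done.wait(timeout=5)
worker_thread.join(timeout=5)
```

After the worker finishes, `thread_ids` holds two entries that are equal to each other and differ from the identifier of the calling thread, which is the same relationship the assertions in the test verify.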