test_client.py 文件源码

python
阅读 32 收藏 0 点赞 0 评论 0

项目:krpcScripts 作者: jwvanderbeck 项目源码 文件源码
def test_thread_safe(self):
        thread_count = 4
        repeats = 1000

        latch = [threading.Condition(), thread_count]

        def thread_main(latch):
            for _ in range(repeats):
                self.assertEqual("False", self.conn.test_service.bool_to_string(False))
                self.assertEqual(12345, self.conn.test_service.string_to_int32("12345"))
            with latch[0]:
                latch[1] -= 1
                if latch[1] <= 0:
                    latch[0].notifyAll()

        for i in range(thread_count):
            t = threading.Thread(target=thread_main, args=(latch,))
            t.daemon = True
            t.start()

        with latch[0]:
            while latch[1] > 0:
                latch[0].wait(10)
        self.assertEqual(0, latch[1]);
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号