TestNoparallel.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:zeronet-debian 作者: bashrc 项目源码 文件源码
def testBlocking(self):
        """Check countBlocking when it is invoked from several greenlets at once.

        Three greenlets call countBlocking on one ExampleClass instance; one
        greenlet plus a direct call target a second instance. Every call is
        expected to return "counted:5". Afterwards the first instance must
        report counted == 5 and the second one, after an additional direct
        call, counted == 10.
        """
        first = ExampleClass()
        second = ExampleClass()

        # Repeated calls on the same object must not be started again while
        # one is running; they are expected to wait for it instead.
        greenlets = [
            gevent.spawn(target.countBlocking)
            for target in (first, first, first, second)
        ]
        # This call overlaps with the one spawned for the same object: it is
        # expected to be ignored, yet still block until the counting finishes.
        assert second.countBlocking() == "counted:5"
        gevent.joinall(greenlets)
        # Every spawned call has to report the same return value.
        assert [g.value for g in greenlets] == ["counted:5"] * 4
        # The earlier run has finished, so a new call is allowed to count again.
        second.countBlocking()

        assert first.counted == 5
        assert second.counted == 10
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号