def testBlocking(self):
    """Check that overlapping ``countBlocking`` calls share a single run.

    Three greenlets call ``obj1.countBlocking`` and one calls
    ``obj2.countBlocking``; ``obj2.countBlocking`` is then also called
    directly while those greenlets are still pending. Every call is
    expected to return ``"counted:5"``. After a further, non-overlapping
    call on ``obj2``, ``obj1.counted`` must be 5 (one run in total) and
    ``obj2.counted`` must be 10 (two runs in total).

    NOTE(review): ``ExampleClass`` and the mechanism that de-duplicates
    concurrent calls are defined outside this block -- the expectations
    above are taken from the assertions below, not from that code.
    """
    obj1 = ExampleClass()
    obj2 = ExampleClass()

    # Do not allow another run to start while one is in progress; callers
    # that arrive in the meantime wait for the running one instead.
    threads = [
        gevent.spawn(obj1.countBlocking),
        gevent.spawn(obj1.countBlocking),
        gevent.spawn(obj1.countBlocking),
        gevent.spawn(obj2.countBlocking),
    ]

    # This direct call overlaps with the greenlet spawned for obj2: it must
    # not trigger a second count, but it still blocks until the result is
    # available and returns it.
    assert obj2.countBlocking() == "counted:5"
    gevent.joinall(threads)

    # Check the return value of every spawned call.
    assert [thread.value for thread in threads] == ["counted:5"] * len(threads)

    # The earlier obj2 run has finished, so calling again is allowed and
    # performs a fresh count.
    obj2.countBlocking()

    assert obj1.counted == 5  # three overlapping calls -> one run
    assert obj2.counted == 10  # one shared run + one later run
评论列表
文章目录