def test_multi_thread_blocking(self):
"""In a multithreaded environment, the requests should still be blocked
if exceding the quota."""
limiter = RateLimiter(3, 0.5)
def thread_target():
with limiter:
limiter.add_request()
threads = [threading.Thread(target=thread_target) for _ in range(4)]
start_time = time.time()
for thread in threads:
thread.start()
for thread in threads:
thread.join()
stop_time = time.time()
self.assertGreaterEqual(stop_time - start_time, 0.5)
评论列表
文章目录