def test_multi_thread_non_blocking(self):
    """Check that requests under the rate limit are not blocked.

    Four threads each enter the limiter as a context manager and call
    ``add_request()`` once. The test passes when all four threads have
    finished in less than one second.
    """
    # NOTE(review): presumably "4 requests per 1-second period" -- confirm
    # against the RateLimiter constructor signature.
    limiter = RateLimiter(4, 1)

    def thread_target():
        with limiter:
            limiter.add_request()

    threads = [threading.Thread(target=thread_target) for _ in range(4)]

    # Use the monotonic clock for the duration: unlike time.time(), it is
    # not affected by system clock adjustments while the threads run.
    start_time = time.monotonic()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    elapsed = time.monotonic() - start_time

    self.assertLess(elapsed, 1)
评论列表
文章目录