locks_test.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码
def test_garbage_collection(self):
        # Test that timed-out waiters are occasionally cleaned from the queue.
        sem = locks.Semaphore(value=0)
        futures = [sem.acquire(timedelta(seconds=0.01)) for _ in range(101)]

        future = sem.acquire()
        self.assertEqual(102, len(sem._waiters))

        # Let first 101 waiters time out, triggering a collection.
        yield gen.sleep(0.02)
        self.assertEqual(1, len(sem._waiters))

        # Final waiter is still active.
        self.assertFalse(future.done())
        sem.release()
        self.assertTrue(future.done())

        # Prevent "Future exception was never retrieved" messages.
        for future in futures:
            self.assertRaises(TimeoutError, future.result)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号