def test_waitUntilLockedWithTimeoutUnlocked(self):
    """
    Test that waiting for the lock does not time out when the lock is
    already held at the time of the call but is released before the
    timeout elapses.

    The lock is taken up front, an unlock is scheduled on ``self.clock``
    at time 1, and ``deferUntilLocked`` is called with a timeout of 10.
    The test fails if the returned deferred errbacks with
    ``defer.TimeoutError``.

    Returns the deferred produced by ``deferUntilLocked`` (presumably so
    the test runner can wait on its outcome -- confirm against the test
    framework in use).
    """

    def onTimeout(f):
        # Handle only the timeout failure; trap() is expected to let any
        # other failure type propagate unchanged -- TODO confirm.
        f.trap(defer.TimeoutError)
        self.fail("Should not have timed out")

    # Take the lock first so the deferUntilLocked() call below is made
    # while the lock is held.
    self.assertTrue(self.lock.lock())

    # Schedule the release at time 1, well inside the timeout of 10.
    self.clock.callLater(1, self.lock.unlock)
    d = self.lock.deferUntilLocked(timeout=10)
    d.addErrback(onTimeout)

    # Drive the clock in ten steps of 1, covering both the scheduled
    # unlock and the whole timeout window.
    self.clock.pump([1] * 10)

    return d
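# NOTE: web-page navigation text ("Comment list", "Article contents") left
# over from the page this snippet was copied from; not part of the test code.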