def test_granted(self):
c = coordinator.BaseCoordinator()
unit = hookenv.local_unit()
lock = 'mylock'
ts = coordinator._timestamp()
c.grants = {}
# Unit makes a request, but it isn't granted
c.requests = {unit: {lock: ts}}
self.assertFalse(c.granted(lock))
# Once the leader has granted the request, all good.
# It does this by mirroring the request timestamp.
c.grants = {unit: {lock: ts}}
self.assertTrue(c.granted(lock))
# The unit releases the lock by removing the request.
c.requests = {unit: {}}
self.assertFalse(c.granted(lock))
# If the unit makes a new request before the leader
# has had a chance to do its housekeeping, the timestamps
# do not match and the lock not considered granted.
ts = coordinator._timestamp()
c.requests = {unit: {lock: ts}}
self.assertFalse(c.granted(lock))
# Until the leader gets around to its duties.
c.grants = {unit: {lock: ts}}
self.assertTrue(c.granted(lock))
评论列表
文章目录