def test_acquire_leader(self):
    """Check that acquire() on the leader makes its grant decision at once.

    The leader cannot postpone the decision to a later hook, because no
    later hook will be triggered for it. The test therefore drives
    acquire() three times against a patched ``default_grant`` that first
    refuses and then grants, and checks that:

    * the first call is refused and the second is granted,
    * the request timestamp stays the same across all calls,
    * once the lock is granted, ``default_grant`` is not consulted again.
    """
    hookenv.is_leader.return_value = True

    # Serial, not Base: this test hooks into default_grant.
    coord = coordinator.Serial()
    lock_name = 'mylock'
    local_unit = hookenv.local_unit()
    coord.grants = {}
    coord.requests = {local_unit: {}}

    # First decision refuses the lock, second decision grants it.
    decisions = iter([False, True])
    with patch.object(coord, 'default_grant',
                      side_effect=decisions) as grant_mock:
        # Refused on the first attempt; remember when it was requested.
        self.assertFalse(coord.acquire(lock_name))
        requested_at = coord.request_timestamp(lock_name)

        # Granted on the retry, without the request being re-stamped.
        self.assertTrue(coord.acquire(lock_name))
        self.assertEqual(requested_at, coord.request_timestamp(lock_name))

        # If it is granted, the leader doesn't make a decision again.
        self.assertTrue(coord.acquire(lock_name))
        self.assertEqual(requested_at, coord.request_timestamp(lock_name))

        # Only the first two acquire() calls reached default_grant.
        self.assertEqual(grant_mock.call_count, 2)
# Comment list
# Table of contents