def test_check_daemons(self):
"""
The daemons are checked to be running every so often. When N=5 of these
checks fail, the daemon will be restarted.
"""
clock = Clock()
dog = WatchDog(clock,
broker=AsynchronousPingDaemon("test-broker"),
monitor=AsynchronousPingDaemon("test-monitor"),
manager=AsynchronousPingDaemon("test-manager"))
dog.start_monitoring()
for i in range(4):
clock.advance(5)
dog.broker.fire_running(False)
dog.monitor.fire_running(True)
dog.manager.fire_running(True)
self.assertEqual(dog.broker.boots, [])
clock.advance(5)
dog.broker.fire_running(False)
dog.monitor.fire_running(True)
dog.manager.fire_running(True)
self.assertEqual(dog.broker.boots, [STOP, START])
评论列表
文章目录