def test_send_request(self):
""" Test the execution of a deferred Supervisor request. """
from supvisors.mainloop import SupvisorsMainLoop
from supvisors.utils import DeferredRequestHeaders
main_loop = SupvisorsMainLoop(self.supvisors)
# patch main loop subscriber
with patch.multiple(main_loop, check_address=DEFAULT,
start_process=DEFAULT, stop_process=DEFAULT,
restart=DEFAULT, shutdown=DEFAULT) as mocked_loop:
# test check address
self.check_call(main_loop, mocked_loop, 'check_address',
DeferredRequestHeaders.CHECK_ADDRESS,
('10.0.0.2', ))
# test start process
self.check_call(main_loop, mocked_loop, 'start_process',
DeferredRequestHeaders.START_PROCESS,
('10.0.0.2', 'dummy_process', 'extra args'))
# test stop process
self.check_call(main_loop, mocked_loop, 'stop_process',
DeferredRequestHeaders.STOP_PROCESS,
('10.0.0.2', 'dummy_process'))
# test restart
self.check_call(main_loop, mocked_loop, 'restart',
DeferredRequestHeaders.RESTART,
('10.0.0.2', ))
# test shutdown
self.check_call(main_loop, mocked_loop, 'shutdown',
DeferredRequestHeaders.SHUTDOWN,
('10.0.0.2', ))
评论列表
文章目录