def test_timeout(self):
"""Test that when a session is established, then left idle
for the timeout period, the http_agent service emits
a termination message on the RX channel."""
session_id = self._open_session()
# No alert to begin with
alerts = HostContactAlert.filter_by_item(self.host)
self.assertEqual(alerts.count(), 0)
time.sleep(HostState.CONTACT_TIMEOUT + HostStatePoller.POLL_INTERVAL + RABBITMQ_GRACE_PERIOD)
# Should be one SESSION_TERMINATE message to AMQP with a matching session ID
message = self._receive_one_amqp()
self.assertDictEqual(message, {
'fqdn': self.CLIENT_NAME,
'type': 'SESSION_TERMINATE',
'plugin': self.PLUGIN,
'session_seq': None,
'session_id': session_id,
'body': None
})
with transaction.commit_manually():
transaction.commit()
alerts = HostContactAlert.filter_by_item(self.host)
self.assertEqual(alerts.count(), 1)
# Should be a message waiting for the agent telling it that its session was terminated
# (timing out doesn't mean the agent is gone, it could just be experiencing network difficulties)
# What's more, the agent doesn't necessarily *know* that it had network difficulties, e.g. if it
# just got real slow and waited too long between GETs.
# This has to cut both ways to be reliable:
# * We have to tell the agent that we thought it went away, by sending a TERMINATE for sessions
# * If the agent finds that a GET fails then it has to assume that we might have put session
# messages in that GET, and terminate all its sessions in case one of those GET messages
# was really a TERMINATE
response = self._get()
self.assertResponseOk(response)
forwarded_messages = response.json()['messages']
self.assertEqual(len(forwarded_messages), 1)
self.assertDictEqual(forwarded_messages[0], {
'fqdn': self.CLIENT_NAME,
'type': 'SESSION_TERMINATE',
'plugin': self.PLUGIN,
'session_seq': None,
'session_id': None,
'body': None
})
test_http_agent.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录