def test_disconnection(self):
""" Test the disconnection of subscribers. """
from supvisors.utils import InternalEventHeaders
# get the local address
local_address = self.supvisors.address_mapper.local_address
# test remote disconnection
address = next(address
for address in self.supvisors.address_mapper.addresses
if address != local_address)
self.subscriber.disconnect([address])
# send a tick event from the local publisher
payload = {'date': 1000}
self.publisher.send_tick_event(payload)
# check the reception of the tick event
msg = self.receive('Tick')
self.assertTupleEqual((InternalEventHeaders.TICK,
local_address, payload), msg)
# test local disconnection
self.subscriber.disconnect([local_address])
# send a tick event from the local publisher
self.publisher.send_tick_event(payload)
# check the non-reception of the tick event
with self.assertRaises(zmq.Again):
self.subscriber.receive()
评论列表
文章目录