def setUp(self):
""" Create a dummy supvisors and a ZMQ context. """
from supvisors.supvisorszmq import EventPublisher, EventSubscriber
# the dummy Supvisors is used for addresses and ports
self.supvisors = MockedSupvisors()
# create the ZeroMQ context
# create publisher and subscriber
self.publisher = EventPublisher(
self.supvisors.options.event_port,
self.supvisors.logger)
self.subscriber = EventSubscriber(
zmq.Context.instance(),
self.supvisors.options.event_port,
self.supvisors.logger)
# WARN: this subscriber does not include a subscription
# when using a subscription, use a time sleep to give time
# to PyZMQ to handle it
# WARN: socket configuration is meant to be blocking
# however, a failure would block the unit test,
# so a timeout is set for reception
self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
# create test payloads
self.supvisors_payload = Payload({'state': 'running',
'version': '1.0'})
self.address_payload = Payload({'state': 'silent',
'name': 'cliche01',
'date': 1234})
self.application_payload = Payload({'state': 'starting',
'name': 'supvisors'})
self.process_payload = Payload({'state': 'running',
'process_name': 'plugin',
'application_name': 'supvisors',
'date': 1230})
self.event_payload = Payload({'state': 20,
'name': 'plugin',
'group': 'supvisors',
'now': 1230})
评论列表
文章目录