def setUp(self):
""" Create a dummy supvisors, ZMQ context and sockets. """
from supvisors.supvisorszmq import (InternalEventPublisher,
InternalEventSubscriber)
# the dummy Supvisors is used for addresses and ports
self.supvisors = MockedSupvisors()
# create publisher and subscriber
self.publisher = InternalEventPublisher(
self.supvisors.address_mapper.local_address,
self.supvisors.options.internal_port,
self.supvisors.logger)
self.subscriber = InternalEventSubscriber(
self.supvisors.address_mapper.addresses,
self.supvisors.options.internal_port)
# socket configuration is meant to be blocking
# however, a failure would block the unit test,
# so a timeout is set for reception
self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
# publisher does not wait for subscriber clients to work,
# so give some time for connections
time.sleep(1)
评论列表
文章目录