def sockets(self):
    """Return this module's ZeroMQ sockets, creating them on first use.

    The sockets are cached in ``self._sockets``; when that dict is already
    populated it is returned unchanged.  Otherwise:

    * if ``self.config[<class name>]['listen']`` exists, a SUB socket
      subscribed to every message is bound to that address and stored under
      the key ``'listen'``;
    * if ``self.config[<class name>]['next']`` exists (a single module name
      or an iterable of names), one PUB socket per name is connected to
      that module's ``'listen'`` address and stored under the module name.

    Returns:
        The ``self._sockets`` dict, or ``None`` when the listen address is
        already in use (in which case ``self.loop.stop()`` has been called
        and nothing is cached for ``'listen'``).
    """
    if self._sockets:
        return self._sockets

    name = self.__class__.__name__

    # --- inbound side -----------------------------------------------------
    try:
        address = self.config[name]['listen']
    except KeyError:  # some modules may not need to listen to anything
        pass
    else:
        get_logger().debug(f"Binding {name} socket <{address}>")
        # NOTE(review): every socket gets its own zmq.Context(), as before.
        listener = zmq.Context().socket(zmq.SUB)
        try:
            listener.setsockopt_string(zmq.SUBSCRIBE, '')
            listener.bind(address)
        except zmq.error.ZMQError as e:
            # Never cache a socket that failed to bind: a later call would
            # otherwise see a non-empty cache and hand out a dead socket.
            listener.close(linger=0)
            # Compare errno rather than the message text; the wording of
            # the message differs between platforms.
            if e.errno == zmq.EADDRINUSE:
                get_logger().error(f"Binding failed because the address {address} in use")
                get_logger().error("Terminate the previous process using the address above")
                self.loop.stop()
                return None
            get_logger().error(f"Binding {name} socket <{address}> failed: {e}")
        else:
            self._sockets['listen'] = listener

    # --- outbound side ----------------------------------------------------
    try:
        next_modules = self.config[name]['next']
    except KeyError:  # some modules may not need to send messages
        return self._sockets

    if isinstance(next_modules, str):
        next_modules = [next_modules]

    for key in next_modules:
        try:
            address = self.config[key]['listen']
        except KeyError:
            # A downstream module without a listen address is a config
            # mistake; report it and keep connecting the remaining ones.
            get_logger().error(f"No 'listen' address configured for next module {key}; skipping")
            continue
        get_logger().debug(f"Connecting to {key} socket <{address}>")
        publisher = zmq.Context().socket(zmq.PUB)
        publisher.connect(address)
        self._sockets[key] = publisher
        time.sleep(0.001)  # ensuring connection made correctly

    return self._sockets
评论列表
文章目录