def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]):
    '''
    Perform the ZeroMQ-level subscription to `topic` for the socket
    registered under `alias`. Only meaningful for SUB/SYNC_SUB sockets.

    The topic is first normalized with `topic_to_bytes`. What happens
    next depends on the type of the address stored for `alias`:

    - `AgentAddress`: the socket stored under `alias` is subscribed to
      the topic as-is.
    - `AgentChannel`: the socket stored under the channel's `receiver`
      is subscribed to the topic prefixed with the channel's `uuid`.

    This method does not register any handler; it only sets the
    subscription option on the socket.

    Raises
    ------
    NotImplementedError
        If the address stored for `alias` is neither an `AgentAddress`
        nor an `AgentChannel`.
    '''
    topic_bytes = topic_to_bytes(topic)
    target = self.address[alias]

    if isinstance(target, AgentAddress):
        self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic_bytes)
        return

    if isinstance(target, AgentChannel):
        # For channels the subscription is set on the socket indexed by
        # the channel's receiver address, not by the alias itself, and
        # the topic carries the channel UUID as a prefix.
        receiver = target.receiver
        prefixed_topic = target.uuid + topic_bytes
        self.socket[receiver].setsockopt(zmq.SUBSCRIBE, prefixed_topic)
        return

    raise NotImplementedError('Unsupported address type %s!' % target)
评论列表
文章目录