def unsubscribe(self, alias: str,
                topic: Union[bytes, str, list, tuple]) -> None:
    '''
    Unsubscribe a SUB/SYNC_SUB socket given by its alias from a given
    specific topic, and delete its entry from the handlers dictionary.

    If instead of a single topic, a tuple or a list of topics is passed,
    the agent will unsubscribe from all the supplied topics.

    Parameters
    ----------
    alias
        Key used to look up the address in `self.address` (and, for an
        `AgentAddress`, the socket in `self.socket`).
    topic
        A single topic, or a list/tuple of topics. Every single topic is
        passed through `topic_to_bytes` before being used.

    Raises
    ------
    NotImplementedError
        If the address stored under `alias` is neither an `AgentAddress`
        nor an `AgentChannel`.
    '''
    if isinstance(topic, (tuple, list)):
        # Go through `self.unsubscribe` (not a local helper) so that each
        # element follows exactly the same path as a single-topic call.
        for single_topic in topic:
            self.unsubscribe(alias, single_topic)
        return

    topic = topic_to_bytes(topic)
    address = self.address[alias]

    # NOTE(review): both `del` statements below assume a handler is
    # currently registered for the topic -- confirm callers guarantee it.
    if isinstance(address, AgentAddress):
        socket = self.socket[alias]
        socket.setsockopt(zmq.UNSUBSCRIBE, topic)
        del self.handler[socket][topic]
    elif isinstance(address, AgentChannel):
        # For channels the socket is keyed by the channel's receiver
        # address, and the topic is prefixed with the channel's
        # `twin_uuid` (presumably mirroring how the subscription was
        # registered -- TODO confirm against the subscribe code).
        sub_address = address.receiver
        treated_topic = address.twin_uuid + topic
        socket = self.socket[sub_address]
        socket.setsockopt(zmq.UNSUBSCRIBE, treated_topic)
        del self.handler[socket][treated_topic]
    else:
        # Wrap the value in a 1-tuple: a bare tuple-like address would
        # otherwise be unpacked by `%` and raise TypeError instead of the
        # intended NotImplementedError.
        raise NotImplementedError(
            'Unsupported address type %s!' % (address,))
评论列表
文章目录