def unsubscribe(self, alias: str, topic: Union[bytes, str]) -> None:
    '''
    Remove a topic subscription from the SUB/SYNC_SUB socket registered
    under `alias` and drop the matching entry from the handlers
    dictionary.

    When `topic` is a tuple or a list, every contained topic is
    unsubscribed in turn.

    Raises NotImplementedError when the address stored for `alias` is
    neither an AgentAddress nor an AgentChannel.
    '''
    if isinstance(topic, (tuple, list)):
        for single_topic in topic:
            self.unsubscribe(alias, single_topic)
        return

    topic = topic_to_bytes(topic)
    address = self.address[alias]

    if isinstance(address, AgentAddress):
        # Plain address: the socket is stored under the alias itself and
        # the filter is the topic as given.
        socket_key = alias
        filter_key = topic
    elif isinstance(address, AgentChannel):
        # Channel: the socket is stored under the channel's receiver
        # address and the filter is prefixed with the twin UUID.
        socket_key = address.receiver
        filter_key = address.twin_uuid + topic
    else:
        raise NotImplementedError('Unsupported address type %s!' %
                                  address)

    self.socket[socket_key].setsockopt(zmq.UNSUBSCRIBE, filter_key)
    del self.handler[self.socket[socket_key]][filter_key]
python类UNSUBSCRIBE的实例源码
def unsubscribe_all(self):
    """ Remove the subscription to all events (the empty-prefix filter). """
    # The filter must be bytes: pyzmq rejects a unicode ``str`` for
    # UNSUBSCRIBE on Python 3, while ``b''`` is accepted on Python 2 and 3.
    self.socket.setsockopt(zmq.UNSUBSCRIBE, b'')
def unsubscribe(self, code):
    """ Drop the subscription filter for the event named `code`.

    `code` is UTF-8 encoded before being handed to the socket.
    """
    event_filter = code.encode('utf-8')
    self.socket.setsockopt(zmq.UNSUBSCRIBE, event_filter)
# reception part
def unsubscribe(cls, username):
    """Remove the message filter for `username` from the shared stream socket.

    The username is UTF-8 encoded and used as the UNSUBSCRIBE prefix on
    ``cls._zmq_stream.socket``.
    """
    user_filter = username.encode('utf-8')
    stream_socket = cls._zmq_stream.socket
    stream_socket.setsockopt(zmq.UNSUBSCRIBE, user_filter)
def set_socket_option(self, name, option, value):
    """calls ``zmq.setsockopt`` on the given socket.

    :param name: the name of the socket where data will pass through
    :param option: the option from the ``zmq`` module
    :param value: the value to set for that option

    Here are some examples of options:

    * ``zmq.HWM``: Set high water mark
    * ``zmq.AFFINITY``: Set I/O thread affinity
    * ``zmq.IDENTITY``: Set socket identity
    * ``zmq.SUBSCRIBE``: Establish message filter
    * ``zmq.UNSUBSCRIBE``: Remove message filter
    * ``zmq.SNDBUF``: Set kernel transmit buffer size
    * ``zmq.RCVBUF``: Set kernel receive buffer size
    * ``zmq.LINGER``: Set linger period for socket shutdown
    * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
    * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.create('pipe-in', zmq.PULL)
      >>>
      >>> # block after 10 messages are queued
      >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
    """
    # Look the socket up by its registered name and apply the option directly.
    self.get_by_name(name).setsockopt(option, value)
def _connect(self):
    """Wait for the publisher's 'hello' message and acknowledge it.

    Subscribes ``self.socket`` to the ``hello`` topic, then polls until a
    hello message arrives. On receipt, an empty request is sent over a
    temporary REQ socket connected to ``DAUBER_CONTROL_SOCKET_URI`` and the
    reply is awaited, signalling that we are ready to receive data. The
    ``hello`` subscription is removed once the handshake succeeds.

    :raises AssertionError: if no hello message is received within the
        timeout (500 seconds).
    """
    # ZeroMQ topic filters are bytes; a bytes literal behaves the same on
    # Python 2 and also works on Python 3, both for setsockopt and for the
    # comparison against received frames.
    hello_topic = b'hello'

    # Subscribe to the hello topic - once we receive a hello we'll send a
    # request for real data. The callback plugin will effectively block
    # execution until we send this request.
    self.socket.setsockopt(zmq.SUBSCRIBE, hello_topic)

    # Define the control socket for responding to the 'hello' topic
    control_socket = self.context.socket(zmq.REQ)
    try:
        control_socket.connect(self._env['DAUBER_CONTROL_SOCKET_URI'])

        timeout = 500  # seconds
        deadline = time.time() + timeout
        connected = False
        while not connected:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            # Bound the poll by the time left (milliseconds); an unbounded
            # poll would block forever and never honour the timeout.
            ready = dict(self.poller.poll(int(remaining * 1000)))
            if ready.get(self.socket):
                # Only the first frame (the topic) matters here.
                topic = self.socket.recv_multipart()[0]
                if topic == hello_topic:
                    # Signal that we've connected and we're ready to
                    # receive data
                    control_socket.send(b'')
                    control_socket.recv()
                    connected = True

        if not connected:
            # Raised explicitly (not via ``assert``) so the check survives
            # ``python -O``; the exception type is unchanged for callers.
            raise AssertionError(
                "Timed out before recieving a hello topic message from the publisher.")
    finally:
        # Release the control socket deterministically, even on failure.
        control_socket.close()

    self.socket.setsockopt(zmq.UNSUBSCRIBE, hello_topic)
def test_change_subscription(self):
    """Check that changing a SUB socket's filters mid-stream takes effect.

    The sender publishes 100 ``test`` messages, with the 50th replaced by
    ``test LAST``. On ``test LAST`` the receiver subscribes to ``done`` and
    unsubscribes from ``test``, so it is expected to count exactly 50
    messages before ``done DONE`` ends the run.
    """
    # FIXME: Extensive testing showed this particular test is the root cause
    # of sporadic failures on Travis.
    pub, sub, _port = self.create_bound_pair(zmq.PUB, zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'test')
    eventlet.sleep(0)
    sub_ready = eventlet.Event()
    sub_last = eventlet.Event()
    sub_done = eventlet.Event()

    def receiver():
        # Discard everything until the first BEGIN marker shows up.
        while True:
            if sub.recv() == b'test BEGIN':
                break
            eventlet.sleep(0)
        sub_ready.send()

        received = 0
        while True:
            frame = sub.recv()
            if frame == b'test BEGIN':
                # BEGIN may come many times
                continue
            if frame == b'test LAST':
                sub.setsockopt(zmq.SUBSCRIBE, b'done')
                sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
                eventlet.sleep(0)
                # In real application you should either sync
                # or tolerate loss of messages.
                sub_last.send()
            if frame == b'done DONE':
                break
            # LAST itself falls through to here and is counted.
            received += 1
        sub_done.send(received)

    def sender():
        # Sync receiver ready to avoid loss of first packets
        while not sub_ready.ready():
            pub.send(b'test BEGIN')
            eventlet.sleep(0.005)

        for seq in range(1, 101):
            if seq == 50:
                pub.send(b'test LAST')
                sub_last.wait()
                # XXX: putting a real delay of 1ms here fixes sporadic
                # failures on Travis
                # just yield eventlet.sleep(0) doesn't cut it
                eventlet.sleep(0.001)
                continue
            pub.send('test {0}'.format(seq).encode())
        pub.send(b'done DONE')

    eventlet.spawn(receiver)
    eventlet.spawn(sender)
    rx_count = sub_done.wait()
    self.assertEqual(rx_count, 50)