def listen(self, namespace, max_timeout):
    """Subscribe to a namespace and yield its messages as they arrive.

    A private queue is registered in `self.consumers` under every entry
    that `_walk_namespace_hierarchy` produces for `namespace` (trailing
    slashes are stripped first). Each item taken from that queue is
    yielded to the caller.

    When nothing arrives before the wait expires, `None` is yielded
    instead so the caller gets a chance to do periodic work such as
    sending PINGs. The wait is `max_timeout` seconds minus a random
    amount of up to half of `max_timeout`.

    The generator never finishes on its own: iterate over it and break
    out of the loop to unsubscribe. The queue is removed from
    `self.consumers` when the generator is finalized.
    """
    inbox = gevent.queue.Queue()
    namespace = namespace.rstrip("/")

    for level in _walk_namespace_hierarchy(namespace):
        subscribers = self.consumers.setdefault(level, [])
        subscribers.append(inbox)

    try:
        while True:
            # Randomize the wait so that many listeners started together
            # do not all time out at the same moment.
            jitter = random.uniform(0, max_timeout / 2)
            wait = max_timeout - jitter

            try:
                yield inbox.get(block=True, timeout=wait)
            except gevent.queue.Empty:
                # Quiet period: hand control back to the caller anyway.
                yield None

            # Cooperative yield so a busy queue cannot make this loop
            # starve other greenlets.
            gevent.sleep()
    finally:
        # Undo the registration and drop namespace entries that are left
        # without any consumers.
        for level in _walk_namespace_hierarchy(namespace):
            subscribers = self.consumers[level]
            subscribers.remove(inbox)
            if not subscribers:
                del self.consumers[level]
评论列表
文章目录