def test_init_socket(self):
    """Check a PUBHandler built around an already-created PUB socket.

    The handler is attached to the test logger, an INFO record is logged,
    and the paired SUB socket is expected to receive it as two frames:
    the topic and the message text followed by a newline.
    """
    import time

    publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
    log = self.logger

    pub_handler = handlers.PUBHandler(publisher)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    # The handler must reuse the given socket and that socket's context.
    self.assertTrue(pub_handler.socket is publisher)
    self.assertTrue(pub_handler.ctx is publisher.context)
    self.assertTrue(pub_handler.ctx is self.context)

    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    # Short pause before publishing (presumably lets the subscription settle).
    time.sleep(0.1)

    text = 'message'
    log.info(text)
    frames = subscriber.recv_multipart()
    (got_topic, got_body) = frames
    self.assertEqual(got_topic, b'zmq.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
# Python example source code using SUBSCRIBE
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue and return the three sockets wired to it.

    Two PAIR sockets and one SUB socket are bound to random loopback TCP
    ports; the device's in, out and monitor sides are connected to them and
    the device is started.

    :param mon_sub: subscription prefix set on the monitor SUB socket
    :param in_prefix: passed through to ``ThreadMonitoredQueue``
    :param out_prefix: passed through to ``ThreadMonitoredQueue``
    :returns: the tuple ``(alice, bob, mon)``
    """
    loopback = 'tcp://127.0.0.1'
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    # Bind in the order alice, bob, mon and remember each resulting URL.
    in_url, out_url, mon_url = [
        "%s:%i" % (loopback, sock.bind_to_random_port(loopback))
        for sock in (alice, bob, mon)
    ]
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in(in_url)
    self.device.connect_out(out_url)
    self.device.connect_mon(mon_url)
    self.device.start()
    time.sleep(.2)

    # This is currently necessary to ensure no dropped monitor messages;
    # see LIBZMQ-248 for more info. A failed non-blocking recv is ignored.
    try:
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def test_unicode_sockopts(self):
    """Set and read socket options using unicode values.

    Plain ``setsockopt`` must reject unicode with TypeError, while the
    ``*_unicode`` variants accept it; the IDENTITY option is round-tripped
    through utf16 and a two-part unicode message is sent and received.
    """
    text = "tést"
    if unicode is not str:
        # str is a byte string here, so turn the literal into unicode.
        text = text.decode('utf8')
    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    self.assertEqual(sub.send_unicode, sub.send_unicode)
    self.assertEqual(pub.recv_unicode, pub.recv_unicode)

    # Setting options: the order of these calls is kept as-is.
    self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, text)
    self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, text)
    sub.setsockopt_unicode(zmq.IDENTITY, text, 'utf16')
    self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, text)
    sub.setsockopt_unicode(zmq.SUBSCRIBE, text)

    # Reading options back.
    self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)
    raw_identity = sub.getsockopt(zmq.IDENTITY)
    decoded_identity = raw_identity.decode('utf16')
    fetched_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
    self.assertEqual(decoded_identity, fetched_identity)

    time.sleep(0.1)  # wait for connection/subscription
    doubled = text * 2
    pub.send_unicode(text, zmq.SNDMORE)
    pub.send_unicode(doubled, encoding='latin-1')
    self.assertEqual(text, sub.recv_unicode())
    self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
def test_init_iface(self):
    """Check PUBHandler construction from an interface URL.

    A handler created without a context must not use the test context; one
    created with ``self.context`` must. The second handler then publishes
    an INFO record that a SUB socket connected to the same interface is
    expected to receive as (topic, message) frames.
    """
    import time

    log = self.logger
    shared_ctx = self.context

    # First handler: no context supplied.
    pub_handler = handlers.PUBHandler(self.iface)
    self.assertFalse(pub_handler.ctx is shared_ctx)
    self.sockets.append(pub_handler.socket)
    # NOTE: the first handler's own context is not terminated here.

    # Second handler: explicitly given the test context (same name rebound).
    pub_handler = handlers.PUBHandler(self.iface, self.context)
    self.sockets.append(pub_handler.socket)
    self.assertTrue(pub_handler.ctx is shared_ctx)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    subscriber = shared_ctx.socket(zmq.SUB)
    self.sockets.append(subscriber)
    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    subscriber.connect(self.iface)
    # Short pause before publishing (presumably lets the connection settle).
    time.sleep(0.25)

    text = 'message'
    log.info(text)
    frames = subscriber.recv_multipart()
    (got_topic, got_body) = frames
    self.assertEqual(got_topic, b'zmq.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def test_init_socket(self):
    """Check a PUBHandler built around an already-created PUB socket.

    The handler is attached to the test logger, an INFO record is logged,
    and the paired SUB socket is expected to receive it as two frames:
    the topic and the message text followed by a newline.
    """
    import time

    publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
    log = self.logger

    pub_handler = handlers.PUBHandler(publisher)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    # The handler must reuse the given socket and that socket's context.
    self.assertTrue(pub_handler.socket is publisher)
    self.assertTrue(pub_handler.ctx is publisher.context)
    self.assertTrue(pub_handler.ctx is self.context)

    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    # Short pause before publishing (presumably lets the subscription settle).
    time.sleep(0.1)

    text = 'message'
    log.info(text)
    frames = subscriber.recv_multipart()
    (got_topic, got_body) = frames
    self.assertEqual(got_topic, b'zmq.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def test_root_topic(self):
    """Check that changing ``root_topic`` changes the published topic.

    A second SUB socket subscribed to every topic (empty prefix) must
    receive the record under ``twoonly.INFO``, while a non-blocking recv on
    the subscriber returned by ``connect_handler`` must fail with EAGAIN.
    """
    log, pub_handler, first_sub = self.connect_handler()
    pub_handler.socket.bind(self.iface)

    second_sub = first_sub.context.socket(zmq.SUB)
    self.sockets.append(second_sub)
    second_sub.connect(self.iface)
    second_sub.setsockopt(zmq.SUBSCRIBE, b'')

    pub_handler.root_topic = b'twoonly'
    text = 'ignored'
    log.info(text)

    # Nothing may be pending on the first subscriber.
    self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)

    got_topic, got_body = second_sub.recv_multipart()
    self.assertEqual(got_topic, b'twoonly.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def test_unicode_sockopts(self):
    """Set and read socket options using unicode values.

    Plain ``setsockopt`` must reject unicode with TypeError, while the
    ``*_unicode`` variants accept it; the IDENTITY option is round-tripped
    through utf16 and a two-part unicode message is sent and received.
    """
    text = "tést"
    if unicode is not str:
        # str is a byte string here, so turn the literal into unicode.
        text = text.decode('utf8')
    pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
    self.assertEqual(sub.send_unicode, sub.send_unicode)
    self.assertEqual(pub.recv_unicode, pub.recv_unicode)

    # Setting options: the order of these calls is kept as-is.
    self.assertRaises(TypeError, sub.setsockopt, zmq.SUBSCRIBE, text)
    self.assertRaises(TypeError, sub.setsockopt, zmq.IDENTITY, text)
    sub.setsockopt_unicode(zmq.IDENTITY, text, 'utf16')
    self.assertRaises(TypeError, sub.setsockopt, zmq.AFFINITY, text)
    sub.setsockopt_unicode(zmq.SUBSCRIBE, text)

    # Reading options back.
    self.assertRaises(TypeError, sub.getsockopt_unicode, zmq.AFFINITY)
    self.assertRaisesErrno(zmq.EINVAL, sub.getsockopt_unicode, zmq.SUBSCRIBE)
    raw_identity = sub.getsockopt(zmq.IDENTITY)
    decoded_identity = raw_identity.decode('utf16')
    fetched_identity = sub.getsockopt_unicode(zmq.IDENTITY, 'utf16')
    self.assertEqual(decoded_identity, fetched_identity)

    time.sleep(0.1)  # wait for connection/subscription
    doubled = text * 2
    pub.send_unicode(text, zmq.SNDMORE)
    pub.send_unicode(doubled, encoding='latin-1')
    self.assertEqual(text, sub.recv_unicode())
    self.assertEqual(doubled, sub.recv_unicode(encoding='latin-1'))
def test_init_iface(self):
    """Check PUBHandler construction from an interface URL.

    A handler created without a context must not use the test context; one
    created with ``self.context`` must. The second handler then publishes
    an INFO record that a SUB socket connected to the same interface is
    expected to receive as (topic, message) frames.
    """
    import time

    log = self.logger
    shared_ctx = self.context

    # First handler: no context supplied.
    pub_handler = handlers.PUBHandler(self.iface)
    self.assertFalse(pub_handler.ctx is shared_ctx)
    self.sockets.append(pub_handler.socket)
    # NOTE: the first handler's own context is not terminated here.

    # Second handler: explicitly given the test context (same name rebound).
    pub_handler = handlers.PUBHandler(self.iface, self.context)
    self.sockets.append(pub_handler.socket)
    self.assertTrue(pub_handler.ctx is shared_ctx)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    subscriber = shared_ctx.socket(zmq.SUB)
    self.sockets.append(subscriber)
    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    subscriber.connect(self.iface)
    # Short pause before publishing (presumably lets the connection settle).
    time.sleep(0.25)

    text = 'message'
    log.info(text)
    frames = subscriber.recv_multipart()
    (got_topic, got_body) = frames
    self.assertEqual(got_topic, b'zmq.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def test_init_socket(self):
    """Check a PUBHandler built around an already-created PUB socket.

    The handler is attached to the test logger, an INFO record is logged,
    and the paired SUB socket is expected to receive it as two frames:
    the topic and the message text followed by a newline.
    """
    import time

    publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
    log = self.logger

    pub_handler = handlers.PUBHandler(publisher)
    pub_handler.setLevel(logging.DEBUG)
    pub_handler.root_topic = self.topic
    log.addHandler(pub_handler)

    # The handler must reuse the given socket and that socket's context.
    self.assertTrue(pub_handler.socket is publisher)
    self.assertTrue(pub_handler.ctx is publisher.context)
    self.assertTrue(pub_handler.ctx is self.context)

    subscriber.setsockopt(zmq.SUBSCRIBE, b(self.topic))
    # Short pause before publishing (presumably lets the subscription settle).
    time.sleep(0.1)

    text = 'message'
    log.info(text)
    frames = subscriber.recv_multipart()
    (got_topic, got_body) = frames
    self.assertEqual(got_topic, b'zmq.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def test_root_topic(self):
    """Check that changing ``root_topic`` changes the published topic.

    A second SUB socket subscribed to every topic (empty prefix) must
    receive the record under ``twoonly.INFO``, while a non-blocking recv on
    the subscriber returned by ``connect_handler`` must fail with EAGAIN.
    """
    log, pub_handler, first_sub = self.connect_handler()
    pub_handler.socket.bind(self.iface)

    second_sub = first_sub.context.socket(zmq.SUB)
    self.sockets.append(second_sub)
    second_sub.connect(self.iface)
    second_sub.setsockopt(zmq.SUBSCRIBE, b'')

    pub_handler.root_topic = b'twoonly'
    text = 'ignored'
    log.info(text)

    # Nothing may be pending on the first subscriber.
    self.assertRaisesErrno(zmq.EAGAIN, first_sub.recv, zmq.NOBLOCK)

    got_topic, got_body = second_sub.recv_multipart()
    self.assertEqual(got_topic, b'twoonly.INFO')
    expected_body = b(text) + b'\n'
    self.assertEqual(got_body, expected_body)
    log.removeHandler(pub_handler)
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
    """Start a ThreadMonitoredQueue and return the three sockets wired to it.

    Two PAIR sockets and one SUB socket are bound to random loopback TCP
    ports; the device's in, out and monitor sides are connected to them and
    the device is started.

    :param mon_sub: subscription prefix set on the monitor SUB socket
    :param in_prefix: passed through to ``ThreadMonitoredQueue``
    :param out_prefix: passed through to ``ThreadMonitoredQueue``
    :returns: the tuple ``(alice, bob, mon)``
    """
    loopback = 'tcp://127.0.0.1'
    self.device = devices.ThreadMonitoredQueue(
        zmq.PAIR, zmq.PAIR, zmq.PUB, in_prefix, out_prefix)

    alice = self.context.socket(zmq.PAIR)
    bob = self.context.socket(zmq.PAIR)
    mon = self.context.socket(zmq.SUB)

    # Bind in the order alice, bob, mon and remember each resulting URL.
    in_url, out_url, mon_url = [
        "%s:%i" % (loopback, sock.bind_to_random_port(loopback))
        for sock in (alice, bob, mon)
    ]
    mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

    self.device.connect_in(in_url)
    self.device.connect_out(out_url)
    self.device.connect_mon(mon_url)
    self.device.start()
    time.sleep(.2)

    # This is currently necessary to ensure no dropped monitor messages;
    # see LIBZMQ-248 for more info. A failed non-blocking recv is ignored.
    try:
        mon.recv_multipart(zmq.NOBLOCK)
    except zmq.ZMQError:
        pass

    self.sockets.extend([alice, bob, mon])
    return alice, bob, mon
def sub_task(self, name, ip_port_sub):
    """Receive ``<topic> <integer>`` messages from a publisher until shutdown.

    Each received message is split on whitespace into a topic and an
    integer payload; ``self.sub_msg_cnt`` counts the messages and every
    message is printed.

    :param name: not used by this method
    :param ip_port_sub: ``host:port`` of the publisher to connect to; the
        string ``"0"`` disables the subscriber and returns immediately
    :raises ValueError: if a message does not split into exactly two
        tokens or the second token is not an integer
    """
    if ip_port_sub == "0":
        return
    ctx = zmq.Context()
    # subscribe socket
    socket_sub = ctx.socket(zmq.SUB)
    try:
        socket_sub.connect("tcp://%s" % ip_port_sub)
        # The filter must be bytes: setsockopt rejects text with TypeError.
        socket_sub.setsockopt(zmq.SUBSCRIBE, b'')
        total_value = 0  # running sum of payloads; only accumulated here
        self.sub_msg_cnt = 0
        while not self.shutdown:
            payload = socket_sub.recv()
            topic, message_data = payload.split()
            total_value += int(message_data)
            self.sub_msg_cnt += 1
            print("SUB:: [%d] %s %s" % (self.sub_msg_cnt, topic, message_data))
    finally:
        # Release the socket and context on shutdown or on any error;
        # linger=0 keeps term() from waiting on undelivered messages.
        socket_sub.close(linger=0)
        ctx.term()
def execute_command_forwarder():
    """Parse the command-line arguments and run a SUB -> PUB forwarder device.

    The device's input side is bound to ``args.subscriber`` and subscribed
    to every topic; its output side is bound to ``args.publisher``.
    High-water marks are applied only when the corresponding argument is
    truthy. A short banner is printed before the device is started.
    """
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)
    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    # Single-argument print calls produce the same text on Python 2 and 3.
    print("oldspeak forwarder started")
    print("date %s" % (datetime.utcnow().isoformat(),))
    print("subscriber %s" % (args.subscriber,))
    print("publisher %s" % (args.publisher,))
    device.start()
def _data_listener(self):
    """Feed server messages to ``_on_server_message`` via queued Qt calls.

    If a file path is given as the first command-line argument, every line
    of that file is parsed as JSON and dispatched first. The method then
    subscribes to ``tcp://localhost:9876`` and dispatches each received
    JSON message; this loop never returns.
    """
    if len(sys.argv) > 1:
        # The context manager closes the replay file once it has been read.
        with open(sys.argv[1]) as replay_file:
            for line in replay_file:
                QtCore.QMetaObject.invokeMethod(
                    self, "_on_server_message",
                    QtCore.Qt.QueuedConnection,
                    QtCore.Q_ARG(dict, json.loads(line)))

    port = 9876
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:%d" % port)
    # The filter must be bytes: setsockopt rejects text with TypeError.
    socket.setsockopt(zmq.SUBSCRIBE, b'')
    while True:
        msg = socket.recv_json()
        try:
            QtCore.QMetaObject.invokeMethod(
                self, "_on_server_message",
                QtCore.Qt.QueuedConnection,
                QtCore.Q_ARG(dict, msg))
        except AttributeError:
            # NOTE(review): deliberately ignored in the original; presumably
            # raised while the receiver is being torn down -- confirm.
            pass
def main(pub_port=None, sub_port=None):
    '''main of forwarder

    Binds a SUB socket on ``pub_port`` and a PUB socket on ``sub_port`` and
    runs a zmq FORWARDER device between them until interrupted.

    :param sub_port: port for subscribers (defaults to ``get_sub_port()``)
    :param pub_port: port for publishers (defaults to ``get_pub_port()``)
    '''
    # Pre-bind the names so cleanup cannot fail on an unbound local when an
    # error (or Ctrl-C) happens before the sockets exist.
    context = frontend = backend = None
    try:
        if sub_port is None:
            sub_port = get_sub_port()
        if pub_port is None:
            pub_port = get_pub_port()
        context = zmq.Context(1)
        frontend = context.socket(zmq.SUB)
        backend = context.socket(zmq.PUB)
        # Publishers connect to the frontend, hence it binds pub_port.
        frontend.bind('tcp://*:{pub_port}'.format(pub_port=pub_port))
        frontend.setsockopt(zmq.SUBSCRIBE, b'')
        backend.bind('tcp://*:{sub_port}'.format(sub_port=sub_port))
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def run(white_point):
    """Subscribe to the discovered downstream URL and drive a blink1 device.

    Messages arriving on the SUB socket are handed to ``Receiver.recieve``
    and ``Receiver.throbber`` is scheduled on the IO loop, which is then
    started inside the ``blink1`` context manager.

    :param white_point: forwarded to ``blink1`` as ``white_point``
    """
    settings = discover()
    downstream_url = settings['downstream']

    subscriber = context.socket(zmq.SUB)
    subscriber.connect(settings['downstream'])
    log.info("Connecting to %s" % downstream_url)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, DEFAULT_CHANNEL)

    zmq_stream = ZMQStream(subscriber)
    io_loop = ioloop.IOLoop.instance()
    with blink1(white_point=white_point) as device:
        receiver = Receiver(device, io_loop)
        zmq_stream.on_recv(receiver.recieve)
        io_loop.add_callback(receiver.throbber)
        io_loop.start()
def startup_local_client():
    '''
    Create the module-level ``TEST_CLIENT`` SUB socket.

    After a two-second pause, the socket is connected to the test
    publisher address and port and subscribed to every topic.
    '''
    global TEST_CLIENT
    time.sleep(2)
    zmq_ctx = zmq.Context()
    TEST_CLIENT = zmq_ctx.socket(zmq.SUB)
    endpoint = 'tcp://{addr}:{port}'.format(
        addr=NAPALM_LOGS_TEST_PUB_ADDR,
        port=NAPALM_LOGS_TEST_PUB_PORT,
    )
    TEST_CLIENT.connect(endpoint)
    # An empty prefix matches every message.
    TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'')
# Startup the local ZMQ client.
def set_topic(self, name, topic):
    """shortcut to :py:meth:`SocketManager.set_socket_option` with ``zmq.SUBSCRIBE``

    :param name: the name of the socket where data will pass through
    :param topic: the topic prefix to subscribe to; text is encoded as
        UTF-8, bytes are passed through unchanged

    **Example:**

    ::

      >>> import zmq
      >>> from agentzero.core import SocketManager
      >>>
      >>> sockets = SocketManager()
      >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
      >>>
      >>> # subscribe only to topics beginning with "logs"
      >>> sockets.set_topic('events', 'logs')
      >>> event = sockets.recv_event_safe('events')
      >>> event.topic, event.data
      'logs:2016-06-20', {'stdout': 'hello world'}
    """
    if isinstance(topic, bytes):
        safe_topic = topic
    elif hasattr(topic, 'encode'):
        # bytes(text) raises TypeError on Python 3, so encode explicitly.
        safe_topic = topic.encode('utf-8')
    else:
        # Other bytes-like values (bytearray, memoryview, ...).
        safe_topic = bytes(topic)
    self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
def feedback_loop(self, *args):
    """Receive broadcaster feedback messages forever and dispatch them.

    Connects a SUB socket (subscribed to everything) to the configured
    ``broadcaster-feedback-url`` and passes every three-frame message to
    ``self.on_feedback_msg``; messages with any other frame count are
    reported and dropped. This method never returns.

    :param args: ignored
    """
    # feedback socket
    ctx = zmq.Context()
    socket = ctx.socket(zmq.SUB)
    # The filter must be bytes: setsockopt rejects text with TypeError.
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9110"))
    print("brc feedback channel connected")
    while True:
        # Collects all frames of one message (replaces the RCVMORE loop).
        msg = socket.recv_multipart()
        print("feedback msg")
        if len(msg) == 3:
            self.on_feedback_msg(*msg)
        else:
            print("bad feedback message %d" % len(msg))
def status_loop(self, *args):
    """Receive broadcaster status messages forever and track the node count.

    Each message is decoded as one little-endian unsigned 64-bit integer.
    On success ``self.last_status`` is set to the current time; on failure
    the count is treated as 0. A change in the count is printed and stored
    in ``self.last_nodes``. This method never returns.

    :param args: ignored
    """
    print("connect brc feedback")
    ctx = zmq.Context()
    socket = ctx.socket(zmq.SUB)
    # The filter must be bytes: setsockopt rejects text with TypeError.
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    # NOTE(review): this reads the "broadcaster-feedback-url" key, the same
    # one feedback_loop uses; only the default port differs -- confirm.
    socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9112"))
    print("brc status channel connected")
    while True:
        msg = socket.recv()
        nodes = 0
        try:
            nodes = struct.unpack("<Q", msg)[0]
        except struct.error:
            # Wrong payload size; keep nodes at 0 and report the raw data.
            print("bad nodes data %s" % (msg,))
        else:
            self.last_status = time.time()
        if nodes != self.last_nodes:
            print("brc hosts %s" % (nodes,))
            self.last_nodes = nodes