python类SUBSCRIBE的实例源码

test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
agents.py 文件源码 项目:hydra 作者: lake-lerna 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def sub_task(self, name, ip_port_sub):
        if (ip_port_sub == "0"):
            return
        ctx = zmq.Context()
        # subscribe socket
        socket_sub = ctx.socket(zmq.SUB)
        socket_sub.connect("tcp://%s" % ip_port_sub)
        socket_sub.setsockopt(zmq.SUBSCRIBE, '')
        total_value = 0
        self.sub_msg_cnt = 0
        while not self.shutdown:
            string = socket_sub.recv()
            topic, messageData = string.split()
            total_value += int(messageData)
            self.sub_msg_cnt += 1
            print("SUB:: [%d] %s %s" % (self.sub_msg_cnt, topic, messageData))
servers.py 文件源码 项目:OldSpeak 作者: 0rbitAeolian 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def execute_command_forwarder():
    from oldspeak.console.parsers.streamer import parser

    args = parser.parse_args(get_sub_parser_argv())
    bootstrap_conf_with_gevent(args)

    device = Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)

    device.bind_in(args.subscriber)
    device.bind_out(args.publisher)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')
    if args.subscriber_hwm:
        device.setsockopt_in(zmq.RCVHWM, args.subscriber_hwm)

    if args.publisher_hwm:
        device.setsockopt_out(zmq.SNDHWM, args.publisher_hwm)

    print "oldspeak forwarder started"
    print "date", datetime.utcnow().isoformat()
    print "subscriber", (getattr(args, 'subscriber'))
    print "publisher", (getattr(args, 'publisher'))
    device.start()
mw_graphs_qt.py 文件源码 项目:pymindwave 作者: frans-fuerst 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _data_listener(self):
        if len(sys.argv) > 1:
            for l in open(sys.argv[1]).readlines():
                QtCore.QMetaObject.invokeMethod(
                    self, "_on_server_message",
                    QtCore.Qt.QueuedConnection,
                    QtCore.Q_ARG(dict, json.loads(l)))

        port = 9876
        context = zmq.Context()
        socket = context.socket(zmq.SUB)

        socket.connect ("tcp://localhost:%d" % port)
        socket.setsockopt(zmq.SUBSCRIBE, '')
        while True:
            msg = socket.recv_json()
            try:
                QtCore.QMetaObject.invokeMethod(
                    self, "_on_server_message",
                    QtCore.Qt.QueuedConnection,
                    QtCore.Q_ARG(dict, msg))
            except AttributeError:
                pass
forwarder.py 文件源码 项目:jps 作者: OTL 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def main(pub_port=None, sub_port=None):
    '''main of forwarder

    :param sub_port: port for subscribers
    :param pub_port: port for publishers
    '''
    try:
        if sub_port is None:
            sub_port = get_sub_port()
        if pub_port is None:
            pub_port = get_pub_port()
        context = zmq.Context(1)
        frontend = context.socket(zmq.SUB)
        backend = context.socket(zmq.PUB)

        frontend.bind('tcp://*:{pub_port}'.format(pub_port=pub_port))
        frontend.setsockopt(zmq.SUBSCRIBE, b'')
        backend.bind('tcp://*:{sub_port}'.format(sub_port=sub_port))
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    finally:
        frontend.close()
        backend.close()
        context.term()
subscriber.py 文件源码 项目:blink1-python-old 作者: todbot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(white_point):
    config = discover()
    downstream_url = config['downstream']

    socket = context.socket(zmq.SUB)
    socket.connect(config['downstream'])
    log.info("Connecting to %s" % downstream_url)

    socket.setsockopt_string(zmq.SUBSCRIBE, DEFAULT_CHANNEL)
    stream = ZMQStream(socket)

    loop = ioloop.IOLoop.instance()

    with blink1(white_point=white_point) as b1:
        reciever = Receiver(b1, loop)
        stream.on_recv(reciever.recieve)

        loop.add_callback(reciever.throbber)

        loop.start()
test_config.py 文件源码 项目:napalm-logs 作者: napalm-automation 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def startup_local_client():
    '''
    Startup a local ZMQ client to receive the published messages.
    '''
    time.sleep(2)
    global TEST_CLIENT
    context = zmq.Context()
    TEST_CLIENT = context.socket(zmq.SUB)
    TEST_CLIENT.connect('tcp://{addr}:{port}'.format(
        addr=NAPALM_LOGS_TEST_PUB_ADDR,
        port=NAPALM_LOGS_TEST_PUB_PORT)
    )
    TEST_CLIENT.setsockopt(zmq.SUBSCRIBE, b'')


# Startup the local ZMQ client.
core.py 文件源码 项目:agentzero 作者: gabrielfalcao 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_topic(self, name, topic):
        """shortcut to :py:meth:SocketManager.set_socket_option(zmq.TOPIC, topic)

        :param name: the name of the socket where data will pad through
        :param topic: the option from the ``zmq`` module

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager()
          >>> sockets.ensure_and_bind('events', zmq.SUB, 'tcp://*:6000', zmq.POLLIN)
          >>>
          >>> # subscribe only to topics beginning with "logs"
          >>> sockets.set_topic('events', 'logs')
          >>> event = sockets.recv_event_safe('events')
          >>> event.topic, event.data
          'logs:2016-06-20', {'stdout': 'hello world'}
        """

        safe_topic = bytes(topic)
        self.set_socket_option(name, self.zmq.SUBSCRIBE, safe_topic)
broadcast.py 文件源码 项目:DarkWallet 作者: DissentDifference 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def feedback_loop(self, *args):
        # feedback socket
        ctx = zmq.Context()
        socket = ctx.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, "")
        socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9110"))
        print "brc feedback channel connected"
        while True:
            msg = [socket.recv()]
            while socket.getsockopt(zmq.RCVMORE):
                msg.append(socket.recv())
                print "feedback msg"
            if len(msg) == 3:
                self.on_feedback_msg(*msg)
            else:
                print "bad feedback message", len(msg)
broadcast.py 文件源码 项目:DarkWallet 作者: DissentDifference 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def status_loop(self, *args):
        # feedback socket
        print "connect brc feedback"
        ctx = zmq.Context()
        socket = ctx.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, "")
        socket.connect(config.get("broadcaster-feedback-url", "tcp://localhost:9112"))
        print "brc status channel connected"
        while True:
            msg = socket.recv()
            nodes = 0
            try:
                nodes = struct.unpack("<Q", msg)[0]
                self.last_status = time.time()
            except:
                print "bad nodes data", msg
            if not nodes == self.last_nodes:
                print "brc hosts", nodes
                self.last_nodes = nodes


问题


面经


文章

微信
公众号

扫码关注公众号