python类SUBSCRIBE的实例源码

plotter.py 文件源码 项目:live-plotter 作者: anandtrex 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def run(self):
        """
        Entry point for the live plotting when started as a separate process. This starts the loop
        """
        self.entity_name = current_process().name
        plogger.info("Starting new thread %s", self.entity_name)

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)

        self.socket.connect("tcp://localhost:%d" % self.port)
        topic = pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL)

        self.socket.setsockopt(zmq.SUBSCRIBE, topic)
        plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

        self.init(**self.init_kwargs)
        # Reference to animation required so that GC doesn't clean it up.
        # WILL NOT work if you remove it!!!!!
        # See: http://matplotlib.org/api/animation_api.html
        ani = animation.FuncAnimation(self.fig, self.loop, interval=100)
        self.plt.show()
zmq_rec.py 文件源码 项目:odr-stream-router 作者: digris 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rec(port):

    zmq_ctx = zmq.Context()

    s = zmq_ctx.socket(zmq.SUB)
    s.bind('tcp://*:{port}'.format(port=port))
    s.setsockopt(zmq.SUBSCRIBE, b"")


    stream = ZMQStream(s)

    stream.on_recv_stream(rec_frame)

    ioloop.IOLoop.instance().start()

    while True:
        pass
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]):
        '''
        Do the actual ZeroMQ subscription of a socket given by its alias to
        a specific topic. This method only makes sense to be called on
        SUB/SYNC_SUB sockets.

        Note that the handler is not set within this function.
        '''
        topic = topic_to_bytes(topic)

        if isinstance(self.address[alias], AgentAddress):
            self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
        elif isinstance(self.address[alias], AgentChannel):
            channel = self.address[alias]
            sub_address = channel.receiver
            treated_topic = channel.uuid + topic
            self.socket[sub_address].setsockopt(zmq.SUBSCRIBE, treated_topic)
        else:
            raise NotImplementedError('Unsupported address type %s!' %
                                      self.address[alias])
__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_socket(port):
    """
    Create zmq sub socket.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    try:
        socket.bind("tcp://*:%s" % port)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)

    socket.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return socket, context
__init__.py 文件源码 项目:Chasar 作者: camilochs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def create_socket(port):
    """
    Create zmq sub socket.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)

    try:
        socket.bind("tcp://*:%s" % port)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)

    socket.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return socket, context
processing.py 文件源码 项目:simple-processing 作者: bitaps-com 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, loop, logger, config):
        print("test")
        self.loop = loop
        self.log = logger
        self.config = config
        self.zmq_url = config["BITCOIND"]["zeromq"]
        self.zmqContext = zmq.asyncio.Context()
        self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
        self.MYSQL_CONFIG = config["MYSQL"]
        self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
        # self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
        # self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
        # self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
        self.zmqSubSocket.connect(self.zmq_url)
        print(self.zmq_url)
        self.loop.create_task(self.init_db())
        self.loop.create_task(self.handle())
        self.loop.create_task(self.rpctest())
        # self.loop.create_task(self.mysqltest())
sockets.py 文件源码 项目:networkzero 作者: tjguk 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def wait_for_news_from(self, address, topic, wait_for_s):
        if isinstance(address, list):
            addresses = address
        else:
            addresses = [address]
        socket = self.get_socket(addresses, "subscriber")
        if isinstance(topic, str):
            topics = [topic]
        else:
            topics = topic
        for t in topics:
            socket.set(zmq.SUBSCRIBE, t.encode(config.ENCODING))        
        try:
            result = self._receive_with_timeout(socket, wait_for_s, use_multipart=True)
            unserialised_result = _unserialise_for_pubsub(result)
            return unserialised_result
        except (core.SocketTimedOutError, core.SocketInterruptedError):
            return None, None
authers.py 文件源码 项目:enteletaor 作者: cr0hn 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def brute_zmq(host, port=5555, user=None, password=None, db=0):

    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
    socket.setsockopt(zmq.LINGER, 0)  # All topics
    socket.RCVTIMEO = 1000  # timeout: 1 sec

    # Connect
    socket.connect("tcp://%s:%s" % (host, port))

    # Try to receive
    try:
        socket.recv()

        return True
    except Exception:
        return False
    finally:
        socket.close()
scan_main.py 文件源码 项目:enteletaor 作者: cr0hn 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def handle_zmq(host, port=5555, extra_config=None):

    # log.debug("      * Connection to ZeroMQ: %s : %s" % (host, port))

    context = zmq.Context()

    # Configure
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
    socket.setsockopt(zmq.LINGER, 0)  # All topics
    socket.RCVTIMEO = 1000  # timeout: 1 sec

    # Connect
    socket.connect("tcp://%s:%s" % (host, port))

    # Try to receive
    try:
        socket.recv()

        return True
    except Exception:
        return False
    finally:
        socket.close()
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_tcp_pub_socket(event_loop, socket_factory, connect_or_bind):
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_root_topic(self):
        logger, handler, sub = self.connect_handler()
        handler.socket.bind(self.iface)
        sub2 = sub.context.socket(zmq.SUB)
        self.sockets.append(sub2)
        sub2.connect(self.iface)
        sub2.setsockopt(zmq.SUBSCRIBE, b'')
        handler.root_topic = b'twoonly'
        msg1 = 'ignored'
        logger.info(msg1)
        self.assertRaisesErrno(zmq.EAGAIN, sub.recv, zmq.NOBLOCK)
        topic,msg2 = sub2.recv_multipart()
        self.assertEqual(topic, b'twoonly.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')

        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_init_iface(self):
        logger = self.logger
        ctx = self.context
        handler = handlers.PUBHandler(self.iface)
        self.assertFalse(handler.ctx is ctx)
        self.sockets.append(handler.socket)
        # handler.ctx.term()
        handler = handlers.PUBHandler(self.iface, self.context)
        self.sockets.append(handler.socket)
        self.assertTrue(handler.ctx is ctx)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)
        sub = ctx.socket(zmq.SUB)
        self.sockets.append(sub)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        sub.connect(self.iface)
        import time; time.sleep(0.25)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)


问题


面经


文章

微信
公众号

扫码关注公众号