python类PUB的实例源码

reactor.py 文件源码 项目:eventdriventalk 作者: cachedout 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, opts=None):
        if opts is None:
            self.opts = self.process_config(CONFIG_LOCATION)
        else:
            self.opts = opts

        self.ctx = zmq.Context()
        self.pub_socket = self.ctx.socket(zmq.PUB)

        self.pub_socket.bind('tcp://127.0.0.1:2000')

        self.loop = zmq.eventloop.IOLoop.instance()
        self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(self.pub_socket, self.loop)

        # Now create PULL socket over IPC to listen to reactor

        self.pull_socket = self.ctx.socket(zmq.PULL)
        self.pull_socket.bind('ipc:///tmp/reactor.ipc')
        self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)

        self.pull_stream.on_recv(self.republish)
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_hwm(self):
        zmq3 = zmq.zmq_version_info()[0] >= 3
        for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
            s = self.context.socket(stype)
            s.hwm = 100
            self.assertEqual(s.hwm, 100)
            if zmq3:
                try:
                    self.assertEqual(s.sndhwm, 100)
                except AttributeError:
                    pass
                try:
                    self.assertEqual(s.rcvhwm, 100)
                except AttributeError:
                    pass
            s.close()
test_context.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_shadow(self):
        ctx = self.Context()
        ctx2 = self.Context.shadow(ctx.underlying)
        self.assertEqual(ctx.underlying, ctx2.underlying)
        s = ctx.socket(zmq.PUB)
        s.close()
        del ctx2
        self.assertFalse(ctx.closed)
        s = ctx.socket(zmq.PUB)
        ctx2 = self.Context.shadow(ctx.underlying)
        s2 = ctx2.socket(zmq.PUB)
        s.close()
        s2.close()
        ctx.term()
        self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
        del ctx2
test_monqueue.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def compose_message(message: bytes, topic: bytes,
                    serializer: AgentAddressSerializer) -> bytes:
    """
    Compose a message and leave it ready to be sent through a socket.

    This is used in PUB-SUB patterns to combine the topic and the message
    in a single bytes buffer.

    Parameters
    ----------
    message
        Message to be composed.
    topic
        Topic to combine the message with.
    serializer
        Serialization for the message part.

    Returns
    -------
        The bytes representation of the final message to be sent.
    """
    if serializer.requires_separator:
        return topic + TOPIC_SEPARATOR + message
    return topic + message
test_address.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_agentchannel_sync_pub():
    """
    Test basic SYNC_PUB AgentChannel operations: initialization, equivalence
    and basic methods.
    """
    sender = AgentAddress('ipc', 'addr0', 'PUB', 'server', 'pickle')
    receiver = AgentAddress('ipc', 'addr0', 'PULL', 'server', 'pickle')
    channel = AgentChannel('SYNC_PUB', sender=sender, receiver=receiver)
    # Equivalence
    assert channel == AgentChannel('SYNC_PUB', sender=sender,
                                   receiver=receiver)
    assert not channel == 'foo'
    assert channel != 'foo'
    # Basic methods
    assert channel.twin() == AgentChannel('SYNC_SUB', sender=receiver.twin(),
                                          receiver=sender.twin())
    # Other attributes
    assert hasattr(channel, 'uuid')
    assert channel.transport == 'ipc'
    assert channel.serializer == 'pickle'
zzmq.py 文件源码 项目:csirtg-smrt-py 作者: csirtgadgets 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def indicators_create(self, data):
        if isinstance(data, dict):
            data = Indicator(**data)

        if isinstance(data, Indicator):
            data = [data]

        if self.socket_type == "PUB":
            for i in data:
                self.socket.send_multipart([self.topic, str(i)])
            return

        if self.socket_type == 'PUSH_ZYRE_GATEWAY':
            for i in data:
                self.socket.send_multipart(['PUB', self.topic, str(i)])
            return

        for i in data:
            self.socket.send(str(i))
vnrpc.py 文件源码 项目:InplusTrader_Linux 作者: zhengwsh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, repAddress, pubAddress):
        """Constructor"""
        super(RpcServer, self).__init__()

        # ??????????key?????value?????
        self.__functions = {}     

        # zmq????
        self.__context = zmq.Context()

        self.__socketREP = self.__context.socket(zmq.REP)   # ????socket
        self.__socketREP.bind(repAddress)

        self.__socketPUB = self.__context.socket(zmq.PUB)   # ????socket
        self.__socketPUB.bind(pubAddress)

        # ??????
        self.__active = False                             # ????????
        self.__thread = threading.Thread(target=self.run) # ????????

    #----------------------------------------------------------------------
plotting.py 文件源码 项目:Auspex 作者: BBN-Q 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
            self._loop = zmq.asyncio.ZMQEventLoop()
            asyncio.set_event_loop(self._loop)
            self.context = zmq.asyncio.Context()
            self.status_sock = self.context.socket(zmq.ROUTER)
            self.data_sock = self.context.socket(zmq.PUB)
            self.status_sock.bind("tcp://*:%s" % self.status_port)
            self.data_sock.bind("tcp://*:%s" % self.data_port)
            self.poller = zmq.asyncio.Poller()
            self.poller.register(self.status_sock, zmq.POLLIN)

            self._loop.create_task(self.poll_sockets())
            try:
                self._loop.run_forever()
            finally:
                self.status_sock.close()
                self.data_sock.close()
                self.context.destroy()
test_zmq_pub_sub.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_pub(self):
        """Publish log messages. bind() to PUB socket."""
        # pylint: disable=E1101
        context = zmq.Context()
        pub = context.socket(zmq.PUB)
        try:
            pub.bind('tcp://*:{}'.format(self.sub_port))
        except zmq.ZMQError as error:
            print(error)
        time.sleep(0.1)

        send_count = self.send_count
        for i in range(send_count):
            pub.send_string('hi there {}'.format(i))
            time.sleep(1e-5)
        sys.stdout.flush()

        # Wait for the watcher thread to exit.
        while self.watcher.isAlive():
            self.watcher.join(timeout=1e-5)

        pub.close()
        context.term()
test_zmq_pub_sub.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_pub(self):
        """Publish log messages. connect() to PUB socket."""
        # pylint: disable=E1101
        context = zmq.Context()
        pub = context.socket(zmq.PUB)
        try:
            _address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
            pub.connect(_address)
        except zmq.ZMQError as error:
            print('ERROR:', error)
        time.sleep(0.1)

        send_count = self.send_count
        for i in range(send_count):
            pub.send_string('hi there {}'.format(i))
            time.sleep(1e-5)

        # Wait for the watcher thread to exit
        while self.watcher.isAlive():
            self.watcher.join(timeout=1e-5)

        pub.close()
        context.term()
logging_handlers.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def to(cls, channel, host='127.0.0.1',
           port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
           level=logging.NOTSET):
        """Convenience class method to create a ZmqLoghandler and
        connect to a ZMQ subscriber.

        Args:
            channel (string): Logging channel name. This is used to build a
                              ZMQ topic.
            host (string): Hostname / ip address of the subscriber to publish
                           to.
            port (int, string): Port on which to publish messages.
            level (int): Logging level
        """
        context = zmq.Context()
        publisher = context.socket(zmq.PUB)
        address = 'tcp://{}:{}'.format(host, port)
        publisher.connect(address)
        time.sleep(0.1)  # This sleep hopefully fixes the silent joiner problem.
        return cls(channel, publisher, level=level)
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_hwm(self):
        zmq3 = zmq.zmq_version_info()[0] >= 3
        for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
            s = self.context.socket(stype)
            s.hwm = 100
            self.assertEqual(s.hwm, 100)
            if zmq3:
                try:
                    self.assertEqual(s.sndhwm, 100)
                except AttributeError:
                    pass
                try:
                    self.assertEqual(s.rcvhwm, 100)
                except AttributeError:
                    pass
            s.close()
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_hwm(self):
        zmq3 = zmq.zmq_version_info()[0] >= 3
        for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
            s = self.context.socket(stype)
            s.hwm = 100
            self.assertEqual(s.hwm, 100)
            if zmq3:
                try:
                    self.assertEqual(s.sndhwm, 100)
                except AttributeError:
                    pass
                try:
                    self.assertEqual(s.rcvhwm, 100)
                except AttributeError:
                    pass
            s.close()
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_shadow(self):
        ctx = self.Context()
        ctx2 = self.Context.shadow(ctx.underlying)
        self.assertEqual(ctx.underlying, ctx2.underlying)
        s = ctx.socket(zmq.PUB)
        s.close()
        del ctx2
        self.assertFalse(ctx.closed)
        s = ctx.socket(zmq.PUB)
        ctx2 = self.Context.shadow(ctx.underlying)
        s2 = ctx2.socket(zmq.PUB)
        s.close()
        s2.close()
        ctx.term()
        self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
        del ctx2
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_unicode_sockopts(self):
        """test setting/getting sockopts with unicode strings"""
        topic = "tést"
        if str is not unicode:
            topic = topic.decode('utf8')
        p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
        self.assertEqual(s.send_unicode, s.send_unicode)
        self.assertEqual(p.recv_unicode, p.recv_unicode)
        self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
        s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
        self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
        s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
        self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
        self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)

        identb = s.getsockopt(zmq.IDENTITY)
        identu = identb.decode('utf16')
        identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
        self.assertEqual(identu, identu2)
        time.sleep(0.1) # wait for connection/subscription
        p.send_unicode(topic,zmq.SNDMORE)
        p.send_unicode(topic*2, encoding='latin-1')
        self.assertEqual(topic, s.recv_unicode())
        self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_hwm(self):
        zmq3 = zmq.zmq_version_info()[0] >= 3
        for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
            s = self.context.socket(stype)
            s.hwm = 100
            self.assertEqual(s.hwm, 100)
            if zmq3:
                try:
                    self.assertEqual(s.sndhwm, 100)
                except AttributeError:
                    pass
                try:
                    self.assertEqual(s.rcvhwm, 100)
                except AttributeError:
                    pass
            s.close()
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_shadow(self):
        ctx = self.Context()
        ctx2 = self.Context.shadow(ctx.underlying)
        self.assertEqual(ctx.underlying, ctx2.underlying)
        s = ctx.socket(zmq.PUB)
        s.close()
        del ctx2
        self.assertFalse(ctx.closed)
        s = ctx.socket(zmq.PUB)
        ctx2 = self.Context.shadow(ctx.underlying)
        s2 = ctx2.socket(zmq.PUB)
        s.close()
        s2.close()
        ctx.term()
        self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
        del ctx2
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 85 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)
test_monqueue.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
        self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
                                            in_prefix, out_prefix)
        alice = self.context.socket(zmq.PAIR)
        bob = self.context.socket(zmq.PAIR)
        mon = self.context.socket(zmq.SUB)

        aport = alice.bind_to_random_port('tcp://127.0.0.1')
        bport = bob.bind_to_random_port('tcp://127.0.0.1')
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, mon_sub)

        self.device.connect_in("tcp://127.0.0.1:%i"%aport)
        self.device.connect_out("tcp://127.0.0.1:%i"%bport)
        self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
        self.device.start()
        time.sleep(.2)
        try:
            # this is currenlty necessary to ensure no dropped monitor messages
            # see LIBZMQ-248 for more info
            mon.recv_multipart(zmq.NOBLOCK)
        except zmq.ZMQError:
            pass
        self.sockets.extend([alice, bob, mon])
        return alice, bob, mon
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_hwm(self):
        zmq3 = zmq.zmq_version_info()[0] >= 3
        for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
            s = self.context.socket(stype)
            s.hwm = 100
            self.assertEqual(s.hwm, 100)
            if zmq3:
                try:
                    self.assertEqual(s.sndhwm, 100)
                except AttributeError:
                    pass
                try:
                    self.assertEqual(s.rcvhwm, 100)
                except AttributeError:
                    pass
            s.close()
test_context.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_shadow(self):
        ctx = self.Context()
        ctx2 = self.Context.shadow(ctx.underlying)
        self.assertEqual(ctx.underlying, ctx2.underlying)
        s = ctx.socket(zmq.PUB)
        s.close()
        del ctx2
        self.assertFalse(ctx.closed)
        s = ctx.socket(zmq.PUB)
        ctx2 = self.Context.shadow(ctx.underlying)
        s2 = ctx2.socket(zmq.PUB)
        s.close()
        s2.close()
        ctx.term()
        self.assertRaisesErrno(zmq.EFAULT, ctx2.socket, zmq.PUB)
        del ctx2
test_log.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_socket(self):
        pub,sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
        logger = self.logger
        handler = handlers.PUBHandler(pub)
        handler.setLevel(logging.DEBUG)
        handler.root_topic = self.topic
        logger.addHandler(handler)

        self.assertTrue(handler.socket is pub)
        self.assertTrue(handler.ctx is pub.context)
        self.assertTrue(handler.ctx is self.context)
        sub.setsockopt(zmq.SUBSCRIBE, b(self.topic))
        import time; time.sleep(0.1)
        msg1 = 'message'
        logger.info(msg1)

        (topic, msg2) = sub.recv_multipart()
        self.assertEqual(topic, b'zmq.INFO')
        self.assertEqual(msg2, b(msg1)+b'\n')
        logger.removeHandler(handler)


问题


面经


文章

微信
公众号

扫码关注公众号