python类REQ的实例源码

hellozmq.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def frontendClient(context=None):
    """Repeatedly request a randomly chosen service over a REQ socket.

    Connects to ``tcp://localhost:5559`` and, once per second, sends either
    ``b'Service A'`` or ``b'Service B'`` and prints the reply.  The loop ends
    when no reply arrives within the receive timeout or when an empty reply
    is received.

    Args:
        context: ZMQ context to create the socket from.  When falsy, the
            shared ``zmq.Context.instance()`` is used.
    """
    #reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5559")
    socket.RCVTIMEO = 2000 #we will only wait 2s for a reply
    try:
        while True:
            #randomly request either service A or service B
            serviceRequest = random.choice([b'Service A', b'Service B'])
            with myLock:
                print("client wants %s" % serviceRequest)
            socket.send(serviceRequest)
            try:
                reply = socket.recv()
            except zmq.Again:
                # RCVTIMEO expired; any other error is not a timeout and is
                # allowed to propagate instead of being mislabelled.
                print("client timed out")
                break
            if not reply:
                break
            with myLock:
                print("Client got reply: ")
                print(reply)
                print("")
            #take a nap
            time.sleep(1)
    finally:
        # linger=0 drops any request still queued after a timeout so that
        # closing the socket cannot hold up context termination.
        socket.close(linger=0)
test_socket.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_hwm(self):
        """Check that a value assigned to ``hwm`` is read back unchanged.

        For every socket type in the list, ``hwm`` is set to 100 and read
        back.  When the libzmq major version is 3 or later, ``sndhwm`` and
        ``rcvhwm`` are also expected to equal 100; an ``AttributeError``
        raised while checking either of them is ignored.
        """
        split_hwm = zmq.zmq_version_info()[0] >= 3
        socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
        for socket_type in socket_types:
            sock = self.context.socket(socket_type)
            sock.hwm = 100
            self.assertEqual(sock.hwm, 100)
            if split_hwm:
                # Each directional option is checked independently so that a
                # missing one does not skip the other.
                for option in ('sndhwm', 'rcvhwm'):
                    try:
                        self.assertEqual(getattr(sock, option), 100)
                    except AttributeError:
                        pass
            sock.close()
test_ioloop.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_poller_events(self):
        """Tornado poller implementation maps events correctly.

        Registers a bound REQ/REP pair with ``ioloop.ZMQPoller`` and checks
        the event reported for each socket in three phases: registered for
        READ, registered for WRITE, and after the REQ side sends a message.
        """
        req_sock, rep_sock = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()

        def poll(timeout):
            # Turn the poll result into a dict keyed by socket.
            return dict(poller.poll(timeout))

        io = ioloop.IOLoop

        # Phase 1: both registered for READ; no event expected for either.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.READ)
        ready = poll(0)
        self.assertEqual(ready.get(rep_sock), None)
        self.assertEqual(ready.get(req_sock), None)

        # Phase 2: both registered for WRITE; only REQ is expected to report.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.WRITE)
        ready = poll(1)
        self.assertEqual(ready.get(req_sock), io.WRITE)
        self.assertEqual(ready.get(rep_sock), None)

        # Phase 3: after REQ sends, REP is expected to report READ only.
        poller.register(rep_sock, io.READ)
        req_sock.send(b'hi')
        ready = poll(1)
        self.assertEqual(ready.get(rep_sock), io.READ)
        self.assertEqual(ready.get(req_sock), None)
test_monitor.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_monitor_connected(self):
        """Test connected monitoring socket.

        Binds a REQ socket, attaches a monitor socket to a REP socket,
        connects the REP socket to the bound endpoint and checks that the
        monitor reports ``EVENT_CONNECTED`` for that endpoint, optionally
        preceded by an ``EVENT_CONNECT_DELAYED`` message.
        """
        endpoint = "tcp://127.0.0.1:6667"
        endpoint_bytes = b"tcp://127.0.0.1:6667"

        rep_sock = self.context.socket(zmq.REP)
        req_sock = self.context.socket(zmq.REQ)
        self.sockets.extend([rep_sock, req_sock])
        req_sock.bind(endpoint)

        # Monitor the REP socket through its dedicated monitor socket.
        monitor = rep_sock.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting should produce monitor events for this endpoint.
        rep_sock.connect(endpoint)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], endpoint_bytes)
            # The connected event is then expected as the next message.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], endpoint_bytes)
controller_handler.py 文件源码 项目:lustre_task_driven_monitoring_framework 作者: GSI-HPC 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect(self):
        """Set up the ZMQ context, REQ socket and poller for this handler.

        Stores the context, socket and poller on the instance, connects the
        socket to ``self.endpoint``, registers it with the poller for
        ``zmq.POLLIN`` and finally sets ``self.is_connected`` to True.

        Raises:
            RuntimeError: if the created context or socket is falsy.
        """
        context = zmq.Context()
        self.context = context
        if not context:
            raise RuntimeError('Failed to create ZMQ context!')

        req_socket = context.socket(zmq.REQ)
        self.socket = req_socket
        if not req_socket:
            raise RuntimeError('Failed to create ZMQ socket!')

        req_socket.connect(self.endpoint)

        poller = zmq.Poller()
        self.poller = poller
        poller.register(req_socket, zmq.POLLIN)

        self.is_connected = True
vnrpc.py 文件源码 项目:InplusTrader_Linux 作者: zhengwsh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, reqAddress, subAddress):
        """Store the two addresses and create the ZMQ sockets and worker thread.

        Args:
            reqAddress: address kept for the REQ socket.
            subAddress: address kept for the SUB socket.

        Nothing is connected and the thread is not started here.
        """
        super(RpcClient, self).__init__()

        # Addresses are only stored here
        self.__reqAddress, self.__subAddress = reqAddress, subAddress

        # One context shared by both sockets
        context = zmq.Context()
        self.__context = context
        self.__socketREQ = context.socket(zmq.REQ)   # REQ socket
        self.__socketSUB = context.socket(zmq.SUB)   # SUB socket

        # Worker thread state; the thread targets self.run
        self.__active = False
        self.__thread = threading.Thread(target=self.run)

    #----------------------------------------------------------------------
client.py 文件源码 项目:justdb 作者: kootenpv 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        """Launch ``justdb serve`` in the background and connect two REQ sockets.

        ``self.main_socket`` is connected to ``tcp://localhost:5555`` and
        ``self.freeze_socket`` to ``tcp://localhost:6666``.  Both attributes
        are assigned only after both sockets have been connected.
        """
        zmq_context = zmq.Context()

        # Best-effort start of the server as a background shell job.
        os.system("justdb serve &")

        def open_req(address):
            # Create a REQ socket on the shared context and connect it.
            sock = zmq_context.socket(zmq.REQ)
            sock.connect(address)
            return sock

        main = open_req("tcp://localhost:5555")
        freeze = open_req("tcp://localhost:6666")

        self.main_socket, self.freeze_socket = main, freeze
base.py 文件源码 项目:antgo 作者: jianzfb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def flash(self):
    """Rebuild per-process state when running under a different process id.

    Does nothing when ``self.pid`` already equals the current process id.
    Otherwise it records the new pid, creates fresh ZMQ sockets (each on
    its own new context), builds a new context via ``main_context`` and,
    when ``self.main_param`` is set, loads parameters from that YAML file
    under ``self.main_folder`` into the new context.
    """
    current_pid = str(os.getpid())
    if self.pid != current_pid:
      # reset process pid
      self.pid = current_pid

      # update zmq sockets
      # (sockets cannot be shared between different processes)
      self.zmq_socket = zmq.Context().socket(zmq.REQ)
      self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)

      # update context
      ctx = main_context(self.main_file, self.main_folder)
      if self.main_param is not None:
        main_config_path = os.path.join(self.main_folder, self.main_param)
        # NOTE(review): yaml.load without an explicit Loader can construct
        # arbitrary objects; consider yaml.safe_load if this file may come
        # from an untrusted source.
        # The file is opened in a ``with`` block so the handle is closed
        # even if parsing fails.
        with open(main_config_path, 'r') as config_file:
          params = yaml.load(config_file)
        ctx.params = params

      self.context = ctx
shell.py 文件源码 项目:networkzero 作者: tjguk 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main():
    """Run an interactive command shell over a REQ socket.

    Connects to the address given by ``config.LISTEN_ON_IP`` and
    ``config.LISTEN_ON_PORT``, then repeatedly prompts for a command, sends
    it encoded with ``config.CODEC`` and prints the decoded response.  The
    loop ends when the first word of a response is ``finished``
    (case-insensitive).
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))

    while True:
        command = input("Command: ")
        socket.send(command.encode(config.CODEC))
        response = socket.recv().decode(config.CODEC)
        print("  ... %s" % response)
        words = shlex.split(response.lower())
        # An empty or whitespace-only response splits into no words; treat
        # it as "no status" instead of failing on words[0].
        status = words[0] if words else ""
        if status == "finished":
            print("Finished status received from robot")
            break
controller.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle_in(self):
        """Receive one multipart message and dispatch it to the right handler.

        Frame layouts handled:
          * ``[sender, b'', msg_buf]`` -- passed to ``handle_rpc``.
          * ``[sender, msg_buf, binary]`` -- ``binary`` is stored under
            ``msg['data']`` when non-empty.
          * ``[sender, msg_buf]``.

        Non-RPC messages go to ``handle_peer`` when ``sender`` is in
        ``self.others`` and to ``handle_worker`` otherwise.

        Raises:
            ValueError: if the message does not have 2 or 3 frames.
        """
        self.msg_count_in += 1
        data = self.socket.recv_multipart()
        binary = None
        if len(data) == 3:
            # Compare against bytes: a str '' never equals a bytes frame on
            # Python 3, and b'' == '' still holds on Python 2.
            if data[1] == b'': # This is a RPC call from a zmq.REQ socket
                sender, _blank, msg_buf = data
                self.handle_rpc(sender, msg_factory(msg_buf))
                return
            sender, msg_buf, binary = data
        elif len(data) == 2:  # This is an internode call from another zmq.ROUTER, a Controller or Worker
            sender, msg_buf = data
        else:
            # Fail with a clear error instead of an unbound-variable error.
            raise ValueError(
                'Unexpected multipart message with %d frames (expected 2 or 3)'
                % len(data))
        msg = msg_factory(msg_buf)
        if binary:
            msg['data'] = binary
        if sender in self.others:
            self.handle_peer(sender, msg)
        else:
            self.handle_worker(sender, msg)
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one request/reply between an azmq REQ socket and a REP peer.

    The REP peer comes from ``socket_factory`` and is driven by ``run``,
    which is handed to ``run_in_background``.  The azmq REQ socket sends
    ``[b'my', b'question']`` and expects ``[b'your', b'answer']``; each
    azmq operation is bounded to 1 second by ``asyncio.wait_for``.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are not valid in a plain function.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: receive the question, check it, send the answer.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one request/reply between an azmq REP socket and a REQ peer.

    The REQ peer comes from ``socket_factory`` and is driven by ``run``,
    which is handed to ``run_in_background``.  The azmq REP socket expects
    ``[b'my', b'question']`` and answers ``[b'your', b'answer']``; each
    azmq operation is bounded to 1 second by ``asyncio.wait_for``.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are not valid in a plain function.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: send the question, then check the answer.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
    """Exchange one request/reply between an azmq ROUTER socket and a REQ peer.

    The REQ peer comes from ``socket_factory``, has identity ``b'abcd'``
    and is driven by ``run``, which is handed to ``run_in_background``.
    The ROUTER side expects the peer identity as the first frame, followed
    by ``[b'', b'my', b'question']``, and replies with the same identity
    and an empty frame ahead of ``[b'your', b'answer']``.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are not valid in a plain function.
    """
    req_socket = socket_factory.create(zmq.REQ)
    req_socket.identity = b'abcd'
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: send the question, then check the answer.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.ROUTER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            # The first frame is the sender's identity; the rest is the
            # empty frame plus the payload.
            identity = frames.pop(0)
            assert identity == req_socket.identity
            assert frames == [b'', b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([identity, b'', b'your', b'answer']),
                1,
            )
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
    """Exchange large multi-frame messages between an azmq REQ socket and a REP peer.

    The request is a 500-byte frame followed by a 100000-byte frame, and
    the reply has the same two sizes.  The REP peer comes from
    ``socket_factory`` and is driven by ``run``, which is handed to
    ``run_in_background``; each azmq operation is bounded to 1 second by
    ``asyncio.wait_for``.

    Declared ``async def`` because the body uses ``async with`` and
    ``await``, which are not valid in a plain function.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: receive the large request, check it, send the reply.
        frames = rep_socket.recv_multipart()
        assert frames == [b'1' * 500, b'2' * 100000]
        rep_socket.send_multipart([b'3' * 500, b'4' * 100000])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'1' * 500, b'2' * 100000]),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'3' * 500, b'4' * 100000]
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_hwm(self):
        """Check that a value assigned to ``hwm`` is read back unchanged.

        For every socket type in the list, ``hwm`` is set to 100 and read
        back.  When the libzmq major version is 3 or later, ``sndhwm`` and
        ``rcvhwm`` are also expected to equal 100; an ``AttributeError``
        raised while checking either of them is ignored.
        """
        split_hwm = zmq.zmq_version_info()[0] >= 3
        socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
        for socket_type in socket_types:
            sock = self.context.socket(socket_type)
            sock.hwm = 100
            self.assertEqual(sock.hwm, 100)
            if split_hwm:
                # Each directional option is checked independently so that a
                # missing one does not skip the other.
                for option in ('sndhwm', 'rcvhwm'):
                    try:
                        self.assertEqual(getattr(sock, option), 100)
                    except AttributeError:
                        pass
            sock.close()
test_ioloop.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_poller_events(self):
        """Tornado poller implementation maps events correctly.

        Registers a bound REQ/REP pair with ``ioloop.ZMQPoller`` and checks
        the event reported for each socket in three phases: registered for
        READ, registered for WRITE, and after the REQ side sends a message.
        """
        req_sock, rep_sock = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()

        def poll(timeout):
            # Turn the poll result into a dict keyed by socket.
            return dict(poller.poll(timeout))

        io = ioloop.IOLoop

        # Phase 1: both registered for READ; no event expected for either.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.READ)
        ready = poll(0)
        self.assertEqual(ready.get(rep_sock), None)
        self.assertEqual(ready.get(req_sock), None)

        # Phase 2: both registered for WRITE; only REQ is expected to report.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.WRITE)
        ready = poll(1)
        self.assertEqual(ready.get(req_sock), io.WRITE)
        self.assertEqual(ready.get(rep_sock), None)

        # Phase 3: after REQ sends, REP is expected to report READ only.
        poller.register(rep_sock, io.READ)
        req_sock.send(b'hi')
        ready = poll(1)
        self.assertEqual(ready.get(rep_sock), io.READ)
        self.assertEqual(ready.get(req_sock), None)
test_monitor.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_monitor_connected(self):
        """Test connected monitoring socket.

        Binds a REQ socket, attaches a monitor socket to a REP socket,
        connects the REP socket to the bound endpoint and checks that the
        monitor reports ``EVENT_CONNECTED`` for that endpoint, optionally
        preceded by an ``EVENT_CONNECT_DELAYED`` message.
        """
        endpoint = "tcp://127.0.0.1:6667"
        endpoint_bytes = b"tcp://127.0.0.1:6667"

        rep_sock = self.context.socket(zmq.REP)
        req_sock = self.context.socket(zmq.REQ)
        self.sockets.extend([rep_sock, req_sock])
        req_sock.bind(endpoint)

        # Monitor the REP socket through its dedicated monitor socket.
        monitor = rep_sock.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting should produce monitor events for this endpoint.
        rep_sock.connect(endpoint)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], endpoint_bytes)
            # The connected event is then expected as the next message.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], endpoint_bytes)
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_single_socket_forwarder_connect(self):
        """Check a single-socket QUEUE ThreadDevice attached via connect_in, then connect_out.

        For each attachment method a fresh device and a fresh REQ socket
        bound to a random local port are created; the message sent on the
        REQ socket is expected to be received back unchanged.
        """
        scenarios = (
            ('connect_in', b'hello'),
            ('connect_out', b'hello again'),
        )
        for connect_method, msg in scenarios:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, connect_method)('tcp://127.0.0.1:%i'%port)
            dev.start()
            # Pause briefly after starting the device, before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_hwm(self):
        """Check that a value assigned to ``hwm`` is read back unchanged.

        For every socket type in the list, ``hwm`` is set to 100 and read
        back.  When the libzmq major version is 3 or later, ``sndhwm`` and
        ``rcvhwm`` are also expected to equal 100; an ``AttributeError``
        raised while checking either of them is ignored.
        """
        split_hwm = zmq.zmq_version_info()[0] >= 3
        socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
        for socket_type in socket_types:
            sock = self.context.socket(socket_type)
            sock.hwm = 100
            self.assertEqual(sock.hwm, 100)
            if split_hwm:
                # Each directional option is checked independently so that a
                # missing one does not skip the other.
                for option in ('sndhwm', 'rcvhwm'):
                    try:
                        self.assertEqual(getattr(sock, option), 100)
                    except AttributeError:
                        pass
            sock.close()
test_ioloop.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_poller_events(self):
        """Tornado poller implementation maps events correctly.

        Registers a bound REQ/REP pair with ``ioloop.ZMQPoller`` and checks
        the event reported for each socket in three phases: registered for
        READ, registered for WRITE, and after the REQ side sends a message.
        """
        req_sock, rep_sock = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()

        def poll(timeout):
            # Turn the poll result into a dict keyed by socket.
            return dict(poller.poll(timeout))

        io = ioloop.IOLoop

        # Phase 1: both registered for READ; no event expected for either.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.READ)
        ready = poll(0)
        self.assertEqual(ready.get(rep_sock), None)
        self.assertEqual(ready.get(req_sock), None)

        # Phase 2: both registered for WRITE; only REQ is expected to report.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.WRITE)
        ready = poll(1)
        self.assertEqual(ready.get(req_sock), io.WRITE)
        self.assertEqual(ready.get(rep_sock), None)

        # Phase 3: after REQ sends, REP is expected to report READ only.
        poller.register(rep_sock, io.READ)
        req_sock.send(b'hi')
        ready = poll(1)
        self.assertEqual(ready.get(rep_sock), io.READ)
        self.assertEqual(ready.get(req_sock), None)
test_monitor.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_monitor_connected(self):
        """Test connected monitoring socket.

        Binds a REQ socket, attaches a monitor socket to a REP socket,
        connects the REP socket to the bound endpoint and checks that the
        monitor reports ``EVENT_CONNECTED`` for that endpoint, optionally
        preceded by an ``EVENT_CONNECT_DELAYED`` message.
        """
        endpoint = "tcp://127.0.0.1:6667"
        endpoint_bytes = b"tcp://127.0.0.1:6667"

        rep_sock = self.context.socket(zmq.REP)
        req_sock = self.context.socket(zmq.REQ)
        self.sockets.extend([rep_sock, req_sock])
        req_sock.bind(endpoint)

        # Monitor the REP socket through its dedicated monitor socket.
        monitor = rep_sock.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting should produce monitor events for this endpoint.
        rep_sock.connect(endpoint)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], endpoint_bytes)
            # The connected event is then expected as the next message.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], endpoint_bytes)
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_single_socket_forwarder_connect(self):
        """Check a single-socket QUEUE ThreadDevice attached via connect_in, then connect_out.

        For each attachment method a fresh device and a fresh REQ socket
        bound to a random local port are created; the message sent on the
        REQ socket is expected to be received back unchanged.
        """
        scenarios = (
            ('connect_in', b'hello'),
            ('connect_out', b'hello again'),
        )
        for connect_method, msg in scenarios:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, connect_method)('tcp://127.0.0.1:%i'%port)
            dev.start()
            # Pause briefly after starting the device, before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_hwm(self):
        """Check that a value assigned to ``hwm`` is read back unchanged.

        For every socket type in the list, ``hwm`` is set to 100 and read
        back.  When the libzmq major version is 3 or later, ``sndhwm`` and
        ``rcvhwm`` are also expected to equal 100; an ``AttributeError``
        raised while checking either of them is ignored.
        """
        split_hwm = zmq.zmq_version_info()[0] >= 3
        socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
        for socket_type in socket_types:
            sock = self.context.socket(socket_type)
            sock.hwm = 100
            self.assertEqual(sock.hwm, 100)
            if split_hwm:
                # Each directional option is checked independently so that a
                # missing one does not skip the other.
                for option in ('sndhwm', 'rcvhwm'):
                    try:
                        self.assertEqual(getattr(sock, option), 100)
                    except AttributeError:
                        pass
            sock.close()
test_ioloop.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_poller_events(self):
        """Tornado poller implementation maps events correctly.

        Registers a bound REQ/REP pair with ``ioloop.ZMQPoller`` and checks
        the event reported for each socket in three phases: registered for
        READ, registered for WRITE, and after the REQ side sends a message.
        """
        req_sock, rep_sock = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()

        def poll(timeout):
            # Turn the poll result into a dict keyed by socket.
            return dict(poller.poll(timeout))

        io = ioloop.IOLoop

        # Phase 1: both registered for READ; no event expected for either.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.READ)
        ready = poll(0)
        self.assertEqual(ready.get(rep_sock), None)
        self.assertEqual(ready.get(req_sock), None)

        # Phase 2: both registered for WRITE; only REQ is expected to report.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.WRITE)
        ready = poll(1)
        self.assertEqual(ready.get(req_sock), io.WRITE)
        self.assertEqual(ready.get(rep_sock), None)

        # Phase 3: after REQ sends, REP is expected to report READ only.
        poller.register(rep_sock, io.READ)
        req_sock.send(b'hi')
        ready = poll(1)
        self.assertEqual(ready.get(rep_sock), io.READ)
        self.assertEqual(ready.get(req_sock), None)
test_monitor.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_monitor_connected(self):
        """Test connected monitoring socket.

        Binds a REQ socket, attaches a monitor socket to a REP socket,
        connects the REP socket to the bound endpoint and checks that the
        monitor reports ``EVENT_CONNECTED`` for that endpoint, optionally
        preceded by an ``EVENT_CONNECT_DELAYED`` message.
        """
        endpoint = "tcp://127.0.0.1:6667"
        endpoint_bytes = b"tcp://127.0.0.1:6667"

        rep_sock = self.context.socket(zmq.REP)
        req_sock = self.context.socket(zmq.REQ)
        self.sockets.extend([rep_sock, req_sock])
        req_sock.bind(endpoint)

        # Monitor the REP socket through its dedicated monitor socket.
        monitor = rep_sock.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting should produce monitor events for this endpoint.
        rep_sock.connect(endpoint)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], endpoint_bytes)
            # The connected event is then expected as the next message.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], endpoint_bytes)
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_single_socket_forwarder_connect(self):
        """Check a single-socket QUEUE ThreadDevice attached via connect_in, then connect_out.

        For each attachment method a fresh device and a fresh REQ socket
        bound to a random local port are created; the message sent on the
        REQ socket is expected to be received back unchanged.
        """
        scenarios = (
            ('connect_in', b'hello'),
            ('connect_out', b'hello again'),
        )
        for connect_method, msg in scenarios:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, connect_method)('tcp://127.0.0.1:%i'%port)
            dev.start()
            # Pause briefly after starting the device, before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
test_socket.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_hwm(self):
        """Check that a value assigned to ``hwm`` is read back unchanged.

        For every socket type in the list, ``hwm`` is set to 100 and read
        back.  When the libzmq major version is 3 or later, ``sndhwm`` and
        ``rcvhwm`` are also expected to equal 100; an ``AttributeError``
        raised while checking either of them is ignored.
        """
        split_hwm = zmq.zmq_version_info()[0] >= 3
        socket_types = (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER)
        for socket_type in socket_types:
            sock = self.context.socket(socket_type)
            sock.hwm = 100
            self.assertEqual(sock.hwm, 100)
            if split_hwm:
                # Each directional option is checked independently so that a
                # missing one does not skip the other.
                for option in ('sndhwm', 'rcvhwm'):
                    try:
                        self.assertEqual(getattr(sock, option), 100)
                    except AttributeError:
                        pass
            sock.close()
test_ioloop.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_poller_events(self):
        """Tornado poller implementation maps events correctly.

        Registers a bound REQ/REP pair with ``ioloop.ZMQPoller`` and checks
        the event reported for each socket in three phases: registered for
        READ, registered for WRITE, and after the REQ side sends a message.
        """
        req_sock, rep_sock = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()

        def poll(timeout):
            # Turn the poll result into a dict keyed by socket.
            return dict(poller.poll(timeout))

        io = ioloop.IOLoop

        # Phase 1: both registered for READ; no event expected for either.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.READ)
        ready = poll(0)
        self.assertEqual(ready.get(rep_sock), None)
        self.assertEqual(ready.get(req_sock), None)

        # Phase 2: both registered for WRITE; only REQ is expected to report.
        for sock in (req_sock, rep_sock):
            poller.register(sock, io.WRITE)
        ready = poll(1)
        self.assertEqual(ready.get(req_sock), io.WRITE)
        self.assertEqual(ready.get(rep_sock), None)

        # Phase 3: after REQ sends, REP is expected to report READ only.
        poller.register(rep_sock, io.READ)
        req_sock.send(b'hi')
        ready = poll(1)
        self.assertEqual(ready.get(rep_sock), io.READ)
        self.assertEqual(ready.get(req_sock), None)
test_monitor.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_monitor_connected(self):
        """Test connected monitoring socket.

        Binds a REQ socket, attaches a monitor socket to a REP socket,
        connects the REP socket to the bound endpoint and checks that the
        monitor reports ``EVENT_CONNECTED`` for that endpoint, optionally
        preceded by an ``EVENT_CONNECT_DELAYED`` message.
        """
        endpoint = "tcp://127.0.0.1:6667"
        endpoint_bytes = b"tcp://127.0.0.1:6667"

        rep_sock = self.context.socket(zmq.REP)
        req_sock = self.context.socket(zmq.REQ)
        self.sockets.extend([rep_sock, req_sock])
        req_sock.bind(endpoint)

        # Monitor the REP socket through its dedicated monitor socket.
        monitor = rep_sock.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting should produce monitor events for this endpoint.
        rep_sock.connect(endpoint)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], endpoint_bytes)
            # The connected event is then expected as the next message.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], endpoint_bytes)
test_device.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_single_socket_forwarder_connect(self):
        """Check a single-socket QUEUE ThreadDevice attached via connect_in, then connect_out.

        For each attachment method a fresh device and a fresh REQ socket
        bound to a random local port are created; the message sent on the
        REQ socket is expected to be received back unchanged.
        """
        scenarios = (
            ('connect_in', b'hello'),
            ('connect_out', b'hello again'),
        )
        for connect_method, msg in scenarios:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, connect_method)('tcp://127.0.0.1:%i'%port)
            dev.start()
            # Pause briefly after starting the device, before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()


问题


面经


文章

微信
公众号

扫码关注公众号