python类POLLIN的实例源码

MessageController.py 文件源码 项目:py-enarksh 作者: SetBased 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def receive_message(self, event, event_data, listener_data):
        """
        Receives a messages from another processes.

        :param * event: Not used.
        :param * event_data: Not used.
        :param * listener_data: Not used.
        """
        del event, event_data, listener_data

        # Make a poller for all incoming sockets.
        poller = zmq.Poller()
        for socket in self.__end_points.values():
            if socket.type in [zmq.PULL, zmq.REP]:
                poller.register(socket, zmq.POLLIN)

        # Wait for socket is ready for reading.
        socks = dict(poller.poll())

        for name, socket in self.__end_points.items():
            if socket in socks:
                self._receive_message(name, socket)

    # ------------------------------------------------------------------------------------------------------------------
MessageController.py 文件源码 项目:py-enarksh 作者: SetBased 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def no_barking(self, seconds):
        """
        During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message on
        the socket. This method prevent incoming sockets barking that the are ready the for reading.

        :param int seconds: The number of seconds the give the other ZMQ thread to start up.
        """
        sleep(seconds)

        for _ in range(1, len(self.end_points)):
            poller = zmq.Poller()
            for socket in self.end_points.values():
                if socket.type in [zmq.PULL, zmq.REP]:
                    poller.register(socket, zmq.POLLIN)

            poller.poll(1)

# ----------------------------------------------------------------------------------------------------------------------
thread.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
test_future.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_poll(self):
        @gen.coroutine
        def test():
            a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
            f = b.poll(timeout=0)
            self.assertEqual(f.result(), 0)

            f = b.poll(timeout=1)
            assert not f.done()
            evt = yield f
            self.assertEqual(evt, 0)

            f = b.poll(timeout=1000)
            assert not f.done()
            yield a.send_multipart([b'hi', b'there'])
            evt = yield f
            self.assertEqual(evt, zmq.POLLIN)
            recvd = yield b.recv_multipart()
            self.assertEqual(recvd, [b'hi', b'there'])
        self.loop.run_sync(test)
test_poll.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)
        tic = time.time()
        evt = poller.poll(.005)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        tic = time.time()
        evt = poller.poll(5)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        self.assertTrue(toc-tic > .001)
        tic = time.time()
        evt = poller.poll(500)
        toc = time.time()
        self.assertTrue(toc-tic < 1)
        self.assertTrue(toc-tic > 0.1)
future.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def poll(self, timeout=None, flags=_zmq.POLLIN):
        """poll the socket for events

        returns a Future for the poll results.
        """

        if self.closed:
            raise _zmq.ZMQError(_zmq.ENOTSUP)

        p = self._poller_class()
        p.register(self, flags)
        f = p.poll(timeout)

        future = self._Future()
        def unwrap_result(f):
            if future.done():
                return
            if f.exception():
                future.set_exception(f.exeception())
            else:
                evts = dict(f.result())
                future.set_result(evts.get(self, 0))

        f.add_done_callback(unwrap_result)
        return future
agent.py 文件源码 项目:osbrain 作者: opensistemas-hub 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def register(self, socket, address, alias=None, handler=None):
        assert not self.registered(address), \
            'Socket is already registered!'
        if not alias:
            alias = address
        self.socket[alias] = socket
        self.socket[address] = socket
        self.socket[socket] = socket
        self.address[alias] = address
        self.address[socket] = address
        self.address[address] = address
        if handler is not None:
            self.poller.register(socket, zmq.POLLIN)
            if address.kind in ('SUB', 'SYNC_SUB'):
                self.subscribe(socket, handler)
            else:
                self._set_handler(socket, handler)
controller_handler.py 文件源码 项目:lustre_task_driven_monitoring_framework 作者: GSI-HPC 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect(self):

        self.context = zmq.Context()

        if not self.context:
            raise RuntimeError('Failed to create ZMQ context!')

        self.socket = self.context.socket(zmq.REQ)

        if not self.socket:
            raise RuntimeError('Failed to create ZMQ socket!')

        self.socket.connect(self.endpoint)

        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

        self.is_connected = True
database_proxy_handler.py 文件源码 项目:lustre_task_driven_monitoring_framework 作者: GSI-HPC 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect(self):

        self.context = zmq.Context()

        if not self.context:
            raise RuntimeError('Failed to create ZMQ context!')

        self.socket = self.context.socket(zmq.PULL)

        if not self.socket:
            raise RuntimeError('Failed to create ZMQ socket!')

        self.socket.bind(self.endpoint)

        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

        self.is_connected = True
master_handler.py 文件源码 项目:lustre_task_driven_monitoring_framework 作者: GSI-HPC 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def connect(self):

        self.context = zmq.Context()

        if not self.context:
            raise RuntimeError('Failed to create ZMQ context!')

        self.socket = self.context.socket(zmq.REP)

        if not self.socket:
            raise RuntimeError('Failed to create ZMQ socket!')

        self.socket.bind(self.endpoint)

        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

        self.is_connected = True
plotting.py 文件源码 项目:Auspex 作者: BBN-Q 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
            self._loop = zmq.asyncio.ZMQEventLoop()
            asyncio.set_event_loop(self._loop)
            self.context = zmq.asyncio.Context()
            self.status_sock = self.context.socket(zmq.ROUTER)
            self.data_sock = self.context.socket(zmq.PUB)
            self.status_sock.bind("tcp://*:%s" % self.status_port)
            self.data_sock.bind("tcp://*:%s" % self.data_port)
            self.poller = zmq.asyncio.Poller()
            self.poller.register(self.status_sock, zmq.POLLIN)

            self._loop.create_task(self.poll_sockets())
            try:
                self._loop.run_forever()
            finally:
                self.status_sock.close()
                self.data_sock.close()
                self.context.destroy()
matplotlib-client.py 文件源码 项目:Auspex 作者: BBN-Q 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loop(self):
        while self.running:
            evts = dict(self.poller.poll(50))
            if self.socket in evts and evts[self.socket] == zmq.POLLIN:
                msg = self.socket.recv_multipart()
                msg_type = msg[0].decode()
                name     = msg[1].decode()
                if msg_type == "done":
                    self.finished.emit(True)
                elif msg_type == "data":
                    result = [name]
                    # How many pairs of metadata and data are there?
                    num_arrays = int((len(msg) - 2)/2)
                    for i in range(num_arrays):
                        md, data = msg[2+2*i:4+2*i]
                        md = json.loads(md.decode())
                        A = np.frombuffer(data, dtype=md['dtype'])
                        result.append(A)
                    self.message.emit(tuple(result))
        self.socket.close()
mainloop.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """ Contents of the infinite loop. """
        # Create zmq sockets
        sockets = SupvisorsZmq(self.supvisors)
        # create poller
        poller = zmq.Poller()
        # register sockets
        poller.register(sockets.internal_subscriber.socket, zmq.POLLIN)
        poller.register(sockets.puller.socket, zmq.POLLIN)
        # poll events forever
        while not self.stopping():
            socks = dict(poller.poll(500))
            # test stop condition again: if Supervisor is stopping,
            # any XML-RPC call would block this thread, and the other
            # because of the join
            if not self.stopping():
                self.check_requests(sockets, socks)
                self.check_events(sockets.internal_subscriber, socks)
        # close resources gracefully
        poller.unregister(sockets.puller.socket)
        poller.unregister(sockets.internal_subscriber.socket)
        sockets.close()
mainloop.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def check_events(self, subscriber, socks):
        """ Forward external Supervisor events to main thread. """
        if subscriber.socket in socks and \
            socks[subscriber.socket] == zmq.POLLIN:
            try:
                message = subscriber.receive()
            except:
                print >> stderr, '[ERROR] failed to get data from subscriber'
            else:
                # The events received are not processed directly in this thread
                # because it would conflict with the processing in the
                # Supervisor thread, as they use the same data.
                # That's why a RemoteCommunicationEvent is used to push the
                # event in the Supervisor thread.
                self.send_remote_comm_event(
                    RemoteCommEvents.SUPVISORS_EVENT,
                    json.dumps(message))
test_messenger.py 文件源码 项目:networkzero 作者: tjguk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def support_test_send_to_multiple_addresses(self, address1, address2):
        poller = zmq.Poller()

        socket1 = self.context.socket(roles['listener'])
        socket2 = self.context.socket(roles['listener'])
        try:
            socket1.bind("tcp://%s" % address1)
            socket2.bind("tcp://%s" % address2)
            poller.register(socket1, zmq.POLLIN)
            poller.register(socket2, zmq.POLLIN)
            polled = dict(poller.poll(2000))
            if socket1 in polled:
                socket1.recv()
                socket1.send(nw0.sockets._serialise(address1))
            elif socket2 in polled:
                socket2.recv()
                socket2.send(nw0.sockets._serialise(address2))
            else:
                raise RuntimeError("Nothing found")
        finally:
            socket1.close()
            socket2.close()
worker.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
        if not os.path.exists(data_dir) or not os.path.isdir(data_dir):
            raise Exception("Datadir %s is not a valid directory" % data_dir)
        self.worker_id = binascii.hexlify(os.urandom(8))
        self.node_name = socket.gethostname()
        self.data_dir = data_dir
        self.data_files = set()
        context = zmq.Context()
        self.socket = context.socket(zmq.ROUTER)
        self.socket.setsockopt(zmq.LINGER, 500)
        self.socket.identity = self.worker_id
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)
        self.redis_server = redis.from_url(redis_url)
        self.controllers = {}  # Keep a dict of timestamps when you last spoke to controllers
        self.check_controllers()
        self.last_wrm = 0
        self.start_time = time.time()
        self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
        self.logger.setLevel(loglevel)
        self.msg_count = 0
        signal.signal(signal.SIGTERM, self.term_signal())
controller.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def go(self):
        self.logger.info('[#############################>. Starting .<#############################]')

        while self.is_running:
            try:
                time.sleep(0.001)
                self.heartbeat()
                self.free_dead_workers()
                for sock, event in self.poller.poll(timeout=POLLING_TIMEOUT):
                    if event & zmq.POLLIN:
                        self.handle_in()
                    if event & zmq.POLLOUT:
                        self.handle_out()
                self.process_sink_results()
            except KeyboardInterrupt:
                self.logger.debug('Keyboard Interrupt')
                self.kill()
            except:
                self.logger.error("Exception %s" % traceback.format_exc())

        self.logger.info('Stopping')
        for x in (os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid'),
                  os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address')):
            if os.path.exists(x):
                os.remove(x)
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')

            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])

            while not event.is_set():
                await asyncio.sleep(0.1)
test_interoperability.py 文件源码 项目:azmq 作者: ereOn 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_tcp_pair_socket(event_loop, socket_factory, connect_or_bind):
    pair_socket = socket_factory.create(zmq.PAIR)
    connect_or_bind(pair_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        assert pair_socket.poll(1000) == zmq.POLLIN
        message = pair_socket.recv_multipart()
        assert message == [b'hello', b'world']
        pair_socket.send_multipart([b'my', b'message'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PAIR)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])
            message = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert message == [b'my', b'message']
thread.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
thread.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
test_poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)
        tic = time.time()
        evt = poller.poll(.005)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        tic = time.time()
        evt = poller.poll(5)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        self.assertTrue(toc-tic > .001)
        tic = time.time()
        evt = poller.poll(500)
        toc = time.time()
        self.assertTrue(toc-tic < 1)
        self.assertTrue(toc-tic > 0.1)
thread.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
test_poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)
        tic = time.time()
        evt = poller.poll(.005)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        tic = time.time()
        evt = poller.poll(5)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        self.assertTrue(toc-tic > .001)
        tic = time.time()
        evt = poller.poll(500)
        toc = time.time()
        self.assertTrue(toc-tic < 1)
        self.assertTrue(toc-tic > 0.1)
thread.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
test_poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)
        tic = time.time()
        evt = poller.poll(.005)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        tic = time.time()
        evt = poller.poll(5)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        self.assertTrue(toc-tic > .001)
        tic = time.time()
        evt = poller.poll(500)
        toc = time.time()
        self.assertTrue(toc-tic < 1)
        self.assertTrue(toc-tic > 0.1)
thread.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        """ Start the Authentication Agent thread task """
        self.authenticator.start()
        zap = self.authenticator.zap_socket
        poller = zmq.Poller()
        poller.register(self.pipe, zmq.POLLIN)
        poller.register(zap, zmq.POLLIN)
        while True:
            try:
                socks = dict(poller.poll())
            except zmq.ZMQError:
                break  # interrupted

            if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
                terminate = self._handle_pipe()
                if terminate:
                    break

            if zap in socks and socks[zap] == zmq.POLLIN:
                self._handle_zap()

        self.pipe.close()
        self.authenticator.stop()
test_poll.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_timeout(self):
        """make sure Poller.poll timeout has the right units (milliseconds)."""
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
        poller = self.Poller()
        poller.register(s1, zmq.POLLIN)
        tic = time.time()
        evt = poller.poll(.005)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        tic = time.time()
        evt = poller.poll(5)
        toc = time.time()
        self.assertTrue(toc-tic < 0.1)
        self.assertTrue(toc-tic > .001)
        tic = time.time()
        evt = poller.poll(500)
        toc = time.time()
        self.assertTrue(toc-tic < 1)
        self.assertTrue(toc-tic > 0.1)


问题


面经


文章

微信
公众号

扫码关注公众号