python类Again()的实例源码

tools.py 文件源码 项目:mflow 作者: datastreaming 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start(self, socket):
        """
        Begin watching a socket's monitor events on a background thread.

        Returns immediately when the listening flag is already set. Otherwise a
        monitor socket is requested from ``socket``, its receive timeout is set to
        ``self.receive_timeout``, and a daemon thread forwards every event to
        ``self._notify_listeners``.
        :param socket: Socket to monitor.
        """
        # The flag doubles as the "thread already running" marker.
        if self.monitor_listening.is_set():
            return

        # Monitor socket whose blocking receive gives up after the configured timeout.
        monitor_socket = socket.get_monitor_socket(events=self.events)
        monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
        self.monitor_listening.set()

        def event_listener(listening_flag):
            while listening_flag.is_set():
                try:
                    monitor_event = recv_monitor_message(monitor_socket)
                    # A closed socket produces no further events: end the loop after this one.
                    if monitor_event["event"] == zmq.EVENT_CLOSED:
                        listening_flag.clear()

                    self._notify_listeners(monitor_event)
                except zmq.Again:
                    # Receive timed out: notify with None so time-based listeners
                    # get a heartbeat without needing a thread of their own.
                    self._notify_listeners(None)

            # Loop finished: tear the monitor down.
            socket.disable_monitor()
            monitor_socket.close()

        worker = threading.Thread(target=event_listener, args=(self.monitor_listening,))
        # Daemon thread: it cannot keep the process alive if nobody disconnects.
        worker.daemon = True
        self.monitor_thread = worker
        worker.start()
mflow.py 文件源码 项目:mflow 作者: datastreaming 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def send(self, message, send_more=False, block=True, as_json=False):
        """
        Send ``message`` on ``self.socket``.

        :param message: payload handed to ``send`` (or ``send_json`` when ``as_json``).
        :param send_more: when true, the ``zmq.SNDMORE`` flag is set.
        :param block: when false, ``zmq.NOBLOCK`` is added and ``zmq.Again`` is swallowed.
        :param as_json: when true, the message is sent through ``send_json``.
        :raises zmq.Again: only in blocking mode.
        :raises zmq.ZMQError: any other ZMQ error, after it has been logged.
        """
        flags = zmq.SNDMORE if send_more else 0
        if not block:
            flags |= zmq.NOBLOCK

        try:
            if as_json:
                self.socket.send_json(message, flags)
            else:
                self.socket.send(message, flags, copy=self.zmq_copy, track=self.zmq_track)
        except zmq.Again as again:
            # A non-blocking send that could not complete is dropped silently.
            if block:
                raise again
        except zmq.ZMQError as error:
            logger.error(error)
            raise error
ListCommunication.py 文件源码 项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def forward(self, data):
        """
        Send ``data`` over ``self.list_communication_channel``.

        Error handling, from most to least specific:
        * ``zmq.NotDone`` - logged, then the channel is closed;
        * ``zmq.Again`` - logged, then a new ``zmq.Again`` is raised to the caller;
        * any other ``zmq.ZMQError`` - logged, then the context is destroyed and
          replaced with a fresh ``zmq.Context()``.
        """
        channel = self.list_communication_channel
        try:
            channel.send(data)
        except zmq.NotDone:
            self.logger.debug('my recipient is dead, not done')
            self.list_communication_channel.close()
        except zmq.Again:
            self.logger.debug('my recipient is dead')
            # The channel is left open here; the caller decides what to do next.
            raise zmq.Again
        except zmq.ZMQError as err:
            self.logger.debug("Error in message forward " + err.strerror)
            self.context.destroy()
            self.context = zmq.Context()
test_supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_disconnection(self):
        """ Check that disconnecting an address only silences that address. """
        from supvisors.utils import InternalEventHeaders
        mapper = self.supvisors.address_mapper
        own_address = mapper.local_address
        # step 1: disconnect the first address that is not the local one
        remote_address = next(candidate
                              for candidate in mapper.addresses
                              if candidate != own_address)
        self.subscriber.disconnect([remote_address])
        # the local publisher emits a tick ...
        tick = {'date': 1000}
        self.publisher.send_tick_event(tick)
        # ... which must still be received
        received = self.receive('Tick')
        expected = (InternalEventHeaders.TICK, own_address, tick)
        self.assertTupleEqual(expected, received)
        # step 2: disconnect the local address as well
        self.subscriber.disconnect([own_address])
        # the local publisher emits the same tick again ...
        self.publisher.send_tick_event(tick)
        # ... and this time nothing must arrive
        with self.assertRaises(zmq.Again):
            self.subscriber.receive()
miner.py 文件源码 项目:og-miner 作者: opendns 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generator_from_zmq_pull(context, host):
    """Bind a PULL socket on ``host`` and yield decoded JSON tasks forever.

    One trailing ``/`` is stripped from ``host`` before binding. Each iteration
    performs a non-blocking receive: ``None`` is yielded when nothing is
    available, otherwise the JSON-decoded message.
    """
    socket = context.socket(zmq.PULL)
    # TODO: Configure socket with clean properties to avoid message overload.
    if host.endswith('/'):
        host = host[:-1]
    banner = "Binding ZMQ pull socket : " + colorama.Fore.CYAN + "{0}".format(host) + colorama.Style.RESET_ALL
    print_item("+", banner)
    socket.bind(host)

    while True:
        try:
            raw = socket.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            raw = None
        # NOTE: yielding None keeps the generator non blocking for its consumer.
        yield None if raw is None else json.loads(raw)
__init__.py 文件源码 项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_messages(self, timeout=0.1, count=1):
        """
        Yield up to ``count`` message bodies (frame 1) from ``self.subscriber``.

        Receives are non-blocking. While nothing is available the generator sleeps
        ``timeout / 10`` between attempts and stops once more than ``timeout``
        seconds have passed since the call started (the clock is not restarted
        when a message arrives). Frame 2 carries two big-endian unsigned 32-bit
        sequence numbers, which are checked against ``self.counter``.
        """
        started = time()
        pause = timeout / 10.0
        remaining = count
        while remaining:
            try:
                frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
            except zmq.Again:
                if time() - started > timeout:
                    break
                sleep(pause)
                continue

            partition_seqno, global_seqno = unpack(">II", frames[2])
            if self.count_global:
                seqno = global_seqno
            else:
                seqno = partition_seqno

            if not self.counter:
                # No expectation yet: adopt the received sequence number.
                self.counter = seqno
            elif self.counter != seqno:
                if self.seq_warnings:
                    self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                        "isn't missing messages." % (self.counter, seqno))
                # Drop the expectation; the next message re-seeds it.
                self.counter = None

            yield frames[1]
            remaining -= 1
            if self.counter:
                self.counter += 1
            self.stats[self.stat_key] += 1
endpoint.py 文件源码 项目:spyking-circus-ort 作者: spyking-circus 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_data(self, blocking=True):
        """Receive one batch from ``self.socket`` and decode it.

        :param blocking: when True, wait on ``recv()``; when False, receive with
                         ``zmq.NOBLOCK``.
        :return: the decoded batch, or None when ``blocking`` is False and no
                 message was available. Decoding depends on ``self.structure``:
                 ``'array'`` gives a numpy array of ``self.dtype`` reshaped to
                 ``self.shape``, ``'dict'`` gives the JSON-decoded payload,
                 ``'boolean'`` gives ``bool(payload)``; any other value returns
                 the raw payload.
        :raises EOCError: when the received payload equals ``TERM_MSG``.
        """
        if not blocking:
            try:
                batch = self.socket.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                return None
        else:
            batch = self.socket.recv()

        if batch == TERM_MSG:
            raise EOCError()

        if self.structure == 'array':
            # numpy.fromstring is deprecated for binary input; frombuffer is its
            # documented replacement. The copy keeps the result writable and
            # independent of the received buffer, as fromstring's result was.
            batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
            batch = numpy.reshape(batch, self.shape)
        elif self.structure == 'dict':
            batch = json.loads(batch)
        elif self.structure == 'boolean':
            # NOTE(review): bool() of any non-empty payload is True - confirm the
            # sender encodes False as an empty message.
            batch = bool(batch)

        return batch
zstack.py 文件源码 项目:indy-plenum 作者: hyperledger 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _receiveFromListener(self, quota) -> int:
        """
        Drain up to ``quota`` non-empty messages from the listener socket.

        Receives are non-blocking; the loop ends early as soon as nothing more is
        available. Empty messages are skipped and do not count towards the quota.
        :param quota: number of messages to receive
        :return: number of received messages
        """
        assert quota
        received = 0
        while received < quota:
            try:
                ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
                # Router probing sends empty message on connection - skip those.
                if msg:
                    received += 1
                    if self.onlyListener and ident not in self.remotesByKeys:
                        self.peersWithoutRemotes.add(ident)
                    self._verifyAndAppend(msg, ident)
            except zmq.Again:
                break
        if received > 0:
            logger.trace('{} got {} messages through listener'.
                         format(self, received))
        return received
test_retry_eintr.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_retry_recv(self):
        """recv on a PULL socket with a receive timeout raises Again after the alarm fired."""
        receiver = self.socket(zmq.PULL)
        receiver.rcvtimeo = self.timeout_ms
        self.alarm()
        with self.assertRaises(zmq.Again):
            receiver.recv()
        assert self.timer_fired
test_retry_eintr.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_retry_send(self):
        """send on a PUSH socket with a send timeout raises Again after the alarm fired."""
        sender = self.socket(zmq.PUSH)
        sender.sndtimeo = self.timeout_ms
        self.alarm()
        payload = b('buf')
        with self.assertRaises(zmq.Again):
            sender.send(payload)
        assert self.timer_fired
test_error.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
YubiGuard.py 文件源码 项目:YubiGuard 作者: pykong 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_listener(self):
        """Poll ``self.s`` forever and put ``ON_SIGNAL`` on ``self.on_q`` for every message received."""
        print('ZMQ listener started')
        while True:
            try:
                # NOBLOCK: raises zmq.Again instead of waiting for a message.
                self.s.recv(zmq.NOBLOCK)
            except zmq.Again:
                # Nothing to receive yet: back off briefly before polling again.
                time.sleep(0.05)
                continue
            self.on_q.put(ON_SIGNAL)
ChannelTest.py 文件源码 项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def client_behavior(settings, logger):
    """Run the client side of the channel test.

    Opens the internal channel, sends an ALIVE message and logs the reply, then
    subscribes to the external channel and logs every message received until a
    receive raises ``Again``.
    """
    int_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)

    try:
        int_channel.generate_internal_channel_client_side()
    except ZMQError as err:
        logger.debug(err)

    probe = Message()
    probe.priority = ALIVE
    probe.source_flag = INT
    probe.source_id = '1'
    probe.target_id = '1'
    probe.target_addr = '192.168.1.1'
    probe.target_key = '{}:{}'.format(0, 19)

    int_channel.send_first_internal_channel_message(dumps(probe))

    reply = int_channel.wait_int_message(dont_wait=False)
    logger.debug("msg : " + reply)

    ext_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)
    ext_channel.generate_external_channel_client_side()
    ext_channel.external_channel_subscribe()

    first_update = loads(ext_channel.wait_ext_message())
    logger.debug(first_update.printable_message())

    logger.debug("try_to_connect TEST COMPLETED")

    # Keep logging published messages, one per second, until a receive raises Again.
    while True:
        try:
            logger.debug(loads(ext_channel.wait_ext_message()).printable_message())
            sleep(1)
        except Again:
            logger.debug("my master is DEAD")
            break
ChannelTest.py 文件源码 项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def server_behavior(settings, logger):
    """Run the server side of the channel test.

    Answers one request on the internal channel with ``OK``, then publishes an
    ALIVE message on the external channel once per second until forwarding
    raises ``zmq.Again``.
    """
    int_channel = InternalChannel(addr="127.0.0.1", port=settings.getIntPort(), logger=logger)

    try:
        int_channel.generate_internal_channel_server_side()
        request = loads(int_channel.wait_int_message(dont_wait=False))
        logger.debug("msg : ")
        logger.debug(request.printable_message())
        int_channel.reply_to_int_message(OK)
    except ZMQError as err:
        logger.debug(err)

    ext_channel = ExternalChannel(addr="127.0.0.1", port=settings.getExtPort(), logger=logger)

    ext_channel.generate_external_channel_server_side()
    ext_channel.external_channel_publish()

    heartbeat = Message()
    heartbeat.priority = ALIVE
    heartbeat.source_flag = EXT
    heartbeat.source_id = '1'
    heartbeat.target_id = '1'
    heartbeat.target_addr = '192.168.1.1'
    heartbeat.target_key = '{}:{}'.format(0, 19)

    sleep(1)

    ext_channel.forward(dumps(heartbeat))

    logger.debug("try_to_connect TEST COMPLETED")

    # Keep publishing once per second until the channel reports Again.
    while True:
        try:
            ext_channel.forward(dumps(heartbeat))
            sleep(1)
        except zmq.Again:
            break
ListCommunication.py 文件源码 项目:CellsCycle 作者: AQuadroTeam 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wait_int_message(self, dont_wait=True):
        """
        Receive one message from ``self.list_communication_channel``.

        :param dont_wait: when true, receive with ``zmq.DONTWAIT`` and raise
                          ``zmq.Again`` if nothing is available; when false, log
                          and wait on a plain ``recv()``.
        :return: the received message.
        """
        if not dont_wait:
            self.logger.debug('waiting for a request')
            return self.list_communication_channel.recv()

        # Non-blocking receive of the internal message.
        try:
            return self.list_communication_channel.recv(zmq.DONTWAIT)
        except zmq.Again:
            raise zmq.Again
test_supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def receive(self, event_type):
        """ Poll the subscriber socket for 1000, then return what the subscriber receives.
        The test fails if the reception raises zmq.Again. """
        try:
            self.subscriber.socket.poll(1000)
            event = self.subscriber.receive()
        except zmq.Again:
            self.fail('Failed to get {} event'.format(event_type))
        else:
            return event
test_supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def receive(self, event_type):
        """ Return what the puller receives.
        The test fails if the reception raises zmq.Again. """
        try:
            request = self.puller.receive()
        except zmq.Again:
            self.fail('Failed to get {} request'.format(event_type))
        else:
            return request
test_supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def check_reception(self, header=None, data=None):
        """ Assert that the subscriber receives exactly (header, data) when both
        are given and truthy, and that it receives nothing otherwise. """
        if not (header and data):
            # nothing expected: the reception must raise zmq.Again
            with self.assertRaises(zmq.Again):
                self.subscriber.receive()
            return
        # a message is expected: failing to receive it fails the test
        try:
            received = self.subscriber.receive()
        except zmq.Again:
            self.fail('Failed to get {} status'.format(header))
        self.assertTupleEqual((header, data), received)
core.py 文件源码 项目:CommunityCellularManager 作者: facebookincubator 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _send_and_receive(self, message):
    """Send a payload to NM and return the Response built from the reply.

    The message is JSON-encoded and sent, then the socket is polled for
    ``self.socket_timeout`` seconds.  If the action failed, an error will be
    raised while the Response is instantiated.  When no reply arrives in time
    the socket is closed, set up and connected again before the timeout is
    reported.

    Args:
      message: dict of a message to send to NM

    Returns:
      Response instance if the request succeeded

    Raises:
      TimeoutError: if nothing is received for the timeout
    """
    # zmq is thread unsafe: a second request sent before the first response
    # comes back throws an exception, so the whole exchange runs under the
    # lock -kheimerl
    with self.lock:
      self.socket.send(json.dumps(message))
      ready = self.socket.poll(timeout=self.socket_timeout * 1000)
      if ready:
        try:
          return Response(self.socket.recv())
        except zmq.Again:
          pass
      # Polling or recv failed: reset the socket, otherwise it would be left
      # in a bad state, still waiting for a response.
      self.socket.close()
      self.setup_socket()
      self.socket.connect(self.address)
      raise TimeoutError('did not receive a response')
worker.py 文件源码 项目:bqueryd 作者: visualfabriq 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle_in(self):
        """
        Receive one two-frame message (sender, payload), handle it and reply.

        Returns silently when the receive raises ``zmq.Again``, when the message
        does not have exactly two frames, or when the sender is not a known
        controller (the last two cases are logged as critical). Otherwise a
        BusyMessage is sent to all controllers, the message is handled, any
        result (or an ErrorMessage carrying the traceback if handling raised) is
        sent back to the sender, and a DoneMessage is sent to all controllers.
        """
        try:
            frames = self.socket.recv_multipart()
        except zmq.Again:
            return
        if len(frames) != 2:
            self.logger.critical('Received a msg with len != 2, something seriously wrong. ')
            return

        sender, msg_buf = frames
        msg = msg_factory(msg_buf)

        data = self.controllers.get(sender)
        if not data:
            self.logger.critical('Received a msg from %s - this is an unknown sender' % sender)
            return
        data['last_seen'] = time.time()

        # TODO Notify Controllers that we are busy, no more messages to be sent
        # The above busy notification is not perfect as other messages might be on their way already
        # but for long-running queries it will at least ensure other controllers
        # don't try and overuse this node by filling up a queue
        busy_msg = BusyMessage()
        self.send_to_all(busy_msg)

        try:
            reply = self.handle(msg)
        except Exception:
            # "except Exception, e" was Python-2-only syntax (a SyntaxError on
            # Python 3) and the bound name was never used.
            reply = ErrorMessage(msg)
            reply['payload'] = traceback.format_exc()
            self.logger.exception(reply['payload'])
        if reply:
            self.send(sender, reply)

        self.send_to_all(DoneMessage())  # Send a DoneMessage to all controllers, this flags you as 'Done'. Duh
server.py 文件源码 项目:og-miner 作者: opendns 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """
        Loop forever: receive task messages without blocking and store each task's
        data in redis under its transaction key, expiring after
        ``self.result_expiration``.
        """
        while True:
            try:
                raw = self.pull.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                # Nothing queued right now: poll again.
                continue
            if raw is None:
                continue
            task = json.loads(raw)
            key = task['transaction']
            value = json.dumps(task['data'])
            self.redis.setex(key, self.result_expiration, value)
test_error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
test_error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
test_error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
test_error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
test_error.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again (errno EAGAIN)."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so the socket is never leaked.
            s.close()
trex_stl_jsonrpc_client.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_raw_msg (self, msg):
        """
        Send ``msg`` on ``self.socket`` and return the raw response.

        Both the send and the receive are attempted up to six times while
        ``zmq.Again`` is raised. When every attempt of either phase fails, the
        client disconnects and an ``RC_ERR`` describing the failure is returned
        instead of the response.
        """
        max_attempts = 6

        for _ in range(max_attempts):
            try:
                self.socket.send(msg)
                break
            except zmq.Again:
                continue
        else:
            # Every send attempt raised Again.
            self.disconnect()
            return RC_ERR("*** [RPC] - Failed to send message to server")

        for _ in range(max_attempts):
            try:
                response = self.socket.recv()
                break
            except zmq.Again:
                continue
        else:
            # Every receive attempt raised Again.
            self.disconnect()
            return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport))

        return response



    # process a single response from server
zeromq.py 文件源码 项目:napalm-logs 作者: napalm-automation 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def receive(self):
        '''
        Receive one message from the subscriber socket.

        Returns a ``(message, '')`` pair; the second element is always an empty
        string. A ``zmq.Again`` raised by the receive is logged and re-raised as
        ``ListenerException``.

        ..note::
            In ZMQ we are unable to get the address where we got the message from.
        '''
        try:
            payload = self.sub.recv()
        except zmq.Again as err:
            log.error('Unable to receive messages: %s', err, exc_info=True)
            raise ListenerException(err)
        log.debug('[%s] Received %s', time.time(), payload)
        return (payload, '')
test_control_server.py 文件源码 项目:lewis 作者: DMSC-Instrument-Data 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_process_does_not_block(self):
        """process() must swallow zmq.Again and request a non-blocking receive."""
        fake_socket = Mock()
        fake_socket.recv_unicode.side_effect = zmq.Again()

        control_server = ControlServer(None, connection_string='127.0.0.1:10000')
        control_server._socket = fake_socket
        assertRaisesNothing(self, control_server.process)

        expected_calls = [call(flags=zmq.NOBLOCK)]
        fake_socket.recv_unicode.assert_has_calls(expected_calls)
control_server.py 文件源码 项目:lewis 作者: DMSC-Instrument-Data 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def process(self, blocking=False):
        """
        Try to receive one request from the socket and answer it.

        The request is handed to the JSONRPCResponseManager together with the
        exposed object, and the resulting JSON is sent back. If that raises a
        TypeError, an "unhandled exception" response carrying the request's id is
        sent instead.

        When no data are available (``zmq.Again``), the method does nothing. This
        is required for Lewis, where everything runs in one thread: the central
        loop calls process at some point to serve remote calls, so the RPC-server
        does not introduce its own infinite processing loop.

        If the server has not been started yet (via :meth:`start_server`), a RuntimeError
        is raised.

        :param blocking: If True, this function will block until it has received data or a timeout
                         is triggered. Default is False to preserve behavior of prior versions.
        """
        if self._socket is None:
            raise RuntimeError('The server has not been started yet, use start_server to do so.')

        recv_flags = 0 if blocking else zmq.NOBLOCK

        try:
            request = self._socket.recv_unicode(flags=recv_flags)

            self.log.debug('Got request %s', request)

            try:
                response = JSONRPCResponseManager.handle(request, self._exposed_object)
                self._socket.send_unicode(response.json)

                self.log.debug('Sent response %s', response.json)
            except TypeError as error:
                request_id = json.loads(request)['id']
                self._socket.send_json(
                    self._unhandled_exception_response(request_id, error))
        except zmq.Again:
            # Nothing to process right now.
            pass


问题


面经


文章

微信
公众号

扫码关注公众号