python类NOBLOCK的实例源码

__init__.py 文件源码 项目:frontera-docs-zh_CN 作者: xsren 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_messages(self, timeout=0.1, count=1):
        """Yield up to ``count`` message payloads from the subscriber socket.

        The socket is polled without blocking. While nothing is available the
        generator sleeps for a tenth of ``timeout`` between attempts, and it
        stops when an empty poll happens more than ``timeout`` seconds after
        the generator started running.

        Frame 2 of every message holds two big-endian unsigned 32-bit
        integers (partition and global sequence numbers). The one selected by
        ``self.count_global`` is compared with ``self.counter``; on mismatch a
        warning is logged when ``self.seq_warnings`` is set.

        :param timeout: time budget in seconds for waiting on an empty socket.
        :param count: maximum number of messages to yield.
        :return: generator over frame 1 of every received message.
        """
        began = time()
        pause = timeout / 10.0
        remaining = count
        while remaining:
            try:
                frames = self.subscriber.recv_multipart(copy=True, flags=zmq.NOBLOCK)
            except zmq.Again:
                # Nothing queued: give up once the budget is spent, else back off.
                if time() - began > timeout:
                    break
                sleep(pause)
                continue

            partition_seqno, global_seqno = unpack(">II", frames[2])
            if self.count_global:
                seqno = global_seqno
            else:
                seqno = partition_seqno

            if not self.counter:
                # No counter tracked at the moment: adopt this message's number.
                self.counter = seqno
            elif self.counter != seqno:
                if self.seq_warnings:
                    self.logger.warning("Sequence counter mismatch: expected %d, got %d. Check if system "
                                        "isn't missing messages." % (self.counter, seqno))
                # Drop the counter so the next message re-synchronises it.
                self.counter = None

            yield frames[1]

            # This bookkeeping runs only once the consumer resumes the generator.
            remaining -= 1
            if self.counter:
                self.counter += 1
            self.stats[self.stat_key] += 1
endpoint.py 文件源码 项目:spyking-circus-ort 作者: spyking-circus 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_data(self, blocking=True):
        """Receive one batch from the socket and decode it.

        The raw payload is decoded according to ``self.structure``:
        ``'array'`` builds a numpy array of ``self.dtype`` reshaped to
        ``self.shape``, ``'dict'`` parses JSON, ``'boolean'`` applies
        ``bool()``; any other value returns the payload unchanged.

        :param blocking: when true, wait for a message; otherwise poll once.
        :return: the decoded batch, or ``None`` when ``blocking`` is false and
            no message is available.
        :raises EOCError: when the received payload equals ``TERM_MSG``.
        """
        if blocking:
            batch = self.socket.recv()
        else:
            try:
                batch = self.socket.recv(flags=zmq.NOBLOCK)
            except zmq.Again:
                return None

        if batch == TERM_MSG:
            raise EOCError()

        if self.structure == 'array':
            # ``numpy.fromstring`` is deprecated for binary input; ``frombuffer``
            # is its replacement. It yields a read-only view over the bytes, so
            # copy to keep returning a writable array like before.
            batch = numpy.frombuffer(batch, dtype=self.dtype).copy()
            batch = numpy.reshape(batch, self.shape)
        elif self.structure == 'dict':
            batch = json.loads(batch)
        elif self.structure == 'boolean':
            # NOTE(review): any non-empty payload is truthy here; confirm the
            # sender encodes False as an empty message.
            batch = bool(batch)

        return batch
zstack.py 文件源码 项目:indy-plenum 作者: hyperledger 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _receiveFromListener(self, quota) -> int:
        """
        Drain pending messages from the listener socket without blocking.

        :param quota: maximum number of non-empty messages to accept (must be truthy)
        :return: number of messages accepted
        """
        assert quota
        received = 0
        while received < quota:
            try:
                ident, msg = self.listener.recv_multipart(flags=zmq.NOBLOCK)
                if msg:
                    received += 1
                    if self.onlyListener and ident not in self.remotesByKeys:
                        self.peersWithoutRemotes.add(ident)
                    self._verifyAndAppend(msg, ident)
                # Empty messages (router probing sends one on connection) are
                # skipped and do not count towards the quota.
            except zmq.Again:
                # Nothing more is queued on the socket.
                break
        if received > 0:
            logger.trace('{} got {} messages through listener'.format(self, received))
        return received
zmqbus.py 文件源码 项目:kervi 作者: kervi 项目源码 文件源码 阅读 87 收藏 0 点赞 0 评论 0
def run(self):
        """Poll the socket and dispatch messages until ``self._terminate`` is set.

        Each message is a ``[tag, json_message]`` pair. ``queryResponse``
        messages are handed to ``self._bus.resolve_response``; every other tag
        is processed on a freshly started ``ZMQHandlerThread``.
        """
        while not self._terminate:
            connection_message = None
            try:
                connection_message = self._socket.recv_multipart(zmq.NOBLOCK)
                tag, json_message = connection_message
                message = json.loads(json_message.decode('utf8'))
                if tag != b"queryResponse":
                    ZMQHandlerThread(self._bus, tag.decode('utf-8'), message).start()
                else:
                    self._bus.resolve_response(message)
            except zmq.ZMQError as e:
                if e.errno == zmq.EAGAIN:
                    # Nothing to read yet: yield briefly before polling again.
                    time.sleep(.001)
                elif e.errno == zmq.ETERM:
                    # Context terminated: leave the loop on the next check.
                    self._terminate = True
                else:
                    print("message zmq exception:", self._address, e, e.errno)
            except Exception as e:
                print("message exception:", self._address, e, connection_message)
zmq_send.py 文件源码 项目:odr-stream-router 作者: digris 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send(output):
    """Publish the encoded ``output`` string on a PUB socket every 0.1 s, forever.

    A new context and PUB socket are created and connected to ``output``;
    the same string, encoded to bytes, is then sent as the frame payload.

    :param output: endpoint string to connect to (also used as the payload).

    The loop only ends through an exception (for example ``KeyboardInterrupt``
    or a ZMQ error); in that case the socket and context are released before
    the exception propagates.
    """
    zmq_ctx = zmq.Context()
    try:
        c = zmq_ctx.socket(zmq.PUB)
        try:
            c.connect(output)
            # The payload never changes, so encode it once outside the loop.
            frame = output.encode()
            while True:
                c.send(frame, zmq.NOBLOCK)
                time.sleep(0.1)
        finally:
            # linger=0 discards unsent frames so terminating the context
            # below cannot block.
            c.close(linger=0)
    finally:
        zmq_ctx.term()
router.py 文件源码 项目:odr-stream-router 作者: digris 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def send(self, frame):
        """
        Hand the zmq frame to this output's connection using a
        non-blocking send (``zmq.NOBLOCK``).
        """
        connection = self.connection
        connection.send(frame, zmq.NOBLOCK)
banyan_base.py 文件源码 项目:python_banyan 作者: MrYsLab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def receive_loop(self):
        """
        This is the receive loop for Banyan messages.

        The subscriber socket is polled without blocking. Each message is
        unpacked (with the numpy-aware hook when ``self.numpy`` is set) and
        passed to ``incoming_message_processing`` as ``(topic, payload)``.
        When nothing is available the loop sleeps for ``self.loop_time``.

        This method may be overridden to meet the needs
        of the application before handling received messages.

        :raises KeyboardInterrupt: re-raised after ``clean_up()`` has run.
        """
        # Handle the interrupt around the whole loop: previously only an
        # interrupt arriving during the idle sleep triggered clean_up().
        try:
            while True:
                try:
                    data = self.subscriber.recv_multipart(zmq.NOBLOCK)
                    if self.numpy:
                        payload = msgpack.unpackb(data[1], object_hook=m.decode)
                        self.incoming_message_processing(data[0].decode(), payload)
                    else:
                        self.incoming_message_processing(data[0].decode(), umsgpack.unpackb(data[1]))
                # if no messages are available, zmq throws this exception
                except zmq.error.Again:
                    time.sleep(self.loop_time)
        except KeyboardInterrupt:
            self.clean_up()
            # Bare raise keeps the original traceback.
            raise
core.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def send(self, data, flags=0, copy=True, track=False):
        """Send ``data``, blocking only the current greenlet.

        With ``zmq.NOBLOCK`` in ``flags`` the call is passed straight to the
        parent class and any error (including EAGAIN) propagates. Otherwise
        NOBLOCK is added and the send is retried after each EAGAIN, waiting
        via ``self._wait_write()`` between attempts.

        ``__state_changed()`` is called exactly once on every exit path
        (success or failure), unless ``__in_send_multipart`` is set, in which
        case it is skipped here.

        :param data: passed through to the parent ``send``.
        :param flags: zmq send flags.
        :param copy: passed through to the parent ``send``.
        :param track: passed through to the parent ``send``.
        :return: whatever the parent ``send`` returns.
        :raises zmq.ZMQError: any error other than EAGAIN; EAGAIN too when the
            caller supplied NOBLOCK.
        """

        # caller asked for NOBLOCK: behave like the plain socket and let EAGAIN propagate
        if flags & zmq.NOBLOCK:
            try:
                msg = super(_Socket, self).send(data, flags, copy, track)
            finally:
                # fire on success and on failure alike
                if not self.__in_send_multipart:
                    self.__state_changed()
            return msg
        # make sure the underlying call never blocks the whole process
        flags |= zmq.NOBLOCK
        while True: # retry until the send succeeds or fails with something other than EAGAIN
            try:
                # attempt the actual call
                msg = super(_Socket, self).send(data, flags, copy, track)
            except zmq.ZMQError as e:
                # anything other than EAGAIN is a real failure: notify and re-raise
                if e.errno != zmq.EAGAIN:
                    if not self.__in_send_multipart:
                        self.__state_changed()
                    raise
            else:
                if not self.__in_send_multipart:
                    self.__state_changed()
                return msg
            # EAGAIN: defer to the event loop until we're notified the socket is writable
            self._wait_write()
core.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def recv(self, flags=0, copy=True, track=False):
        """Receive a message, blocking only the current greenlet.

        With ``zmq.NOBLOCK`` in ``flags`` the call is passed straight to the
        parent class and any error (including EAGAIN) propagates. Otherwise
        NOBLOCK is added and the receive is retried after each EAGAIN, waiting
        via ``self._wait_read()`` between attempts.

        ``__state_changed()`` is called exactly once on every exit path
        (success or failure), unless ``__in_recv_multipart`` is set, in which
        case it is skipped here.

        :param flags: zmq receive flags.
        :param copy: passed through to the parent ``recv``.
        :param track: passed through to the parent ``recv``.
        :return: whatever the parent ``recv`` returns.
        :raises zmq.ZMQError: any error other than EAGAIN; EAGAIN too when the
            caller supplied NOBLOCK.
        """
        # caller asked for NOBLOCK: behave like the plain socket and let EAGAIN propagate
        if flags & zmq.NOBLOCK:
            try:
                msg = super(_Socket, self).recv(flags, copy, track)
            finally:
                # fire on success and on failure alike
                if not self.__in_recv_multipart:
                    self.__state_changed()
            return msg

        # make sure the underlying call never blocks the whole process
        flags |= zmq.NOBLOCK
        while True:
            try:
                msg = super(_Socket, self).recv(flags, copy, track)
            except zmq.ZMQError as e:
                # anything other than EAGAIN is a real failure: notify and re-raise
                if e.errno != zmq.EAGAIN:
                    if not self.__in_recv_multipart:
                        self.__state_changed()
                    raise
            else:
                if not self.__in_recv_multipart:
                    self.__state_changed()
                return msg
            # EAGAIN: nothing to read yet, wait before retrying
            self._wait_read()
test_pubsub.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_topic(self):
        """A SUB socket subscribed to ``b'x'`` only gets messages with that prefix."""
        publisher, subscriber = self.create_bound_pair(zmq.PUB, zmq.SUB)
        subscriber.setsockopt(zmq.SUBSCRIBE, b'x')
        # presumably gives the subscription time to take effect
        time.sleep(0.1)

        # No matching prefix: a non-blocking receive must report EAGAIN.
        publisher.send(b'message')
        self.assertRaisesErrno(zmq.EAGAIN, subscriber.recv, zmq.NOBLOCK)

        # Matching prefix: the message arrives unchanged.
        matching = b'xmessage'
        publisher.send(matching)
        self.assertEqual(matching, subscriber.recv())
test_error.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_again(self):
        """A non-blocking recv on an idle REP socket raises ``Again`` / EAGAIN."""
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails so the socket is not leaked.
            s.close()
YubiGuard.py 文件源码 项目:YubiGuard 作者: pykong 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start_listener(self):
        """Poll the socket forever; queue ``ON_SIGNAL`` for every message received."""
        print('ZMQ listener started')
        while True:
            try:
                # Non-blocking receive; the payload itself is not used.
                self.s.recv(zmq.NOBLOCK)
            except zmq.Again:
                # Nothing waiting: pause before polling again.
                time.sleep(0.05)
                continue
            self.on_q.put(ON_SIGNAL)
mflow.py 文件源码 项目:mflow 作者: datastreaming 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def header(self):
        """Receive one frame, keep it in ``self.raw_header`` and return it parsed as JSON.

        The receive blocks when ``self.block`` is truthy and is non-blocking
        otherwise.
        """
        if self.block:
            recv_flags = 0
        else:
            recv_flags = zmq.NOBLOCK
        raw = self.socket.recv(flags=recv_flags)
        self.raw_header = raw
        return json.loads(raw.decode("utf-8"))
mflow.py 文件源码 项目:mflow 作者: datastreaming 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def next(self, as_json=False):
        """Return the next raw frame, or ``None`` if a ZMQ error occurs.

        A frame previously cached in ``self.raw_header`` is consumed first;
        otherwise one is read from the socket (blocking only when
        ``self.block`` is truthy). Its length is added to
        ``self.statistics.bytes_received``.

        :param as_json: when true, decode the frame as UTF-8 and parse it as JSON.
        :return: the raw frame, the parsed JSON value, or ``None`` on ``zmq.ZMQError``.
        """
        try:
            raw = self.raw_header
            if raw:
                # Consume the cached header exactly once.
                self.raw_header = None
            else:
                if self.block:
                    recv_flags = 0
                else:
                    recv_flags = zmq.NOBLOCK
                raw = self.socket.recv(flags=recv_flags, copy=self.zmq_copy, track=self.zmq_track)

            self.statistics.bytes_received += len(raw)
            if not as_json:
                return raw
            return json.loads(raw.decode("utf-8"))
        except zmq.ZMQError:
            return None
mflow.py 文件源码 项目:mflow 作者: datastreaming 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def flush(self, success=True):
        """Discard remaining sub-messages and, on success, roll up the statistics.

        :param success: when true, add ``bytes_received`` to
            ``total_bytes_received``, reset ``bytes_received`` to 0 and
            increment ``messages_received``.
        """
        if self.block:
            recv_flags = 0
        else:
            recv_flags = zmq.NOBLOCK

        # Read and drop whatever is left of the current message.
        while self.has_more():
            try:
                self.socket.recv(flags=recv_flags, copy=self.zmq_copy, track=self.zmq_track)
                logger.info('Skipping sub-message')
            except zmq.ZMQError:
                continue

        if not success:
            return

        stats = self.statistics
        stats.total_bytes_received += stats.bytes_received
        stats.bytes_received = 0
        stats.messages_received += 1
test_zmq_pub_sub.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def recv_messages(zmq_subscriber, timeout_count, message_count):
    """Test utility function.

    Subscriber thread that receives and counts ZMQ messages.

    Args:
        zmq_subscriber (zmq.Socket): ZMQ subscriber socket.
        timeout_count (int): No. of consecutive failed receives until exit.
        message_count (int): No. of messages expected to be received.

    Returns:
        (int) Number of messages received.

    Raises:
        zmq.ZMQError: any receive error other than EAGAIN.
    """
    # pylint: disable=E1101
    fails = 0  # No. of consecutive receives that didn't return a message.
    receive_count = 0  # Total number of messages received.
    while fails < timeout_count:
        try:
            zmq_subscriber.recv_string(flags=zmq.NOBLOCK)
        except zmq.ZMQError as error:
            if error.errno != zmq.EAGAIN:
                raise
            # Only an empty poll counts as a failure; previously successful
            # receives were counted (and slept on) as well.
            fails += 1
            time.sleep(1e-6)
            continue
        fails = 0
        receive_count += 1
        if receive_count == message_count:
            break
    return receive_count
logging_aggregator.py 文件源码 项目:integration-prototype 作者: SKA-ScienceDataProcessor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        """Run loop.

        Polls the subscriber for ``(topic, values)`` messages, turns the JSON
        in ``values`` into a log record and passes it to the
        ``sip.logging_aggregator`` logger. Runs until ``self._stop_requested``
        is set, waiting between polls for a delay that grows with the number
        of empty polls.
        """
        log = logging.getLogger('sip.logging_aggregator')
        max_steps = 100
        # Log-spaced waits from 1e-6 s up to 1e-2 s, indexed by the miss count.
        delays = np.logspace(-6, -2, max_steps)
        misses = 0
        while not self._stop_requested.is_set():
            try:
                _topic, values = self._subscriber.recv_multipart(zmq.NOBLOCK)
                text = values.decode('utf-8')
                try:
                    log.handle(logging.makeLogRecord(json.loads(text)))
                    misses = 0
                except json.decoder.JSONDecodeError:
                    print('ERROR: Unable to convert JSON log record.')
                    raise
            except zmq.ZMQError as e:
                if e.errno != zmq.EAGAIN:
                    raise  # Re-raise the exception
                misses += 1
            # Clamp to the last (longest) delay once the table is exhausted.
            self._stop_requested.wait(delays[min(misses, max_steps - 1)])
supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def receive(self):
        """ Receive one message without blocking and return the de-serialized object.

        NOTE(review): recv_pyobj presumably unpickles the payload, which is
        only safe with trusted peers -- confirm.
        """
        message = self.socket.recv_pyobj(zmq.NOBLOCK)
        return message
supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_check_address(self, address_name):
        """ Post a non-blocking CHECK_ADDRESS request for the given address.

        An error is logged when the message cannot be sent right away. """
        self.logger.trace('send CHECK_ADDRESS {}'.format(address_name))
        request = (DeferredRequestHeaders.CHECK_ADDRESS, (address_name, ))
        try:
            self.socket.send_pyobj(request, zmq.NOBLOCK)
        except zmq.error.Again:
            self.logger.error('CHECK_ADDRESS not sent')
supvisorszmq.py 文件源码 项目:supvisors 作者: julien6387 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def send_isolate_addresses(self, address_names):
        """ Post a non-blocking ISOLATE_ADDRESSES request for the given addresses.

        An error is logged when the message cannot be sent right away. """
        self.logger.trace('send ISOLATE_ADDRESSES {}'.format(address_names))
        request = (DeferredRequestHeaders.ISOLATE_ADDRESSES, address_names)
        try:
            self.socket.send_pyobj(request, zmq.NOBLOCK)
        except zmq.error.Again:
            self.logger.error('ISOLATE_ADDRESSES not sent')


问题


面经


文章

微信
公众号

扫码关注公众号