python类FIONREAD的实例源码

serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
serialposix.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
inotify_simple.py 文件源码 项目:inotify_simple 作者: chrisjbillington 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read(self, timeout=None, read_delay=None):
        """Drain the inotify file descriptor and return the parsed events as
        a list of :attr:`~inotify_simple.Event` namedtuples
        (wd, mask, cookie, name).

        Args:
            timeout (int): How long, in milliseconds, to wait for an event
                when none is pending. If negative or ``None``, block until
                there are events.

            read_delay (int): How long, in milliseconds, to pause after the
                first event shows up before reading the buffer. The pause
                lets further events accumulate, which allows the kernel to
                consolidate like events and can enhance performance when
                there are many similar events.

        Returns:
            list: list of :attr:`~inotify_simple.Event` namedtuples; empty
            when the wait timed out."""
        # Block (up to ``timeout``) until the descriptor becomes readable.
        if not self._poller.poll(timeout):
            # Nothing arrived before the timeout expired.
            return []
        if read_delay is not None:
            # Give follow-up events a chance to pile up first.
            time.sleep(read_delay / 1000.0)
        # Ask how many bytes are queued, then read exactly that many.
        available = ctypes.c_int()
        ioctl(self.fd, FIONREAD, available)
        return parse_events(os.read(self.fd, available.value))
serialposix.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:gcodeplot 作者: arpruss 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
serialposix.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:bitio 作者: whaleygeek 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
client.py 文件源码 项目:caproto 作者: NSLS-II 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __call__(self):
        '''Selector poll loop.

        Runs while ``self._running`` is true. Each pass applies queued
        socket (un)registrations, waits on the selector, then receives one
        datagram from every ready socket and passes it to the matching
        object's ``received(bytes_recv, address)``.
        '''
        # One-element signed-int array reused as the FIONREAD output buffer.
        avail_buf = array.array('i', [0])
        while self._running:
            # Apply pending registration changes while holding the map lock.
            with self._socket_map_lock:
                for sock in self._unregister_sockets:
                    self.selector.unregister(sock)
                self._unregister_sockets.clear()

                for sock in self._register_sockets:
                    self.selector.register(sock, selectors.EVENT_READ)
                self._register_sockets.clear()

            # Wait for readable sockets; the 0.1 timeout bounds how long a
            # pass can go without re-checking ``self._running`` and the
            # pending (un)registration sets.
            events = self.selector.select(timeout=0.1)
            with self._socket_map_lock:
                # Registration changes were queued during the select: skip
                # this batch of events and go apply the changes first.
                if self._unregister_sockets or self._register_sockets:
                    continue

                # Resolve ready sockets to (object, socket) pairs under the
                # lock; the receive work below runs outside it.
                ready_ids = [self.socket_to_id[key.fileobj]
                             for key, mask in events]
                ready_objs = [(self.objects[obj_id], self.id_to_socket[obj_id])
                              for obj_id in ready_ids]

            for obj, sock in ready_objs:
                # TODO: consider thread pool for recv and command_loop

                # Query the number of readable bytes into avail_buf.
                # NOTE(review): fcntl.ioctl is documented to raise OSError on
                # failure, so this negative-return branch may never trigger
                # -- confirm.
                if fcntl.ioctl(sock, termios.FIONREAD, avail_buf) < 0:
                    continue

                bytes_available = avail_buf[0]

                try:
                    # Request at least 4096 bytes, or more if more is queued.
                    bytes_recv, address = sock.recvfrom(max((4096,
                                                             bytes_available)))
                except OSError as ex:
                    # EAGAIN: skip this socket for this pass.
                    if ex.errno == errno.EAGAIN:
                        continue
                    # Any other OSError is reported to the object as an empty
                    # payload with no address.
                    bytes_recv, address = b'', None

                obj.received(bytes_recv, address)
serialposix.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:microbit-serial 作者: martinohanlon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
serialposix.py 文件源码 项目:ddt4all 作者: cedricp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:ddt4all 作者: cedricp 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
serialposix.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:mt7687-serial-uploader 作者: will127534 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
inotify_simple.py 文件源码 项目:i3configger 作者: obestwalter 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def read(self, timeout=None, read_delay=None):
        """Drain the inotify file descriptor and return the parsed events as
        a list of :attr:`~inotify_simple.Event` namedtuples
        (wd, mask, cookie, name).

        Args:
            timeout (int): How long, in milliseconds, to wait for an event
                when none is pending. If negative or ``None``, block until
                there are events.

            read_delay (int): How long, in milliseconds, to pause after the
                first event shows up before reading the buffer. The pause
                lets further events accumulate, which allows the kernel to
                consolidate like events and can enhance performance when
                there are many similar events.

        Returns:
            list: list of :attr:`~inotify_simple.Event` namedtuples; empty
            when the wait timed out."""
        # Block (up to ``timeout``) until the descriptor becomes readable.
        if not self._poller.poll(timeout):
            # Nothing arrived before the timeout expired.
            return []
        if read_delay is not None:
            # Give follow-up events a chance to pile up first.
            time.sleep(read_delay / 1000.0)
        # Ask how many bytes are queued, then read exactly that many.
        available = ctypes.c_int()
        ioctl(self.fd, FIONREAD, available)
        return parse_events(os.read(self.fd, available.value))
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:Jackal_Velodyne_Duke 作者: MengGuo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
pyinotify.py 文件源码 项目:hacker-scripts 作者: restran 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_events(self):
        """
        Read events from device, build _RawEvents, and enqueue them.

        The number of readable bytes is queried with the FIONREAD ioctl.
        Nothing is read when that count is below ``self._threshold``.
        Otherwise the bytes are read in one call and split into records made
        of a 16-byte header (wd, mask, cookie, fname_len) followed by
        ``fname_len`` bytes of name. When ``self._coalesce`` is set, an event
        is enqueued only if its string form is not yet in ``self._eventset``.

        Raises:
            NotifierError: if reading from the file descriptor fails.
        """
        buf_ = array.array('i', [0])
        # get event queue size
        if fcntl.ioctl(self._fd, termios.FIONREAD, buf_, 1) == -1:
            return
        queue_size = buf_[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Read content from file
            r = os.read(self._fd, queue_size)
        # "except X as name" is accepted by Python 2.6+ and Python 3; the
        # former "except X, name" spelling is a SyntaxError on Python 3.
        except Exception as msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)
        rsum = 0  # counter
        while rsum < queue_size:
            s_size = 16
            # Retrieve wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack('iIII',
                                                        r[rsum:rsum+s_size])
            # Retrieve name
            fname, = struct.unpack('%ds' % fname_len,
                                   r[rsum + s_size:rsum + s_size + fname_len])
            rawevent = _RawEvent(wd, mask, cookie, fname)
            if self._coalesce:
                # Only enqueue new (unique) events.
                raweventstr = str(rawevent)
                if raweventstr not in self._eventset:
                    self._eventset.add(raweventstr)
                    self._eventq.append(rawevent)
            else:
                self._eventq.append(rawevent)
            # Advance past this record's header and name.
            rsum += s_size + fname_len
posix_utils.py 文件源码 项目:vim-clangd 作者: Chilledheart 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def EstimateUnreadBytes(fd):
    """Return the byte count that the FIONREAD ioctl reports for ``fd``.

    A one-element signed-int array is handed to ``ioctl`` as a mutable
    buffer, and the value written into it is returned.
    """
    pending = array('i', [0])
    ioctl(fd, FIONREAD, pending, True)
    (count,) = pending
    return count
serialposix.py 文件源码 项目:ftcommunity-apps 作者: ftCommunity 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def in_waiting(self):
        """Return the count of bytes sitting in the input buffer.

        Issues the ``TIOCINQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        (pending,) = struct.unpack('I', reply)
        return pending

    # select based implementation, proved to work on many systems
serialposix.py 文件源码 项目:ftcommunity-apps 作者: ftCommunity 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def out_waiting(self):
        """Return the count of bytes sitting in the output buffer.

        Issues the ``TIOCOUTQ`` ioctl on ``self.fd`` and decodes the reply
        as one native unsigned int.
        """
        reply = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        (queued,) = struct.unpack('I', reply)
        return queued
pyinotify.py 文件源码 项目:ClassiPi 作者: mugroma3 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_events(self):
        """
        Pull pending raw events off the descriptor and enqueue them.

        The number of readable bytes is queried with the FIONREAD ioctl.
        Nothing is read when that count is below ``self._threshold``.
        Otherwise the bytes are read in one call and split into records made
        of a 16-byte header (wd, mask, cookie, fname_len) followed by
        ``fname_len`` bytes of name, each turned into a ``_RawEvent``. When
        ``self._coalesce`` is set, an event is enqueued only if its string
        form is not yet in ``self._eventset``.

        Raises NotifierError if reading from the descriptor fails.
        """
        size_buf = array.array('i', [0])
        # Query how many bytes are waiting on the descriptor.
        if fcntl.ioctl(self._fd, termios.FIONREAD, size_buf, 1) == -1:
            return
        queue_size = size_buf[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Fetch the whole queue in a single read.
            payload = os.read(self._fd, queue_size)
        except Exception as msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)

        header_size = 16
        offset = 0  # position of the current record in payload
        while offset < queue_size:
            name_start = offset + header_size
            # Fixed-size header: wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack(
                'iIII', payload[offset:name_start])
            name_end = name_start + fname_len
            # Variable-size name that follows the header
            bname, = struct.unpack('%ds' % fname_len,
                                   payload[name_start:name_end])
            # FIXME: should we explicitly call sys.getdefaultencoding() here ??
            rawevent = _RawEvent(wd, mask, cookie, bname.decode())
            if not self._coalesce:
                self._eventq.append(rawevent)
            else:
                # Only enqueue events not seen before.
                event_key = str(rawevent)
                if event_key not in self._eventset:
                    self._eventset.add(event_key)
                    self._eventq.append(rawevent)
            offset = name_end
pyinotify.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read_events(self):
        """
        Pull pending raw events off the descriptor and enqueue them.

        The number of readable bytes is queried with the FIONREAD ioctl.
        Nothing is read when that count is below ``self._threshold``.
        Otherwise the bytes are read in one call and split into records made
        of a 16-byte header (wd, mask, cookie, fname_len) followed by
        ``fname_len`` bytes of name, each turned into a ``_RawEvent``. When
        ``self._coalesce`` is set, an event is enqueued only if its string
        form is not yet in ``self._eventset``.

        Raises NotifierError if reading from the descriptor fails.
        """
        size_buf = array.array('i', [0])
        # Query how many bytes are waiting on the descriptor.
        if fcntl.ioctl(self._fd, termios.FIONREAD, size_buf, 1) == -1:
            return
        queue_size = size_buf[0]
        if queue_size < self._threshold:
            log.debug('(fd: %d) %d bytes available to read but threshold is '
                      'fixed to %d bytes', self._fd, queue_size,
                      self._threshold)
            return

        try:
            # Fetch the whole queue in a single read.
            payload = os.read(self._fd, queue_size)
        except Exception as msg:
            raise NotifierError(msg)
        log.debug('Event queue size: %d', queue_size)

        header_size = 16
        offset = 0  # position of the current record in payload
        while offset < queue_size:
            name_start = offset + header_size
            # Fixed-size header: wd, mask, cookie and fname_len
            wd, mask, cookie, fname_len = struct.unpack(
                'iIII', payload[offset:name_start])
            name_end = name_start + fname_len
            # Variable-size name that follows the header
            bname, = struct.unpack('%ds' % fname_len,
                                   payload[name_start:name_end])
            # FIXME: should we explicitly call sys.getdefaultencoding() here ??
            rawevent = _RawEvent(wd, mask, cookie, bname.decode())
            if not self._coalesce:
                self._eventq.append(rawevent)
            else:
                # Only enqueue events not seen before.
                event_key = str(rawevent)
                if event_key not in self._eventset:
                    self._eventset.add(event_key)
                    self._eventq.append(rawevent)
            offset = name_end


问题


面经


文章

微信
公众号

扫码关注公众号