queue.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:temboard-agent 作者: dalibo 项目源码 文件源码
def get_last_n_messages(self, n):
        """
        Generator intended to return the last n messages from the queue.
        As far as the last records are located at the end of the file, we read
        the file backwards until the number of desired lines is reached or the
        whole file has been read. -1 means no limit.
        """
        buf_size = 8192
        try:
            with open(self.file_path, 'r') as fd:
                fcntl.flock(fd, fcntl.LOCK_SH)
                segment = None
                offset = 0
                n_line = 0
                # Move to the EOF
                fd.seek(0, os.SEEK_END)
                # Get file size using tell()
                file_size = total_size = remaining_size = fd.tell()

                while (remaining_size > 0):
                    offset = min(total_size, offset + buf_size)
                    # Move pointer to the next position.
                    fd.seek(file_size - offset)
                    # Read a chunk into the buffer.
                    buffer = fd.read(min(remaining_size, buf_size))
                    remaining_size -= buf_size
                    # Split buffer content by EOL.
                    lines = buffer.split('\n')

                    if segment is not None:
                        # Case when we need to concatenate the first uncomplete
                        # line of the last loop iter. with the last one of this
                        # current iteration.
                        if buffer[-1] is not '\n':
                            lines[-1] += segment
                        else:
                            n_line += 1
                            if (n > -1 and n_line > n):
                                fcntl.flock(fd, fcntl.LOCK_UN)
                                break
                            yield json.loads(
                                    self.parse_row_message(segment).content)
                    segment = lines[0]
                    # Read each line.
                    for idx in range(len(lines) - 1, 0, -1):
                        if len(lines[idx]):
                            n_line += 1
                            if (n > -1 and n_line > n):
                                fcntl.flock(fd, fcntl.LOCK_UN)
                                break
                            yield json.loads(
                                    self.parse_row_message(lines[idx]).content)

                if segment is not None:
                    yield json.loads(self.parse_row_message(segment).content)
                fcntl.flock(fd, fcntl.LOCK_UN)
        except Exception:
            return
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号