def get_last_n_messages(self, n):
"""
Generator intended to return the last n messages from the queue.
As far as the last records are located at the end of the file, we read
the file backwards until the number of desired lines is reached or the
whole file has been read. -1 means no limit.
"""
    buf_size = 8192
    try:
        with open(self.file_path, 'r') as fd:
            fcntl.flock(fd, fcntl.LOCK_SH)
            segment = None
            offset = 0
            n_line = 0
            # Move to the EOF.
            fd.seek(0, os.SEEK_END)
            # Get the file size using tell().
            file_size = remaining_size = fd.tell()
            while remaining_size > 0:
                offset = min(file_size, offset + buf_size)
                # Move the pointer to the start of the next chunk.
                fd.seek(file_size - offset)
                # Read a chunk into the buffer.
                buffer = fd.read(min(remaining_size, buf_size))
                remaining_size -= buf_size
                # Split the buffer content by EOL.
                lines = buffer.split('\n')
                if segment is not None:
                    # Concatenate the incomplete first line kept from the
                    # previous iteration with the last line of this chunk,
                    # unless that kept line was already complete.
                    if buffer[-1] != '\n':
                        lines[-1] += segment
                    else:
                        n_line += 1
                        if n > -1 and n_line > n:
                            fcntl.flock(fd, fcntl.LOCK_UN)
                            return
                        yield json.loads(
                            self.parse_row_message(segment).content)
                segment = lines[0]
                # Yield the complete lines of this chunk, newest first.
                for idx in range(len(lines) - 1, 0, -1):
                    if len(lines[idx]):
                        n_line += 1
                        if n > -1 and n_line > n:
                            fcntl.flock(fd, fcntl.LOCK_UN)
                            return
                        yield json.loads(
                            self.parse_row_message(lines[idx]).content)
            # Whatever remains in segment is the first line of the file.
            if segment is not None:
                yield json.loads(self.parse_row_message(segment).content)
            fcntl.flock(fd, fcntl.LOCK_UN)
    except Exception:
        # Any error (e.g. a missing file or a malformed row) silently ends
        # the generator without yielding further messages.
        return
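
# Usage sketch (illustrative only, not part of the original code). It assumes
# the method above belongs to a queue/log reader class that sets
# `self.file_path` and implements `parse_row_message`, and that `fcntl`,
# `json` and `os` are imported at the top of the real module; the class name
# `MessageQueue` and the path below are hypothetical placeholders.
#
#     queue = MessageQueue('/path/to/messages.log')
#     for message in queue.get_last_n_messages(10):       # newest message first
#         print(message)
#     all_messages = list(queue.get_last_n_messages(-1))  # -1 means no limit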