def refresh_contents(self):
log_file = open(self.log_path)
log_file.seek(0, io.SEEK_END)
end_pos = log_file.tell()
if end_pos > 20000:
log_file.seek(end_pos - 20000, io.SEEK_SET)
else:
log_file.seek(0, io.SEEK_SET)
contents = log_file.read().split("\n", 1)[-1]
if contents != self.contents:
self.contents = contents
self.textBrowser_LogContent.clear()
self.textBrowser_LogContent.setPlainText(contents)
# Scrolling to bottom
cursor = self.textBrowser_LogContent.textCursor()
cursor.movePosition(QTextCursor.End)
self.textBrowser_LogContent.setTextCursor(cursor)
self.textBrowser_LogContent.ensureCursorVisible()
log_file.close()
python类SEEK_SET的实例源码
def seek(self, offset, whence=io.SEEK_SET):
"""Change the file position.
The new position is specified by offset, relative to the
position indicated by whence. Values for whence are:
0: start of stream (default); offset must not be negative
1: current stream position
2: end of stream; offset must not be positive
Returns the new file position.
Note that seeking is emulated, so depending on the parameters,
this operation may be extremely slow.
"""
with self._lock:
self._check_can_seek()
return self._buffer.seek(offset, whence)
def __init__(self, file_object, start, size):
"""
Build an object that will act like a BufferedReader object,
but constrained to a predetermined part of the file_object.
:param file_object: A file opened for reading in binary mode.
:param start: Start of the part within the file_object
:param size: Ideal size of the part. This will be set
to the number of bytes remaining in the
file if there aren't enough bytes remaining.
"""
self.file = file_object
self.bytes_read = 0
self.start = start
# Seek to the end of the file to see how many bytes remain
# from the start of the part.
self.file.seek(0, io.SEEK_END)
self.size = min(self.file.tell() - self.start, size)
# Reset the pointer to the start of the part.
self.file.seek(start, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET):
self.__raiseIfClosed()
if not self.__seekable:
raise OSError("Seek not enabled for this stream")
if whence == io.SEEK_SET:
if offset < 0:
raise ValueError("Cannot have a negative absolute seek")
newStreamPosition = offset
elif whence == io.SEEK_CUR:
newStreamPosition = self.__streamPosition + whence
elif whence == io.SEEK_END:
if not self.__buffers:
newStreamPosition = 0
else:
newStreamPosition = self.__streamEnd + offset
self.__streamPosition = newStreamPosition
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def get_mime(self, media_io):
if hasattr(media_io, "getvalue"):
buffer = media_io.getvalue()
elif hasattr(media_io, "seekable") and media_io.seekable():
buffer = media_io.read(1024)
media_io.seek(SEEK_SET)
else:
raise DontwiMediaError
return self.magic.from_buffer(buffer)
def get_media_size(media_io):
if hasattr(media_io, "getvalue"):
size = len(media_io.getvalue())
elif hasattr(media_io, "seekable") and media_io.seekable():
media_io.seek(0, SEEK_END)
size = media_io.tell()
media_io.seek(SEEK_SET)
else:
raise DontwiMediaError
return size
def reset(self):
"""Move the cursor the beginning of the 1st frame descriptor.
"""
self._fin.seek(self._offset, io.SEEK_SET)
def seek(self, offset, whence=io.SEEK_SET):
self.reader.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET):
self.writer.seek(offset, whence)
def get_full_trace(self, start_idx, end_idx):
"""Returns the Tor DATA cells transmitted over a circuit during a
specified time period."""
# Sanity check
assert start_idx >= 0 and end_idx > 0, ("Invalid (negative) logfile "
"position")
assert end_idx > start_idx, ("logfile section end_idx must come "
"after start_idx")
self.cell_log.seek(start_idx, SEEK_SET)
return self.cell_log.read(end_idx - start_idx)
def seek(self, offset, whence=io.SEEK_SET):
self.reader.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET):
self.writer.seek(offset, whence)
def _generate_image():
from PIL import Image
thumb = Image.new('RGB', (100, 100), 'blue')
thumb_io = io.BytesIO()
thumb.save(thumb_io, format='PNG')
thumb_io.seek(0, io.SEEK_SET)
return SimpleUploadedFile("image.png", thumb_io.getbuffer(), content_type="image/png")
def __getitem__(self, name):
# Retrieve value of an input pin.
pin = self.pins[name]
if pin["type"] != "in":
raise KeyError(f"Not an input: {name}")
pin["file"].seek(0, io.SEEK_SET)
return pin["file"].read(1) == "1"
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def web(self, path, args={}, encoding=None, allow_return_none=False):
self.logger.debug('????????')
if allow_return_none:
if path in self.web_cache and self.web_cache[path] == args:
self.logger.debug('????? {} ????'.format(path))
self.logger.debug('???{}'.format(args))
return
self.web_cache[path] = dict(args)
url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
if len(args) > 0:
url += '?' + urllib.parse.urlencode(args)
self.logger.debug('HTTP ?????{}'.format(url))
data = io.BytesIO()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.COOKIE, self.web_cookie)
self.curl.setopt(pycurl.NOBODY, False)
self.curl.setopt(pycurl.NOPROGRESS, True)
self.curl.setopt(pycurl.WRITEDATA, data)
self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
self.curl.perform()
status = self.curl.getinfo(pycurl.RESPONSE_CODE)
if status != 200:
raise ServerError(status)
data.seek(io.SEEK_SET)
return etree.parse(data, etree.HTMLParser(
encoding=encoding, remove_comments=True))
def recv(self, n):
self._buffer.seek(self._read_pos, io.SEEK_SET)
bs = self._buffer.read(n)
self._read_pos = self._buffer.tell()
return bs
def seek(self, offset, whence=io.SEEK_SET):
self.reader.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET):
self.writer.seek(offset, whence)
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence):
if (whence == io.SEEK_SET):
self._pos = offset
elif (whence == io.SEEK_CUR):
self._pos += offset
elif (whence == io.SEEK_END):
self._pos = self._file._filesize + offset
return self._pos
def test_small_file_seek_and_read(self):
with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
self.assertEqual(64, f.seek(64, io.SEEK_CUR))
self.assertEqual(128, f.seek(64, io.SEEK_CUR))
self.assertEqual(256, f.seek(256, io.SEEK_SET))
self.assertEqual(24610, f.seek(-256, io.SEEK_END))
self.assertEqual(
b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
f.read(16))
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence=SEEK_SET):
return self._f.seek(offset, whence)
def seek(self, offset, whence=io.SEEK_SET):
"""
Change the stream position to the given byte *offset*. *offset* is
interpreted relative to the position indicated by *whence*. Values for
*whence* are:
* ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset*
should be zero or positive
* ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be
negative
* ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually
negative
Return the new absolute position.
"""
with self.lock:
if whence == io.SEEK_CUR:
offset = self._pos + offset
elif whence == io.SEEK_END:
offset = self._length + offset
if offset < 0:
raise ValueError(
'New position is before the start of the stream')
self._set_pos(offset)
return self._pos
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence=io.SEEK_SET):
# Recalculate offset as an absolute file position.
if whence == io.SEEK_SET:
pass
elif whence == io.SEEK_CUR:
offset += self._pos
elif whence == io.SEEK_END:
if self._size < 0:
# Finish reading the file
while self.read(io.DEFAULT_BUFFER_SIZE):
pass
offset += self._size
else:
raise ValueError('Invalid value for whence: {}'.format(whence))
if offset < 0:
msg = '[Error {code}] {msg}'
raise IOError(msg.format(code=errno.EINVAL,
msg=os.strerror(errno.EINVAL)))
# Make it so that offset is the number of bytes to skip forward.
if offset < self._pos:
self._rewind()
else:
offset -= self._pos
# Read and discard data until we reach the desired position
while offset > 0:
data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
if not data:
break
offset -= len(data)
return self._pos