def test_seeking_beyond_beginning_from_current(self):
    """A relative seek that lands before the start of the stream raises IOError."""
    with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
        with self.assertRaises(IOError):
            # -100 relative to the current position of a freshly opened file.
            fp.seek(-100, io.SEEK_CUR)
# Example source code using Python's SEEK_CUR
def seek(self, offset, whence=io.SEEK_SET):
    """Move the read position and return the resulting position.

    ``offset`` is resolved to an absolute position according to ``whence``
    (``io.SEEK_SET``, ``io.SEEK_CUR`` or ``io.SEEK_END``).  The move itself
    is carried out by reading and discarding data; a backwards move first
    calls ``self._rewind()``.

    Raises:
        ValueError: if ``whence`` is not one of the three supported values.
        IOError: if the resolved absolute position is negative.
    """
    # Step 1: turn ``offset`` into an absolute position.
    if whence == io.SEEK_SET:
        pass
    elif whence == io.SEEK_CUR:
        offset += self._pos
    elif whence == io.SEEK_END:
        if self._size < 0:
            # A negative size is treated as "not known yet": drain the
            # stream first (presumably read() records the size at EOF --
            # verify against read()).
            while True:
                if not self.read(io.DEFAULT_BUFFER_SIZE):
                    break
        offset += self._size
    else:
        raise ValueError('Invalid value for whence: {}'.format(whence))

    if offset < 0:
        template = '[Error {code}] {msg}'
        raise IOError(template.format(code=errno.EINVAL,
                                      msg=os.strerror(errno.EINVAL)))

    # Step 2: turn the absolute position into a count of bytes to skip.
    if offset >= self._pos:
        offset -= self._pos
    else:
        # Going backwards: restart, then skip ``offset`` bytes
        # (assumes _rewind() resets the position to 0 -- TODO confirm).
        self._rewind()

    # Step 3: read and throw away data until the target is reached or the
    # stream runs out.
    while offset > 0:
        chunk = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
        if not chunk:
            break
        offset -= len(chunk)

    return self._pos
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def run(self):
    """Forward packets from the HCI socket and the PTY until told to stop.

    Each iteration waits up to one second in ``select`` on ``self._inputs``
    and then services whichever of the two sources is readable.  The loop
    ends when ``self._should_stop`` becomes true or on ``KeyboardInterrupt``.
    """
    try:
        while not self._should_stop:
            ready = select.select(self._inputs, [], [], 1)[0]

            if self._hci_socket in ready:
                packet = self._hci_socket.recv(4096)
                self._logger.debug('SOCKET: %s', RawCopy(HciPacket).parse(packet))
                self.handle_packet(packet, 'socket')

            # NOTE(review): readiness is tested on _pty_fd but the data is
            # read from _pty_master -- confirm both refer to the same PTY.
            if self._pty_fd in ready:
                chunk = os.read(self._pty_master, 4096)
                self._logger.debug('Raw PTY data: %s', repr(chunk))
                # Append the new bytes, then step back over them so that
                # parsing starts at the first byte just written.
                self._pty_buffer.write(chunk)
                self._pty_buffer.seek(-len(chunk), SEEK_CUR)

                # Parse packets until the buffer is exhausted or the parser
                # yields nothing.
                while self._pty_buffer.pos != self._pty_buffer.len:
                    parsed = RawCopy(HciPacket).parse_stream(self._pty_buffer)
                    if not parsed:
                        break
                    self._logger.debug('PTY: %s', parsed)
                    self.handle_packet(parsed.data, 'pty')
    except KeyboardInterrupt:
        # NOTE(review): KeyboardInterrupt normally corresponds to SIGINT,
        # while the message says SIGTERM; this also logs through `log`
        # rather than self._logger.  Left as in the original.
        log.info("Received SIGTERM, exiting")
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def load(self, loader: bfres.core.ResFileLoader):
    """Read this object's fields from ``loader`` in file order.

    Every read below advances the loader, so values that are not stored are
    still consumed to keep later fields aligned.
    """
    loader.check_signature(self._SIGNATURE)
    self._flags = loader.read_uint32()
    loader.read_uint16()  # num_bone: consumed, not used here
    smooth_matrix_count = loader.read_uint16()
    rigid_matrix_count = loader.read_uint16()
    loader.seek(2, io.SEEK_CUR)  # step over 2 bytes (presumably padding)
    self.bones = loader.load_dict(bfres.Bone)
    loader.read_offset()  # bone list offset: consumed; only the dict is loaded

    def read_matrix_to_bone_list(ldr):
        return ldr.read_uint16s(smooth_matrix_count + rigid_matrix_count)

    def read_inverse_model_matrices(ldr):
        return ldr.read_matrix3x4s(smooth_matrix_count)

    self.matrix_to_bone_list = loader.load_custom(read_matrix_to_bone_list)
    self.inverse_model_matrices = loader.load_custom(read_inverse_model_matrices)
    loader.read_uint32()  # user pointer: consumed, not used
def load(self, loader: bfres.core.ResFileLoader):
    """Read an "FVTX" section from ``loader`` in file order.

    Every read below advances the loader, so values that are not stored are
    still consumed to keep later fields aligned.
    """
    loader.check_signature("FVTX")
    loader.read_byte()  # num_vertex_attrib: consumed, not used here
    buffer_count = loader.read_byte()
    loader.read_uint16()  # idx: consumed, not used here
    loader.read_uint32()  # vertex_count: consumed, not used here
    self.vertex_skin_count = loader.read_byte()
    loader.seek(3, io.SEEK_CUR)  # step over 3 bytes (presumably padding)
    loader.read_offset()  # attribute list offset: consumed; only the dict is loaded
    self.attributes = loader.load_dict(bfres.VertexAttrib)
    self.buffers = loader.load_list(bfres.Buffer, buffer_count)
    loader.read_uint32()  # user pointer: consumed, not used
def load(self, loader: bfres.core.ResFileLoader):
    """Read primitive type, index format, sub-meshes and index buffer.

    Fields are read from ``loader`` strictly in file order.
    """
    primitive_raw = loader.read_uint32()
    self.primitive_type = bfres.gx2.GX2PrimitiveType(primitive_raw)
    index_format_raw = loader.read_uint32()
    self.index_format = bfres.gx2.GX2IndexFormat(index_format_raw)
    loader.read_uint32()  # index_count: consumed, not used here
    sub_mesh_count = loader.read_uint16()
    loader.seek(2, io.SEEK_CUR)  # step over 2 bytes (presumably padding)
    self.sub_meshes = loader.load_list(bfres.SubMesh, sub_mesh_count)
    self.index_buffer = loader.load(bfres.Buffer)
    self.first_vertex = loader.read_uint32()
def load(self, loader: bfres.core.ResFileLoader):
    """Read name, buffer index, offset and format from ``loader`` in file order."""
    self.name = loader.load_string()
    self.buffer_index = loader.read_byte()
    loader.seek(1, io.SEEK_CUR)  # step over 1 byte (presumably padding)
    self.offset = loader.read_uint16()
    format_raw = loader.read_uint32()
    self.format = bfres.gx2.GX2AttribFormat(format_raw)
def load(self, loader: bfres.core.ResFileLoader):
    """Read a typed value array plus its name from ``loader``.

    The element count and value type come first, then the name, then the
    values themselves.  ``self.value`` is only assigned for the INT32,
    SINGLE and STRING types.
    """
    num_values = loader.read_uint16()
    type_raw = loader.read_byte()
    self.value_type = RenderInfoType(type_raw)
    loader.seek(1, io.SEEK_CUR)  # step over 1 byte (presumably padding)
    self.name = loader.load_string()

    kind = self.value_type
    if kind == RenderInfoType.INT32:
        self.value = loader.read_int32s(num_values)
    elif kind == RenderInfoType.SINGLE:
        self.value = loader.read_singles(num_values)
    elif kind == RenderInfoType.STRING:
        self.value = loader.load_strings(num_values)
def load(self, loader: bfres.core.ResFileLoader):
    """Read an "FTEX" section from ``loader`` in file order.

    Every read below advances the loader, so values that are not stored are
    still consumed to keep later fields aligned.
    """
    gx2 = bfres.gx2
    loader.check_signature("FTEX")

    self.dim = gx2.GX2SurfaceDim(loader.read_uint32())
    for field in ("width", "height", "depth", "mip_count"):
        setattr(self, field, loader.read_uint32())
    self.format = gx2.GX2SurfaceFormat(loader.read_uint32())
    self.aa_mode = gx2.GX2AAMode(loader.read_uint32())
    self.use = gx2.GX2SurfaceUse(loader.read_uint32())

    data_size = loader.read_uint32()
    loader.read_uint32()  # image pointer: consumed, not used
    mip_data_size = loader.read_uint32()
    loader.read_uint32()  # mip pointer: consumed, not used

    self.tile_mode = gx2.GX2TileMode(loader.read_uint32())
    for field in ("swizzle", "alignment", "pitch"):
        setattr(self, field, loader.read_uint32())
    self.mip_offsets = loader.read_uint32s(13)
    for field in ("view_mip_first", "view_mip_count",
                  "view_slice_first", "view_slice_count"):
        setattr(self, field, loader.read_uint32())

    # One byte per channel selector, stored in r, g, b, a order.
    for channel in "rgba":
        setattr(self, "comp_sel_" + channel, gx2.GX2CompSel(loader.read_byte()))

    self.regs = loader.read_uint32s(5)
    loader.read_uint32()  # handle: consumed, not used
    self.array_length = loader.read_uint32()  # Possibly just a byte.
    self.name = loader.load_string()
    self.path = loader.load_string()
    self.data = loader.load_custom(lambda ldr: ldr.read_bytes(data_size))
    self.mip_data = loader.load_custom(lambda ldr: ldr.read_bytes(mip_data_size))
    self.user_data = loader.load_dict(bfres.UserData)
    loader.read_uint16()  # num_user_data: consumed, not used
    loader.seek(2, io.SEEK_CUR)  # step over 2 bytes (presumably padding)
def load(self, loader: bfres.core.ResFileLoader):
    """Read a named, typed value array from ``loader``.

    The name comes first, then the element count and value type, then the
    values themselves.  ``self.value`` is only assigned for the INT32,
    SINGLE, STRING, WSTRING and BYTE types.
    """
    self.name = loader.load_string()
    num_values = loader.read_uint16()
    type_raw = loader.read_byte()
    self.value_type = UserDataType(type_raw)
    loader.seek(1, io.SEEK_CUR)  # step over 1 byte (presumably padding)

    kind = self.value_type
    if kind == UserDataType.INT32:
        self.value = loader.read_int32s(num_values)
    elif kind == UserDataType.SINGLE:
        self.value = loader.read_singles(num_values)
    elif kind == UserDataType.STRING:
        self.value = loader.load_strings(num_values, "ascii")
    elif kind == UserDataType.WSTRING:
        # NOTE(review): WSTRING is decoded as "utf-8" here -- confirm that
        # is the intended encoding for wide strings.
        self.value = loader.load_strings(num_values, "utf-8")
    elif kind == UserDataType.BYTE:
        self.value = loader.read_bytes(num_values)
def seek(self, position, whence=io.SEEK_SET):
    """Set ``self.position`` and return it.

    The new position is derived from ``position`` and ``whence`` and is
    clamped using ``0`` and ``self.size``; no error is raised for a target
    outside that range.

    Raises:
        ValueError: if ``whence`` is not ``io.SEEK_SET``, ``io.SEEK_CUR``
            or ``io.SEEK_END``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        moved = self.position + position
        # Backward moves are floored at 0, forward moves capped at size.
        target = max(moved, 0) if position < 0 else min(moved, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")

    self.position = target
    return self.position
def seek(self, offset, whence=SEEK_SET):
    """Seek in the underlying stream, adjusting for the prefix and suffix.

    * ``SEEK_SET``: ``offset`` is shifted by ``_prefixStart + PREFIX_SIZE``.
    * ``SEEK_CUR``: ``offset`` is passed through unchanged.
    * ``SEEK_END``: ``offset`` is moved back by ``SUFFIX_SIZE``.

    Returns whatever ``self._stream.seek`` returns, or ``None`` (without
    seeking) while the raw stream is still shorter than prefix plus suffix.

    Raises:
        ValueError: if ``whence`` is not one of the three values above and
            the raw stream is long enough for a seek to be attempted.
    """
    # Nothing to seek in until at least a full prefix + suffix is present.
    if self._rawStreamSize() < (self.PREFIX_SIZE + self.SUFFIX_SIZE):
        return None

    if whence == SEEK_SET:
        offset += self._prefixStart + self.PREFIX_SIZE
    elif whence == SEEK_END:
        # even if the suffix hasn't been received yet, we calculate our
        # offsets as if it had.  Why? Because if it hasn't been received
        # yet, we don't want to finish! The whole packet isn't framed
        # (verified) until the final bytes are received.
        offset -= self.SUFFIX_SIZE
    elif whence != SEEK_CUR:
        # Previously an unknown whence fell through and returned None
        # without seeking; report it instead.
        raise ValueError('Invalid value for whence: {}'.format(whence))

    return self._stream.seek(offset, whence)
def from_file(cls, fp, size=None):
    """Construct a Frame Descriptor object from binary data.

    Parameters
    ----------
    fp : file-like
        File-like object to read the Frame Descriptor from.
        The cursor must be at the position where the Frame Descriptor starts.
    size : int, optional
        The number of bytes to be read.
        If not given, it is taken from the first ``uint32`` at the cursor;
        the cursor is then moved back so that word is read again as part of
        the descriptor.

    Returns
    -------
    FrameDescriptor

    Raises
    ------
    ValueError
        If no size word can be read (EOF), if ``size`` is negative, or if
        ``size`` exceeds the size of the ``FrameDescriptor`` structure.

    Notes
    -----
    Up to ``size`` bytes are consumed from ``fp``.

    If fewer bytes than the size of the Frame Descriptor are requested or
    available, the remaining fields are left unfilled.
    """
    out = FrameDescriptor()
    if size is None:
        size_word = np.fromfile(fp, dtype=np.uint32, count=1)
        if size_word.size == 0:
            raise ValueError('Unexpected EOF')
        # Only peek at the size: rewind so the read below starts at the
        # beginning of the descriptor again.
        fp.seek(-size_word.nbytes, io.SEEK_CUR)
        size = int(size_word[0])

    capacity = ct.sizeof(FrameDescriptor)
    if size < 0:
        # A negative count would make fp.read() return everything that is
        # left and hand memmove an invalid length.
        raise ValueError('Number of requested bytes (%d) must not be '
                         'negative' % size)
    if size > capacity:
        raise ValueError('Number of requested bytes (%d) do not fit '
                         'in the data structure (%d)' % (size, capacity))

    buf = fp.read(size)
    # fp.read() may return fewer than ``size`` bytes (e.g. a truncated file).
    # Copy only what was actually read so memmove never reads past ``buf``.
    ct.memmove(ct.addressof(out), buf, len(buf))
    return out