def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
python类SEEK_SET的实例源码
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def seek(self, offset, whence=io.SEEK_SET):
    """Forward the seek to ``self.in_buffer`` and return its result.

    Returns ``None`` without seeking when ``self.in_buffer`` is falsy.
    """
    if not self.in_buffer:
        return None
    return self.in_buffer.seek(offset, whence)
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def recognizes(cls, file):
"""Report whether ``file`` appears to contain a valid CBFS header.

The file at ``file.path`` is rejected outright when its size is below
``CBFS_HEADER_SIZE`` or above ``CBFS_MAXIMUM_FILE_SIZE``. Otherwise the
last four bytes are read as a little-endian signed offset (relative to
the end of the file) pointing at the header, and the bytes found there
are checked with ``is_header_valid``. The code after that scans the
file from the start for a valid header.

NOTE(review): the indentation of this snippet was lost, so it cannot be
determined from here whether the ``elif``/``else`` branches below pair
with the offset range check or with the header validity check. Confirm
against the upstream source before restructuring.

:param file: Object exposing ``path`` and ``name`` attributes.
:return: True when a valid header is found, False otherwise.
"""
size = os.stat(file.path).st_size
# Too small to hold a header, or larger than the accepted maximum.
if size < CBFS_HEADER_SIZE or size > CBFS_MAXIMUM_FILE_SIZE:
return False
with open(file.path, 'rb') as f:
# peek at the last four bytes, as they should contain the relative offset of the header
f.seek(-4, io.SEEK_END)
# <pgeorgi> given the hardware we support so far, it looks like
# that field is now bound to be little endian
# -- #coreboot, 2015-10-14
rel_offset = struct.unpack('<i', f.read(4))[0]
# The offset must be negative (it counts back from the end of the file)
# and its magnitude must lie strictly between the header size and the
# file size.
if rel_offset < 0 and -rel_offset > CBFS_HEADER_SIZE and -rel_offset < size:
f.seek(rel_offset, io.SEEK_END)
logger.debug('looking for header at offset: %x', f.tell())
if is_header_valid(f.read(CBFS_HEADER_SIZE), size):
return True
# Files whose name does not end in '.rom' are rejected here instead of
# falling through to the scan below.
elif not file.name.endswith('.rom'):
return False
else:
logger.debug('CBFS relative offset seems wrong, scanning whole image')
# Fallback: scan from the start of the file. The first read is one
# header's worth of bytes; later reads are 32768-byte chunks.
f.seek(0, io.SEEK_SET)
offset = 0
buf = f.read(CBFS_HEADER_SIZE)
while len(buf) >= CBFS_HEADER_SIZE:
if is_header_valid(buf, size, offset):
return True
# Once fewer than a full header remains past ``offset``, fetch the next
# chunk and restart at its beginning; otherwise advance by one byte.
if len(buf) - offset <= CBFS_HEADER_SIZE:
buf = f.read(32768)
offset = 0
else:
offset += 1
return False
def add_sample(self, sample, end_callback=None):
    """Queue ``sample`` for playback by handing it to ``add_stream`` as WAV data.

    The sample must have the same sample width, sample rate and channel
    count as this object; this is checked with assertions. The sample is
    written via ``sample.write_wav`` into an in-memory buffer, which is
    rewound and passed to ``self.add_stream`` together with ``end_callback``.
    """
    for attribute in ("samplewidth", "samplerate", "nchannels"):
        assert getattr(sample, attribute) == getattr(self, attribute)
    wav_buffer = io.BytesIO()
    sample.write_wav(wav_buffer)
    # Rewind so the consumer reads the WAV data from its first byte.
    wav_buffer.seek(0, io.SEEK_SET)
    self.add_stream(wav_buffer, end_callback=end_callback)
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def load(self, value_type: type):
    """
    Reads an offset, then reads an instance of value_type located at that offset. A read offset of 0 yields None.
    :param value_type: The type of the instance to read.
    :return: The instance or None.
    """
    target = self.read_offset()
    if target:
        # Read at the target offset inside a temporary seek.
        with self.temporary_seek(target, io.SEEK_SET):
            return self._read_res_data(value_type)
    return None
def load_custom(self, callback, offset: int = None):
    """
    Reads an instance of arbitrary type by invoking the callback at the target offset. The target is the given
    offset when it is truthy, otherwise an offset read from the stream. A target of 0 yields None.
    :param callback: The callback to read the instance data with; it is called with this loader.
    :param offset: The optional offset to use instead of reading a following one.
    :return: The data instance or None.
    """
    target = offset if offset else self.read_offset()
    if target:
        with self.temporary_seek(target, io.SEEK_SET):
            return callback(self)
    return None
def load_dict(self, value_type: type):
    """
    Reads an offset and returns a ResDict of value_type elements loaded from that offset. A read offset of 0 yields
    a ResDict that has not been loaded.
    :param value_type: The type of the elements.
    :return: The ResDict instance.
    """
    target = self.read_offset()
    result = bfres.ResDict(value_type)
    if target:
        # Populate the dict from the target offset inside a temporary seek.
        with self.temporary_seek(target, io.SEEK_SET):
            result.load(self)
    return result
def load_list(self, value_type: type, count: int, offset: int = None):
    """
    Reads count consecutive instances of value_type starting at the target offset. The target is the given offset
    when it is truthy, otherwise an offset read from the stream. A target or count of 0 yields an empty list.
    :param value_type: The type of the elements to read.
    :param count: The number of elements to read.
    :param offset: The optional offset to use instead of reading a following one.
    :return: The list of read instances.
    """
    target = offset if offset else self.read_offset()
    if not target or not count:
        return []
    # Seek to the list start and read it.
    with self.temporary_seek(target, io.SEEK_SET):
        return [self._read_res_data(value_type) for _ in range(count)]
def load_string(self, encoding: str = None):
    """
    Reads an offset, then reads a 0-terminated string located at that offset. A read offset of 0 yields None.
    :param encoding: The optional encoding to use; falls back to the encoding of this loader when not truthy.
    :return: The read text or None.
    """
    target = self.read_offset()
    if target:
        with self.temporary_seek(target, io.SEEK_SET):
            return self.read_string_0(encoding if encoding else self.encoding)
    return None
def seek(self, offset, whence):
    """
    Position the underlying file relative to this part rather than to the
    whole file: io.SEEK_SET counts from the first byte of the part and
    io.SEEK_END counts from the end of the part.
    :param offset: Offset in bytes from location determined by the whence value
    :param whence: io.SEEK_END and io.SEEK_SET are supported.
    :raises RuntimeError: For any other whence value.
    """
    if whence == io.SEEK_END:
        base = self.start + self.size
    elif whence == io.SEEK_SET:
        base = self.start
    else:
        raise RuntimeError("Unhandled whence value: {}".format(whence))
    # Always translated into an absolute seek on the underlying file.
    self.file.seek(base + offset, io.SEEK_SET)
def add_parts_from_file(self, file_path):
    """
    Registers consecutive parts of a file, each self.part_size bytes apart, by calling add_part_from_file once per
    part until the end of the file is reached.
    :param string file_path: Path of file to upload in parts
    """
    with io.open(file_path, mode='rb') as source:
        # Measure the file by seeking to its end, then rewind.
        source.seek(0, io.SEEK_END)
        total_bytes = source.tell()
        source.seek(0, io.SEEK_SET)
        cursor = 0
        while source.tell() < total_bytes:
            self.add_part_from_file(file_path, offset=cursor, size=self.part_size)
            cursor += self.part_size
            source.seek(cursor, io.SEEK_SET)
def seek(self, position, whence=io.SEEK_SET):
    """Move the cursor and return its new value.

    ``position`` is interpreted relative to the start (``io.SEEK_SET``),
    the current cursor (``io.SEEK_CUR``) or ``self.size`` (``io.SEEK_END``).
    The result is clamped so the cursor does not go below 0 or above
    ``self.size``.

    Raises ``ValueError`` for any other ``whence``.
    """
    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        # Moving backwards can only underflow; moving forwards can only
        # overflow, so each direction needs just one bound.
        if position < 0:
            target = max(self.position + position, 0)
        else:
            target = min(self.position + position, self.size)
    elif whence == io.SEEK_END:
        target = max(min(self.size + position, self.size), 0)
    else:
        raise ValueError("Invalid argument")
    self.position = target
    return self.position
def seek(self, offset, whence=SEEK_SET):
    """Seek in the wrapped stream, skipping over the prefix and suffix.

    Does nothing and returns ``None`` while the raw stream is shorter than
    ``PREFIX_SIZE + SUFFIX_SIZE``, and also for an unrecognised ``whence``.
    Otherwise returns whatever ``self._stream.seek`` returns.
    """
    if self._rawStreamSize() < (self.PREFIX_SIZE + self.SUFFIX_SIZE):
        return None
    if whence == SEEK_SET:
        # Absolute offsets are counted from the first byte after the prefix.
        adjusted = offset + (self._prefixStart + self.PREFIX_SIZE)
    elif whence == SEEK_CUR:
        adjusted = offset
    elif whence == SEEK_END:
        # even if the suffix hasn't been received yet, we calculate our offsets as if it had.
        # why? because if it hasn't been received yet, we don't want to finish! The whole packet
        # isn't framed (verified) until the final bytes are received.
        adjusted = offset - self.SUFFIX_SIZE
    else:
        return None
    return self._stream.seek(adjusted, whence)
def importMesh(self, stream, filename=None):
    """Import an Ogre mesh from ``stream`` into a new ``OgreMesh``.

    The mesh name is the base name of ``filename`` without its extension.
    When ``filename`` is omitted it is taken from ``stream.name`` or
    ``stream.filename``. The header chunk id and version string are read
    from the stream, the stream is rewound, and the import is delegated to
    the serializer implementation in ``self._versionData`` whose
    ``versionString`` matches.

    :param stream: An ``IOBase`` instance positioned at the start of the mesh data.
    :param filename: Optional file name used to derive the mesh name.
    :raises ValueError: If no file name can be determined, a mesh with the
        derived name already exists in ``bpy.data.meshes``, the header chunk
        id is wrong, or no implementation matches the version string.
    """
    assert isinstance(stream, IOBase)

    # Check mesh name validity
    if filename is None:
        if hasattr(stream, 'name'):
            filename = stream.name
        elif hasattr(stream, 'filename'):
            filename = stream.filename
        else:
            raise ValueError("Cannot determine the filename of the stream please add filename parameter")
    filename = os.path.basename(filename)
    mesh_name = os.path.splitext(filename)[0]
    if mesh_name in bpy.data.meshes.keys():
        raise ValueError("Mesh with name " + mesh_name + " already exists in blender")

    # Check header and select impl
    self._determineEndianness(stream)
    headerID = self._readUShorts(stream, 1)[0]
    if headerID != OgreMeshSerializer.HEADER_CHUNK_ID:
        raise ValueError("File header not found")
    ver = OgreSerializer.readString(stream)
    # Rewind: the selected implementation gets the stream from its start.
    stream.seek(0, SEEK_SET)
    impl = None
    for candidate in self._versionData:
        if candidate.versionString == ver:
            impl = candidate.impl
            break
    if impl is None:
        print(ver)
        raise ValueError("Cannot find serializer implementation for "
                         "mesh version " + ver)

    # Create the blender mesh and import the mesh
    mesh = OgreMesh(mesh_name)
    impl.importMesh(stream, mesh, self.listener)

    # Warn when the version differs from the first entry in the version table.
    if ver != self._versionData[0].versionString:
        print("Warning: "
              " older format (" + ver + "); you should upgrade it as soon as possible" +
              " using the OgreMeshUpgrade tool.")

    # Notify the listener. The original referenced the undefined bare name
    # ``listener`` here, which raised NameError whenever a listener was set.
    if self.listener is not None:
        self.listener.processMeshCompleted(mesh)