python类SEEK_CUR的实例源码

test_streams.py 文件源码 项目:pyheatshrink 作者: johan-sports 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_seeking_beyond_beginning_from_current(self):
        with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
            self.assertRaises(IOError, fp.seek, -100, io.SEEK_CUR)
streams.py 文件源码 项目:pyheatshrink 作者: johan-sports 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=io.SEEK_SET):
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset += self._pos
        elif whence == io.SEEK_END:
            if self._size < 0:
                # Finish reading the file
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset += self._size
        else:
            raise ValueError('Invalid value for whence: {}'.format(whence))

        if offset < 0:
            msg = '[Error {code}] {msg}'
            raise IOError(msg.format(code=errno.EINVAL,
                                     msg=os.strerror(errno.EINVAL)))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos
tarfile.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
tarfile.py 文件源码 项目:gardenbot 作者: GoestaO 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
tarfile.py 文件源码 项目:projeto 作者: BarmyPenguin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
tarfile.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
ble_logger.py 文件源码 项目:jumper-ble-logger 作者: Jumperr-labs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def run(self):
        try:
            while not self._should_stop:
                readable, _, _ = select.select(self._inputs, [], [], 1)

                if self._hci_socket in readable:
                    source = 'socket'
                    packet = self._hci_socket.recv(4096)
                    self._logger.debug('SOCKET: %s', RawCopy(HciPacket).parse(packet))
                    self.handle_packet(packet, source)

                if self._pty_fd in readable:
                    data = os.read(self._pty_master, 4096)
                    self._logger.debug('Raw PTY data: %s', repr(data))
                    self._pty_buffer.write(data)
                    self._pty_buffer.seek(-len(data), SEEK_CUR)

                    source = 'pty'
                    while True:
                        if self._pty_buffer.pos == self._pty_buffer.len:
                            break
                        parsed_packet = RawCopy(HciPacket).parse_stream(self._pty_buffer)
                        if not parsed_packet:
                            break
                        self._logger.debug('PTY: %s', parsed_packet)
                        packet = parsed_packet.data
                        self.handle_packet(packet, source)
        except KeyboardInterrupt:
            log.info("Received SIGTERM, exiting")
tarfile.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
tarfile.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
tarfile.py 文件源码 项目:blog_flask 作者: momantai 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
skeleton.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature(self._SIGNATURE)
        self._flags = loader.read_uint32()
        num_bone = loader.read_uint16()
        num_smooth_matrix = loader.read_uint16()
        num_rigid_matrix = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
        self.bones = loader.load_dict(bfres.Bone)
        ofs_bone_list = loader.read_offset()  # Only load dict
        self.matrix_to_bone_list = loader.load_custom(lambda l: l.read_uint16s(num_smooth_matrix + num_rigid_matrix))
        self.inverse_model_matrices = loader.load_custom(lambda l: l.read_matrix3x4s(num_smooth_matrix))
        user_pointer = loader.read_uint32()
vertexbuffer.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature("FVTX")
        num_vertex_attrib = loader.read_byte()
        num_buffer = loader.read_byte()
        idx = loader.read_uint16()
        vertex_count = loader.read_uint32()
        self.vertex_skin_count = loader.read_byte()
        loader.seek(3, io.SEEK_CUR)
        ofs_vertex_attrib_list = loader.read_offset()  # Only load dict.
        self.attributes = loader.load_dict(bfres.VertexAttrib)
        self.buffers = loader.load_list(bfres.Buffer, num_buffer)
        user_pointer = loader.read_uint32()
mesh.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        self.primitive_type = bfres.gx2.GX2PrimitiveType(loader.read_uint32())
        self.index_format = bfres.gx2.GX2IndexFormat(loader.read_uint32())
        index_count = loader.read_uint32()
        num_sub_mesh = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
        self.sub_meshes = loader.load_list(bfres.SubMesh, num_sub_mesh)
        self.index_buffer = loader.load(bfres.Buffer)
        self.first_vertex = loader.read_uint32()
vertexattrib.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        self.name = loader.load_string()
        self.buffer_index = loader.read_byte()
        loader.seek(1, io.SEEK_CUR)
        self.offset = loader.read_uint16()
        self.format = bfres.gx2.GX2AttribFormat(loader.read_uint32())
renderinfo.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        count = loader.read_uint16()
        self.value_type = RenderInfoType(loader.read_byte())
        loader.seek(1, io.SEEK_CUR)
        self.name = loader.load_string()
        if self.value_type == RenderInfoType.INT32:
            self.value = loader.read_int32s(count)
        elif self.value_type == RenderInfoType.SINGLE:
            self.value = loader.read_singles(count)
        elif self.value_type == RenderInfoType.STRING:
            self.value = loader.load_strings(count)
texture.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        loader.check_signature("FTEX")
        self.dim = bfres.gx2.GX2SurfaceDim(loader.read_uint32())
        self.width = loader.read_uint32()
        self.height = loader.read_uint32()
        self.depth = loader.read_uint32()
        self.mip_count = loader.read_uint32()
        self.format = bfres.gx2.GX2SurfaceFormat(loader.read_uint32())
        self.aa_mode = bfres.gx2.GX2AAMode(loader.read_uint32())
        self.use = bfres.gx2.GX2SurfaceUse(loader.read_uint32())
        siz_data = loader.read_uint32()
        image_pointer = loader.read_uint32()
        siz_map_data = loader.read_uint32()
        mip_pointer = loader.read_uint32()
        self.tile_mode = bfres.gx2.GX2TileMode(loader.read_uint32())
        self.swizzle = loader.read_uint32()
        self.alignment = loader.read_uint32()
        self.pitch = loader.read_uint32()
        self.mip_offsets = loader.read_uint32s(13)
        self.view_mip_first = loader.read_uint32()
        self.view_mip_count = loader.read_uint32()
        self.view_slice_first = loader.read_uint32()
        self.view_slice_count = loader.read_uint32()
        self.comp_sel_r = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_g = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_b = bfres.gx2.GX2CompSel(loader.read_byte())
        self.comp_sel_a = bfres.gx2.GX2CompSel(loader.read_byte())
        self.regs = loader.read_uint32s(5)
        handle = loader.read_uint32()
        self.array_length = loader.read_uint32()  # Possibly just a byte.
        self.name = loader.load_string()
        self.path = loader.load_string()
        self.data = loader.load_custom(lambda l: l.read_bytes(siz_data))
        self.mip_data = loader.load_custom(lambda l: l.read_bytes(siz_map_data))
        self.user_data = loader.load_dict(bfres.UserData)
        num_user_data = loader.read_uint16()
        loader.seek(2, io.SEEK_CUR)
userdata.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
        self.name = loader.load_string()
        count = loader.read_uint16()
        self.value_type = UserDataType(loader.read_byte())
        loader.seek(1, io.SEEK_CUR)
        if self.value_type == UserDataType.INT32:
            self.value = loader.read_int32s(count)
        elif self.value_type == UserDataType.SINGLE:
            self.value = loader.read_singles(count)
        elif self.value_type == UserDataType.STRING:
            self.value = loader.load_strings(count, "ascii")
        elif self.value_type == UserDataType.WSTRING:
            self.value = loader.load_strings(count, "utf-8")
        elif self.value_type == UserDataType.BYTE:
            self.value = loader.read_bytes(count)
tarfile.py 文件源码 项目:MyFriend-Rob 作者: lcheniv 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
        """Seek to a position in the file.
        """
        if whence == io.SEEK_SET:
            self.position = min(max(position, 0), self.size)
        elif whence == io.SEEK_CUR:
            if position < 0:
                self.position = max(self.position + position, 0)
            else:
                self.position = min(self.position + position, self.size)
        elif whence == io.SEEK_END:
            self.position = max(min(self.size + position, self.size), 0)
        else:
            raise ValueError("Invalid argument")
        return self.position
PacketFramingStream.py 文件源码 项目:Playground3 作者: CrimsonVista 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=SEEK_SET):
        "Adjust seek from prefix start and, if present, from prefix"
        if not self._rawStreamSize() >= (self.PREFIX_SIZE + self.SUFFIX_SIZE):
            return
        if whence == SEEK_SET:
            offset += self._prefixStart + self.PREFIX_SIZE
            return self._stream.seek(offset, whence)
        elif whence == SEEK_CUR:
            return self._stream.seek(offset, whence)
        elif whence == SEEK_END:
            # even if the suffix hasn't been received yet, we calculate our offsets as if it had.
            # why? because if it hasn't been received yet, we don't want to finish! The whole packet
            # isn't framed (verified) until the final bytes are received.
            offset = offset - self.SUFFIX_SIZE
            return self._stream.seek(offset, whence)
types.py 文件源码 项目:pixelinkds 作者: hgrecco 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def from_file(cls, fp, size=None):
        """Construct a Frame Descriptor object from binary data.

        Parameters
        ----------
        fp : file-like
             File-like object to read the Frame Descriptor from.
             The cursor must be at the position where the Frame Descriptor starts.

        size : int, optional
               the number of bytes to be read.
               If not given, the value inspected from the fp itself will be used.

        Returns
        -------
        FrameDescriptor

        .. note:: A number of bytes equal to `size` will be consumed from the fp.

        .. warning:
                  If the number of requested bytes is smaller than the size of the
                  Frame Descriptor, some fields might be left unfilled.


        """
        out = FrameDescriptor()

        if size is None:
            size = np.fromfile(fp, dtype=np.uint32, count=1)
            if size.size == 0:
                raise ValueError('Unexpected EOF')

            fp.seek(-size.nbytes, io.SEEK_CUR)

            size = size[0]

        if size > ct.sizeof(FrameDescriptor):
            raise ValueError('Number of requested bytes (%d) do not fit '
                             'in the data structure (%d)' % (size, ct.sizeof(out)))

        buf = fp.read(size)

        ct.memmove(ct.addressof(out), buf, size)

        return out


问题


面经


文章

微信
公众号

扫码关注公众号