python类SEEK_CUR的实例源码

socks.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
    """Receive one datagram and strip the SOCKS5 UDP header from it.

    Sockets that are not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.recvfrom``. For datagram sockets the first two bytes
    are skipped (reserved in RFC 1928), the fragment byte is checked, and
    the sender address is parsed with ``_read_SOCKS5_address``.

    :param bufsize: maximum number of payload bytes to return.
    :param flags: passed through to the underlying receive call.
    :returns: ``(payload, (fromhost, fromport))``.
    :raises NotImplementedError: if the fragment byte is non-zero.
    :raises socket.error: ``EAGAIN`` when ``proxy_peername`` is set and
        the sender host differs from it, or the peer port is neither 0
        nor the sender port.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # The SOCKS5 header travels inside the same datagram as the payload,
    # so request extra room; otherwise a payload of exactly ``bufsize``
    # bytes would be truncated by the header length.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    # Honour the caller's limit on the payload itself.
    return (buf.read(bufsize), (fromhost, fromport))
socks.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
    """Receive one datagram and strip the SOCKS5 UDP header from it.

    Sockets that are not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.recvfrom``. For datagram sockets the first two bytes
    are skipped (reserved in RFC 1928), the fragment byte is checked, and
    the sender address is parsed with ``_read_SOCKS5_address``.

    :param bufsize: maximum number of payload bytes to return.
    :param flags: passed through to the underlying receive call.
    :returns: ``(payload, (fromhost, fromport))``.
    :raises NotImplementedError: if the fragment byte is non-zero.
    :raises socket.error: ``EAGAIN`` when ``proxy_peername`` is set and
        the sender host differs from it, or the peer port is neither 0
        nor the sender port.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # The SOCKS5 header travels inside the same datagram as the payload,
    # so request extra room; otherwise a payload of exactly ``bufsize``
    # bytes would be truncated by the header length.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    # Honour the caller's limit on the payload itself.
    return (buf.read(bufsize), (fromhost, fromport))
socks.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
    """Receive one datagram and strip the SOCKS5 UDP header from it.

    Sockets that are not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.recvfrom``. For datagram sockets the first two bytes
    are skipped (reserved in RFC 1928), the fragment byte is checked, and
    the sender address is parsed with ``_read_SOCKS5_address``.

    :param bufsize: maximum number of payload bytes to return.
    :param flags: passed through to the underlying receive call.
    :returns: ``(payload, (fromhost, fromport))``.
    :raises NotImplementedError: if the fragment byte is non-zero.
    :raises socket.error: ``EAGAIN`` when ``proxy_peername`` is set and
        the sender host differs from it, or the peer port is neither 0
        nor the sender port.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # The SOCKS5 header travels inside the same datagram as the payload,
    # so request extra room; otherwise a payload of exactly ``bufsize``
    # bytes would be truncated by the header length.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    # Honour the caller's limit on the payload itself.
    return (buf.read(bufsize), (fromhost, fromport))
socks.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
    """Receive one datagram and strip the SOCKS5 UDP header from it.

    Sockets that are not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.recvfrom``. For datagram sockets the first two bytes
    are skipped (reserved in RFC 1928), the fragment byte is checked, and
    the sender address is parsed with ``_read_SOCKS5_address``.

    :param bufsize: maximum number of payload bytes to return.
    :param flags: passed through to the underlying receive call.
    :returns: ``(payload, (fromhost, fromport))``.
    :raises NotImplementedError: if the fragment byte is non-zero.
    :raises socket.error: ``EAGAIN`` when ``proxy_peername`` is set and
        the sender host differs from it, or the peer port is neither 0
        nor the sender port.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # The SOCKS5 header travels inside the same datagram as the payload,
    # so request extra room; otherwise a payload of exactly ``bufsize``
    # bytes would be truncated by the header length.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    # Honour the caller's limit on the payload itself.
    return (buf.read(bufsize), (fromhost, fromport))
shaderparam.py 文件源码 项目:bfres 作者: Syroot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def load(self, loader: bfres.core.ResFileLoader):
    """Populate this shader parameter from *loader*.

    Reads the value type, the data offset and the name. When the file
    version is at least 0x03030000, the two dependency indices are read as
    well. Fields that are consumed but not stored are read only to advance
    the loader.
    """
    self.value_type = ShaderParamType(loader.read_byte())
    has_extended_layout = loader.res_file.version >= 0x03030000

    if has_extended_layout:
        _data_size = loader.read_byte()
    else:
        # Layout of older versions is a guess: skip a single byte.
        loader.seek(1, io.SEEK_CUR)

    self.data_offset = loader.read_uint16()
    _uniform_offset = loader.read_int32()  # Uniform variable offset.

    if has_extended_layout:
        _callback_pointer = loader.read_uint32()
        self.depended_index = loader.read_uint16()
        self.depend_index = loader.read_uint16()

    self.name = loader.load_string()
HighPerformanceStreamIO.py 文件源码 项目:Playground3 作者: CrimsonVista 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=io.SEEK_SET):
    """Move the stream position and return the new absolute position.

    :param offset: byte offset, interpreted relative to *whence*.
    :param whence: ``io.SEEK_SET`` (absolute, the default),
        ``io.SEEK_CUR`` (relative to the current position) or
        ``io.SEEK_END`` (relative to the stream end; position 0 is used
        when there are no buffers).
    :returns: the new stream position.
    :raises OSError: if seeking is not enabled for this stream.
    :raises ValueError: for a negative absolute offset or an unknown
        *whence* value.
    """
    self.__raiseIfClosed()
    if not self.__seekable:
        raise OSError("Seek not enabled for this stream")
    if whence == io.SEEK_SET:
        if offset < 0:
            raise ValueError("Cannot have a negative absolute seek")
        newStreamPosition = offset
    elif whence == io.SEEK_CUR:
        # A relative seek moves by ``offset``; adding ``whence`` (always 1
        # here) advanced the position by one byte regardless of the request.
        newStreamPosition = self.__streamPosition + offset
    elif whence == io.SEEK_END:
        if not self.__buffers:
            newStreamPosition = 0
        else:
            newStreamPosition = self.__streamEnd + offset
    else:
        # Previously fell through to an UnboundLocalError.
        raise ValueError("Invalid whence value: %r" % (whence,))
    self.__streamPosition = newStreamPosition
    return newStreamPosition
socks.py 文件源码 项目:XX-Net-Mini 作者: GFWParty 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def recvfrom(self, bufsize, flags=0):
    """Receive one datagram and strip the SOCKS5 UDP header from it.

    Sockets that are not ``SOCK_DGRAM`` are delegated unchanged to
    ``_BaseSocket.recvfrom``. For datagram sockets the first two bytes
    are skipped (reserved in RFC 1928), the fragment byte is checked, and
    the sender address is parsed with ``_read_SOCKS5_address``.

    :param bufsize: maximum number of payload bytes to return.
    :param flags: passed through to the underlying receive call.
    :returns: ``(payload, (fromhost, fromport))``.
    :raises NotImplementedError: if the fragment byte is non-zero.
    :raises socket.error: ``EAGAIN`` when ``proxy_peername`` is set and
        the sender host differs from it, or the peer port is neither 0
        nor the sender port.
    """
    if self.type != socket.SOCK_DGRAM:
        return _BaseSocket.recvfrom(self, bufsize, flags)
    if not self._proxyconn:
        self.bind(("", 0))

    # The SOCKS5 header travels inside the same datagram as the payload,
    # so request extra room; otherwise a payload of exactly ``bufsize``
    # bytes would be truncated by the header length.
    buf = BytesIO(_BaseSocket.recv(self, bufsize + 1024, flags))
    buf.seek(2, SEEK_CUR)
    frag = buf.read(1)
    if ord(frag):
        raise NotImplementedError("Received UDP packet fragment")
    fromhost, fromport = self._read_SOCKS5_address(buf)

    if self.proxy_peername:
        peerhost, peerport = self.proxy_peername
        if fromhost != peerhost or peerport not in (0, fromport):
            raise socket.error(EAGAIN, "Packet filtered")

    # Honour the caller's limit on the payload itself.
    return (buf.read(bufsize), (fromhost, fromport))
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
gbxparser.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=io.SEEK_CUR):
    """Seek the wrapped buffer; *whence* defaults to the current position.

    Returns whatever ``self.buffer.seek`` returns.
    """
    new_position = self.buffer.seek(offset, whence)
    return new_position
gbxparser.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
async def seek(self, offset):
    """
    Move the wrapped buffer relative to its current position.

    The second argument of the buffer's ``seek`` is fixed to
    ``io.SEEK_CUR``. The body awaits the buffer's ``seek``, so this must be
    a coroutine function: ``await`` inside a plain ``def`` is a syntax error.

    :param offset: offset to move away.
    :type offset: int
    :return: the awaited result of ``self.buffer.seek``.
    """
    return await self.buffer.seek(offset, io.SEEK_CUR)
binary_io.py 文件源码 项目:io_scene_bfres 作者: Syroot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def align(self, alignment):
    """Advance the reader to the next multiple of *alignment*.

    Does not move when the current position is already aligned.
    """
    # (-pos) % alignment is the distance up to the next aligned position.
    padding = -self.reader.tell() % alignment
    self.reader.seek(padding, io.SEEK_CUR)
binary_io.py 文件源码 项目:io_scene_bfres 作者: Syroot 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def align(self, alignment):
    """Advance the writer to the next multiple of *alignment*.

    Does not move when the current position is already aligned.
    """
    # (-pos) % alignment is the distance up to the next aligned position.
    padding = -self.writer.tell() % alignment
    self.writer.seek(padding, io.SEEK_CUR)
tarfile.py 文件源码 项目:ivaochdoc 作者: ivaoch 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
lsf.py 文件源码 项目:pyimc 作者: oysstu 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def peek_header(self):
    """Read the next header into ``self.header`` without consuming it.

    Raises RuntimeError when fewer bytes than one IMCHeader are available.
    On success the file position is moved back by the header size.
    """
    received = self.f.readinto(self.header)
    header_size = ctypes.sizeof(IMCHeader)
    if received < header_size:
        raise RuntimeError('LSF file ended abruptly.')

    # Rewind so the caller still sees the header as unread.
    self.f.seek(-header_size, io.SEEK_CUR)
lsf.py 文件源码 项目:pyimc 作者: oysstu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_index(self):
    """
    Scan the LSF file from the start, record the file offset of every
    message under its ``mgid`` in ``self.idx``, and pickle ``self.idx``
    next to the source file with the ``.pyimc_idx`` suffix.
    :return:
    """
    self.f.seek(0)

    # peek() returns an empty result once the end of the file is reached
    while self.f.peek(1):
        self.peek_header()
        message_pos = self.f.tell()

        # The first message's timestamp guards against index/lsf mismatch on load
        if message_pos == 0:
            self.idx['timestamp'] = self.header.timestamp

        # Append to the offset list for this message id, creating it if needed
        try:
            self.idx[self.header.mgid].append(message_pos)
        except (KeyError, AttributeError):
            self.idx[self.header.mgid] = [message_pos]

        # Skip header + payload + footer to land on the next message
        self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)

    self.f.seek(0)

    # Persist the index alongside the LSF file
    index_path = os.path.splitext(self.fpath)[0] + '.pyimc_idx'
    with open(index_path, mode='wb') as index_file:
        pickle.dump(self.idx, index_file)
lsf.py 文件源码 项目:pyimc 作者: oysstu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read_message(self):
    """
    Generator over the messages of the currently open LSF file.

    With a non-empty ``self.idx`` the positions come from
    ``sorted_idx_iter(self.msg_types)``; otherwise the file is scanned
    sequentially and messages whose ``mgid`` is not in ``self.msg_types``
    are skipped (an empty ``msg_types`` accepts everything).
    :return:
    """
    if self.idx:
        # Indexed read: jump directly to each recorded message
        for pos in self.sorted_idx_iter(self.msg_types):
            self.f.seek(pos)
            self.peek_header()
            self.parser.reset()
            raw = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
            yield self.parser.parse(raw)
        return

    # Sequential read: peek() is empty at the end of the file
    while self.f.peek(1):
        self.peek_header()

        if self.msg_types and self.header.mgid not in self.msg_types:
            # Unwanted message type: step over header + payload + footer
            self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)
            continue

        self.parser.reset()
        raw = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
        yield self.parser.parse(raw)
binary_io.py 文件源码 项目:io_scene_mk8muunt 作者: Syroot 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def align(self, alignment):
    """Advance the reader to the next multiple of *alignment*.

    Does not move when the current position is already aligned.
    """
    # (-pos) % alignment is the distance up to the next aligned position.
    padding = -self.reader.tell() % alignment
    self.reader.seek(padding, io.SEEK_CUR)
binary_io.py 文件源码 项目:io_scene_mk8muunt 作者: Syroot 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def align(self, alignment):
    """Advance the writer to the next multiple of *alignment*.

    Does not move when the current position is already aligned.
    """
    # (-pos) % alignment is the distance up to the next aligned position.
    padding = -self.writer.tell() % alignment
    self.writer.seek(padding, io.SEEK_CUR)
tarfile.py 文件源码 项目:news-for-good 作者: thecodinghub 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
tarfile.py 文件源码 项目:Tencent_Cartoon_Download 作者: Fretice 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
tarfile.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
GDBCommandExtensions.py 文件源码 项目:PINCE 作者: korcankaraokcu 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def invoke(self, arg, from_tty):
    """Send a hex dump of a memory range back through ``send_to_pince``.

    The request from ``receive_from_pince`` supplies the start address
    (index 0) and the number of bytes to read (index 1). Each byte is read
    individually from ``ScriptUtils.mem_file``; a byte whose read raises
    IOError is reported as "??".
    """
    request = receive_from_pince()
    dumped_bytes = []
    start_address = request[0]
    byte_count = request[1]
    with open(ScriptUtils.mem_file, "rb") as mem:
        mem.seek(start_address)
        for _ in range(byte_count):
            try:
                chunk = mem.read(1)
                rendered = " ".join(format(value, '02x') for value in chunk)
            except IOError:
                rendered = "??"
                # The failed read did not advance the position, so step over the byte.
                mem.seek(1, io.SEEK_CUR)
            dumped_bytes.append(rendered)
    send_to_pince(dumped_bytes)
unixfs.py 文件源码 项目:python3-ipfs-api 作者: jgraef 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=io.SEEK_SET):
    """Change the read position and return the new absolute position.

    :param offset: byte offset, interpreted relative to *whence*.
    :param whence: ``io.SEEK_SET`` (absolute; the default, as in the
        standard ``io`` seek signature), ``io.SEEK_CUR`` (relative to the
        current position) or ``io.SEEK_END`` (relative to
        ``self._file._filesize``).
    :returns: the new value of ``self._pos``.
    :raises ValueError: if *whence* is not one of the three constants;
        the position is left unchanged in that case.
    """
    if whence == io.SEEK_SET:
        self._pos = offset
    elif whence == io.SEEK_CUR:
        self._pos += offset
    elif whence == io.SEEK_END:
        self._pos = self._file._filesize + offset
    else:
        # An unknown whence used to be silently ignored.
        raise ValueError("Invalid argument")
    return self._pos
test_unixfs.py 文件源码 项目:python3-ipfs-api 作者: jgraef 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_small_file_seek_and_read(self):
    """Seek with each whence mode on the logo file, then read 16 bytes."""
    # (expected position, offset, whence), applied in order
    seek_steps = (
        (64, 64, io.SEEK_CUR),
        (128, 64, io.SEEK_CUR),
        (256, 256, io.SEEK_SET),
        (24610, -256, io.SEEK_END),
    )
    with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
        for expected_pos, offset, whence in seek_steps:
            self.assertEqual(expected_pos, f.seek(offset, whence))
        self.assertEqual(
            b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
            f.read(16))
tarfile.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
iotools.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def seekable(self):
    """Report whether the wrapped file supports seeking.

    Uses ``self._f.seekable()`` when the wrapped object has it. Otherwise
    a zero-length relative seek is attempted and an IOError is taken to
    mean "not seekable".
    """
    try:
        can_seek = self._f.seekable()
    except AttributeError:
        # No seekable() available: probe with a seek that does not move.
        can_seek = True
        try:
            self.seek(0, SEEK_CUR)
        except IOError:
            can_seek = False
    return can_seek
streams.py 文件源码 项目:ivport-v2 作者: ivmech 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=io.SEEK_SET):
    """
    Move the stream position to byte *offset*, interpreted relative to
    *whence*, and return the new absolute position.

    * ``SEEK_CUR`` (``1``): relative to the current position
    * ``SEEK_END`` (``2``): relative to the end of the stream
    * any other value, including the default ``SEEK_SET`` (``0``):
      *offset* is used as the absolute position

    Raises ValueError when the resulting position would be negative.
    The whole operation runs while holding ``self.lock``.
    """
    with self.lock:
        if whence == io.SEEK_CUR:
            target = self._pos + offset
        elif whence == io.SEEK_END:
            target = self._length + offset
        else:
            target = offset
        if target < 0:
            raise ValueError(
                'New position is before the start of the stream')
        self._set_pos(target)
        return self._pos
tarfile.py 文件源码 项目:CloudPrint 作者: William-An 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def seek(self, position, whence=io.SEEK_SET):
    """Move the read position, clamped to ``[0, self.size]``.

    Returns the new position. Raises ValueError("Invalid argument") when
    *whence* is not one of SEEK_SET, SEEK_CUR or SEEK_END.
    """
    if whence not in (io.SEEK_SET, io.SEEK_CUR, io.SEEK_END):
        raise ValueError("Invalid argument")

    if whence == io.SEEK_SET:
        target = min(max(position, 0), self.size)
    elif whence == io.SEEK_CUR:
        target = self.position + position
        # Moving backwards can only underflow; moving forwards only overflow.
        target = max(target, 0) if position < 0 else min(target, self.size)
    else:
        target = max(min(self.size + position, self.size), 0)

    self.position = target
    return target
test_streams.py 文件源码 项目:pyheatshrink 作者: johan-sports 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_seeking_forward_from_current(self):
    """A relative seek of +50 after reading 100 bytes lands at offset 150."""
    expected = TEXT

    with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
        first_chunk = fp.read(100)
        self.assertEqual(first_chunk, expected[:100])
        # Skip 50 bytes ahead of the current position
        fp.seek(50, io.SEEK_CUR)
        second_chunk = fp.read(100)
        self.assertEqual(second_chunk, expected[150:250])
test_streams.py 文件源码 项目:pyheatshrink 作者: johan-sports 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_seeking_backwards_from_current(self):
    """After reading everything, seeking back 100 re-reads the last 100 bytes."""
    with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
        everything = fp.read()
        # Step back from the end-of-data position
        fp.seek(-100, io.SEEK_CUR)
        tail = fp.read()
        self.assertEqual(tail, everything[-100:])


问题


面经


文章

微信
公众号

扫码关注公众号