def recvfrom(self, bufsize, flags=0):
if self.type != socket.SOCK_DGRAM:
return _BaseSocket.recvfrom(self, bufsize, flags)
if not self._proxyconn:
self.bind(("", 0))
buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
buf.seek(+2, SEEK_CUR)
frag = buf.read(1)
if ord(frag):
raise NotImplementedError("Received UDP packet fragment")
fromhost, fromport = self._read_SOCKS5_address(buf)
if self.proxy_peername:
peerhost, peerport = self.proxy_peername
if fromhost != peerhost or peerport not in (0, fromport):
raise socket.error(EAGAIN, "Packet filtered")
return (buf.read(), (fromhost, fromport))
python类SEEK_CUR的实例源码
def recvfrom(self, bufsize, flags=0):
if self.type != socket.SOCK_DGRAM:
return _BaseSocket.recvfrom(self, bufsize, flags)
if not self._proxyconn:
self.bind(("", 0))
buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
buf.seek(+2, SEEK_CUR)
frag = buf.read(1)
if ord(frag):
raise NotImplementedError("Received UDP packet fragment")
fromhost, fromport = self._read_SOCKS5_address(buf)
if self.proxy_peername:
peerhost, peerport = self.proxy_peername
if fromhost != peerhost or peerport not in (0, fromport):
raise socket.error(EAGAIN, "Packet filtered")
return (buf.read(), (fromhost, fromport))
def recvfrom(self, bufsize, flags=0):
if self.type != socket.SOCK_DGRAM:
return _BaseSocket.recvfrom(self, bufsize, flags)
if not self._proxyconn:
self.bind(("", 0))
buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
buf.seek(+2, SEEK_CUR)
frag = buf.read(1)
if ord(frag):
raise NotImplementedError("Received UDP packet fragment")
fromhost, fromport = self._read_SOCKS5_address(buf)
if self.proxy_peername:
peerhost, peerport = self.proxy_peername
if fromhost != peerhost or peerport not in (0, fromport):
raise socket.error(EAGAIN, "Packet filtered")
return (buf.read(), (fromhost, fromport))
def recvfrom(self, bufsize, flags=0):
if self.type != socket.SOCK_DGRAM:
return _BaseSocket.recvfrom(self, bufsize, flags)
if not self._proxyconn:
self.bind(("", 0))
buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
buf.seek(+2, SEEK_CUR)
frag = buf.read(1)
if ord(frag):
raise NotImplementedError("Received UDP packet fragment")
fromhost, fromport = self._read_SOCKS5_address(buf)
if self.proxy_peername:
peerhost, peerport = self.proxy_peername
if fromhost != peerhost or peerport not in (0, fromport):
raise socket.error(EAGAIN, "Packet filtered")
return (buf.read(), (fromhost, fromport))
def load(self, loader: bfres.core.ResFileLoader):
self.value_type = ShaderParamType(loader.read_byte())
if loader.res_file.version >= 0x03030000:
siz_data = loader.read_byte()
self.data_offset = loader.read_uint16()
offset = loader.read_int32() # Uniform variable offset.
callback_pointer = loader.read_uint32()
self.depended_index = loader.read_uint16()
self.depend_index = loader.read_uint16()
self.name = loader.load_string()
else:
# GUESS
loader.seek(1, io.SEEK_CUR)
self.data_offset = loader.read_uint16()
offset = loader.read_int32() # Uniform variable offset.
self.name = loader.load_string()
def seek(self, offset, whence=io.SEEK_SET):
self.__raiseIfClosed()
if not self.__seekable:
raise OSError("Seek not enabled for this stream")
if whence == io.SEEK_SET:
if offset < 0:
raise ValueError("Cannot have a negative absolute seek")
newStreamPosition = offset
elif whence == io.SEEK_CUR:
newStreamPosition = self.__streamPosition + whence
elif whence == io.SEEK_END:
if not self.__buffers:
newStreamPosition = 0
else:
newStreamPosition = self.__streamEnd + offset
self.__streamPosition = newStreamPosition
def recvfrom(self, bufsize, flags=0):
if self.type != socket.SOCK_DGRAM:
return _BaseSocket.recvfrom(self, bufsize, flags)
if not self._proxyconn:
self.bind(("", 0))
buf = BytesIO(_BaseSocket.recv(self, bufsize, flags))
buf.seek(+2, SEEK_CUR)
frag = buf.read(1)
if ord(frag):
raise NotImplementedError("Received UDP packet fragment")
fromhost, fromport = self._read_SOCKS5_address(buf)
if self.proxy_peername:
peerhost, peerport = self.proxy_peername
if fromhost != peerhost or peerport not in (0, fromport):
raise socket.error(EAGAIN, "Packet filtered")
return (buf.read(), (fromhost, fromport))
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, offset, whence=io.SEEK_CUR):
return self.buffer.seek(offset, whence)
def seek(self, offset):
"""
We need to override the second param to move from the current position.
:param offset: offset to move away.
:type offset: int
"""
return await self.buffer.seek(offset, io.SEEK_CUR)
def align(self, alignment):
self.reader.seek(-self.reader.tell() % alignment, io.SEEK_CUR)
def align(self, alignment):
self.writer.seek(-self.writer.tell() % alignment, io.SEEK_CUR)
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def peek_header(self):
bytes_read = self.f.readinto(self.header)
if bytes_read < ctypes.sizeof(IMCHeader):
raise RuntimeError('LSF file ended abruptly.')
# Return file position to before header
self.f.seek(-ctypes.sizeof(IMCHeader), io.SEEK_CUR)
def write_index(self):
"""
Run through the lsf-file and generate an index file
:return:
"""
self.f.seek(0)
# Check for file end
while self.f.peek(1):
self.peek_header()
# Timestamp of first message is used to avoid index/lsf mismatch on load
if self.f.tell() == 0:
self.idx['timestamp'] = self.header.timestamp
# Store position for this message
try:
self.idx[self.header.mgid].append(self.f.tell())
except (KeyError, AttributeError) as e:
self.idx[self.header.mgid] = [self.f.tell()]
# Go to next message
self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)
self.f.seek(0)
# Store index
fbase, ext = os.path.splitext(self.fpath)
with open(fbase + '.pyimc_idx', mode='wb') as f:
pickle.dump(self.idx, f)
def read_message(self):
"""
Returns a generator that yields the messages in the currently open LSF file.
This requires the LSFReader object to be opened using the "with" statement.
See read(), where this is done automatically.
:return:
"""
if self.idx:
# Read using index
for pos in self.sorted_idx_iter(self.msg_types):
self.f.seek(pos)
self.peek_header()
self.parser.reset()
b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
msg = self.parser.parse(b)
yield msg
else:
# Read file without index
# Check for file end
while self.f.peek(1):
self.peek_header()
if not self.msg_types or self.header.mgid in self.msg_types:
self.parser.reset()
b = self.f.read(self.header.size + ctypes.sizeof(IMCHeader) + ctypes.sizeof(IMCFooter))
msg = self.parser.parse(b)
yield msg
else:
self.f.seek(ctypes.sizeof(IMCHeader) + self.header.size + ctypes.sizeof(IMCFooter), io.SEEK_CUR)
def align(self, alignment):
self.reader.seek(-self.reader.tell() % alignment, io.SEEK_CUR)
def align(self, alignment):
self.writer.seek(-self.writer.tell() % alignment, io.SEEK_CUR)
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def invoke(self, arg, from_tty):
contents_recv = receive_from_pince()
hex_byte_list = []
address = contents_recv[0]
offset = contents_recv[1]
with open(ScriptUtils.mem_file, "rb") as FILE:
FILE.seek(address)
for item in range(offset):
try:
current_item = " ".join(format(n, '02x') for n in FILE.read(1))
except IOError:
current_item = "??"
FILE.seek(1, io.SEEK_CUR) # Necessary since read() failed to execute
hex_byte_list.append(current_item)
send_to_pince(hex_byte_list)
def seek(self, offset, whence):
if (whence == io.SEEK_SET):
self._pos = offset
elif (whence == io.SEEK_CUR):
self._pos += offset
elif (whence == io.SEEK_END):
self._pos = self._file._filesize + offset
return self._pos
def test_small_file_seek_and_read(self):
with self.fs.open(self.KEY_LOGO_PNG, "rb") as f:
self.assertEqual(64, f.seek(64, io.SEEK_CUR))
self.assertEqual(128, f.seek(64, io.SEEK_CUR))
self.assertEqual(256, f.seek(256, io.SEEK_SET))
self.assertEqual(24610, f.seek(-256, io.SEEK_END))
self.assertEqual(
b'\x04$\x00_\x85$\xfb^\xf8\xe8]\x7f;}\xa8\xb7',
f.read(16))
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def seekable(self):
try:
return self._f.seekable()
except AttributeError:
try:
self.seek(0, SEEK_CUR)
except IOError:
return False
else:
return True
def seek(self, offset, whence=io.SEEK_SET):
"""
Change the stream position to the given byte *offset*. *offset* is
interpreted relative to the position indicated by *whence*. Values for
*whence* are:
* ``SEEK_SET`` or ``0`` – start of the stream (the default); *offset*
should be zero or positive
* ``SEEK_CUR`` or ``1`` – current stream position; *offset* may be
negative
* ``SEEK_END`` or ``2`` – end of the stream; *offset* is usually
negative
Return the new absolute position.
"""
with self.lock:
if whence == io.SEEK_CUR:
offset = self._pos + offset
elif whence == io.SEEK_END:
offset = self._length + offset
if offset < 0:
raise ValueError(
'New position is before the start of the stream')
self._set_pos(offset)
return self._pos
def seek(self, position, whence=io.SEEK_SET):
"""Seek to a position in the file.
"""
if whence == io.SEEK_SET:
self.position = min(max(position, 0), self.size)
elif whence == io.SEEK_CUR:
if position < 0:
self.position = max(self.position + position, 0)
else:
self.position = min(self.position + position, self.size)
elif whence == io.SEEK_END:
self.position = max(min(self.size + position, self.size), 0)
else:
raise ValueError("Invalid argument")
return self.position
def test_seeking_forward_from_current(self):
contents = TEXT
with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
self.assertEqual(fp.read(100), contents[:100])
fp.seek(50, io.SEEK_CUR) # Move 50 forwards
self.assertEqual(fp.read(100), contents[150:250])
def test_seeking_backwards_from_current(self):
with EncodedFile(io.BytesIO(COMPRESSED)) as fp:
contents = fp.read()
fp.seek(-100, io.SEEK_CUR)
self.assertEqual(fp.read(), contents[-100:])