def write_value(fd, fmt, *value, **kwargs):
"""
Write a single binary value to a file-like object.
Parameters
----------
fd : file-like object
Must be opened for writing, in binary mode.
fmt : str
A `struct` module `format character
<https://docs.python.org/2/library/struct.html#format-characters>`__
string.
value : any
The value to encode and write to the file.
endian : str
The endianness. Must be ``>`` or ``<``. Default: ``>``.
"""
endian = kwargs.get('endian', '>')
fmt = endian + fmt
fd.write(struct.pack(fmt, *value))
python类html()的实例源码
def __init__(self, file_path, mode, data_fmt, convert_type=None):
"""
Args:
file_path
mode: 'w', 'r', or 'a'. Automatically append 'b' to the mode.
data_fmt: see https://docs.python.org/2/library/struct.html
i - int; I - unsigned int; q - long long; Q - unsigned long long
f - float; d - double; s - string; c - char; ? - bool
b - signed char; B - unsigned char; h - short; H - unsigned short.
3i - tuple of 3 ints; 'ifb' - tuple of int, float, bool
convert_type: if you write int, file.read() will return a tuple (3,)
use convert_return to return convert_return(*read) instead.
"""
self.data_fmt = data_fmt
self._size = struct.calcsize(data_fmt)
self.convert_type = convert_type
mode = self._get_mode(mode)
AbstractFile.__init__(self, file_path, mode)
def split_field(byte_len, data_bytes):
'''
Return a tuple containing the first byte_len bytes in data_bytes, and the
remainder of data_bytes.
Asserts if data_bytes is not at least byte_len bytes long.
'''
assert len(data_bytes) >= byte_len
return (bytearray(data_bytes[0:byte_len]),
bytearray(data_bytes[byte_len:]))
# struct formats. See
# https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
def endianness(self):
"""
'<' little-endian
'!' Network-byte-order (big-endian)
https://docs.python.org/3.5/library/struct.html#format-strings
From the RFC:
The NETWORK_BYTE_ORDER bit applies to all multi-byte integer
values in the entire AgentX packet, including the remaining
header fields. If set, then network byte order (most
significant byte first; "big endian") is used. If not set,
then least significant byte first ("little endian") is used.
"""
return '!' if self.flag__network_byte_order else '<'
def read_value(fd, fmt, endian='>'):
# type: (BinaryIO, unicode, unicode) -> Any
"""
Read a values from a file-like object.
Parameters
----------
fd : file-like object
Must be opened for reading, in binary mode.
fmt : str
A `struct` module `format character
<https://docs.python.org/2/library/struct.html#format-characters>`__
string.
endian : str
The endianness. Must be ``>`` or ``<``. Default: ``>``.
Returns
-------
value : any
The value(s) read from the file.
If a single value, it is returned alone. If multiple values,
a tuple is returned.
"""
fmt = endian + fmt
size = struct.calcsize(fmt) # type: ignore
result = struct.unpack(fmt, fd.read(size)) # type: ignore
if len(result) == 1:
return result[0]
else:
return result
def unpack(self, value):
"""Unpack a value using Python's struct.unpack()"""
assert(self.pack_format is not None)
# Note: "The result is a tuple even if it contains exactly one item."
# (https://docs.python.org/2/library/struct.html#struct.unpack)
# For single-valued data types, use AdsSingleValuedDatatype to get the
# first (and only) entry of the tuple after unpacking.
return struct.unpack(self.pack_format, value)
def _start(self):
# memoryview act as an recv buffer
# refer https://docs.python.org/3/library/stdtypes.html#memoryview
buff = memoryview(bytearray(RECV_BUFFER_SIZE))
while True:
if not self.conn_rd:
# sleep if there is no connections
time.sleep(0.06)
continue
# blocks until there is socket(s) ready for .recv
# notice: sockets which were closed by remote,
# are also regarded as read-ready by select()
r, w, e = select.select(self.conn_rd, [], [], 0.5)
for s in r: # iter every read-ready or closed sockets
try:
# here, we use .recv_into() instead of .recv()
# recv data directly into the pre-allocated buffer
# to avoid many unnecessary malloc()
# see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
except:
# unable to read, in most cases, it's due to socket close
self._rd_shutdown(s)
continue
if not rec_len:
# read zero size, closed or shutdowned socket
self._rd_shutdown(s)
continue
try:
# send data, we use `buff[:rec_len]` slice because
# only the front of buff is filled
self.map[s].send(buff[:rec_len])
except:
# unable to send, close connection
self._rd_shutdown(s)
continue
def decode_string(data):
""" Decode string and strip NULL-bytes from end."""
return data.decode('utf-8').rstrip('\0')
# IRIS Data Types and corresponding python struct format characters
# 4.2 Scalar Definitions, Page 23
# https://docs.python.org/3/library/struct.html#format-characters
def read_uleb128(data):
# the first bit of each byte is 1, unless that's the last byte
total = 0
found = False
# so technically it doesn't have to be 5...
if len(data) != 5:
log(3, "read_uleb128, where len(data) == %i" % len(data))
#assert len(data) == 5
for i in xrange(5):
value = ord(data[i])
high_bit = (ord(data[i]) >> 7)
# clear the high bit
total += (value & 0x7f) << (i * 7) | total
# this is the last byte, so break
if high_bit == 0:
found = True
break
if not found: # redundant to also check for "i == 4"?
log(3, "invalid ULEB128")
assert False
# return (value, num_of_bytes) # where num_of_bytes indicates how much space this LEB128 took up
return total, i+1
# http://llvm.org/docs/doxygen/html/LEB128_8h_source.html
# hex => decimal
###############3
# 00 => 0
# 01 => 1
# 7f => -1
# 80 7f => -128
def class_defs(self): # seems ok
class_defs_size_offset = 96 # VERIFIED
class_defs_off_offset = 100 # VERIFIED
self.class_defs_size = self.read_uint(class_defs_size_offset) # ok
self.class_defs_off = self.read_uint(class_defs_off_offset) # ok
print "\n===============================\n"
print "class_defs_size: ", self.class_defs_size, "\n"
print "class_defs_off: ", hex(self.class_defs_off), "\n"
# class_def_items will store the class_def_items, see "class_def_item" @ https://source.android.com/devices/tech/dalvik/dex-format.html
# Name | Format
# ========================================
# class_idx uint | uint
# access_flags | uint
# superclass_idx | uint
# interfaces_off | uint
# source_file_idx | uint
# annotations_off | uint
# class_data_off | uint
# static_values_off | uint
class_def_item_size = 0x20 # 0x20 is 32 decimal, the class_def_item size in bytes
class_def_items = []
offset = 0
for i in range(self.class_defs_size):
offset = four_byte_align(offset)
item = self.read_class_def_item(offset)
offset += class_def_item_size
class_def_items.append(item)
# list of class_def_item objects
return class_def_items
# collision?
# handles data_size, data_off
def _start(self):
# memoryview act as an recv buffer
# refer https://docs.python.org/3/library/stdtypes.html#memoryview
buff = memoryview(bytearray(RECV_BUFFER_SIZE))
while True:
if not self.conn_rd:
# sleep if there is no connections
time.sleep(0.06)
continue
# blocks until there is socket(s) ready for .recv
# notice: sockets which were closed by remote,
# are also regarded as read-ready by select()
r, w, e = select.select(self.conn_rd, [], [], 0.5)
for s in r: # iter every read-ready or closed sockets
try:
# here, we use .recv_into() instead of .recv()
# recv data directly into the pre-allocated buffer
# to avoid many unnecessary malloc()
# see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
except:
# unable to read, in most cases, it's due to socket close
self._rd_shutdown(s)
continue
if not rec_len:
# read zero size, closed or shutdowned socket
self._rd_shutdown(s)
continue
try:
# send data, we use `buff[:rec_len]` slice because
# only the front of buff is filled
self.map[s].send(buff[:rec_len])
except:
# unable to send, close connection
self._rd_shutdown(s)
continue
def _create(self, message_h, **fields):
'''
Render the supplied message values to the bytearray.
'''
#
# ensure that the fields match the schema
if message_h not in self.gruel_protocol:
raise Exception("No message exists matching [%s]"%message_h)
message_stencil = self.gruel_protocol.get_message_stencil(
message_h=message_h)
set_sch = set(message_stencil.field_names())
set_got = set(fields.keys())
if set_sch != set_got:
raise Exception("Inconsistent fields/want:%s/got:%s"%(
str(set_sch), str(set_got)))
#
# render
offset = 0
for (field_h, field_dt) in message_stencil.items():
field_value = fields[field_h]
# struct docs: https://docs.python.org/3.1/library/struct.html
#log('** o%s press %s %s'%(offset, field_h, field_value))
if field_dt.name == 'u1':
bsize = 1
struct.pack_into(
'!B', # fmt. B is unsigned char
self.arr, # buffer
offset, # offset
field_value)
offset += bsize
elif field_dt.name == 'u2':
bsize = 2
struct.pack_into(
'!H', # fmt. H is unsigned short
self.arr, # buffer
offset, # offset
field_value)
offset += bsize
elif field_dt.name == 'vs':
s_len = len(field_value)
# first we do a two-byte length, network-endian
bsize = 2
struct.pack_into(
'!H', # fmt. H is unsigned short
self.arr, # buffer
offset, # offset
s_len)
offset += bsize
# Now we put that string into the array. Emphasis: these
# strings are not zero-terminated.
struct.pack_into(
'%ss'%s_len, # fmt.
self.arr, # buffer
offset, # offset
bytes(field_value, 'utf8'))
offset += s_len
else:
raise Exception("Datatype not recognised/handled: %s"%(
field_dt.name))
return self.arr
def parse_idx(fd):
"""Parse an IDX file, and return it as a numpy array.
Parameters
----------
fd : file
File descriptor of the IDX file to parse
endian : str
Byte order of the IDX file. See [1] for available options
Returns
-------
data : numpy.ndarray
Numpy array with the dimensions and the data in the IDX file
1. https://docs.python.org/3/library/struct.html#byte-order-size-and-alignment
"""
DATA_TYPES = {0x08: 'B', # unsigned byte
0x09: 'b', # signed byte
0x0b: 'h', # short (2 bytes)
0x0c: 'i', # int (4 bytes)
0x0d: 'f', # float (4 bytes)
0x0e: 'd'} # double (8 bytes)
header = fd.read(4)
if len(header) != 4:
raise IdxDecodeError('Invalid IDX file, file empty or does not contain a full header.')
zeros, data_type, num_dimensions = struct.unpack('>HBB', header)
if zeros != 0:
raise IdxDecodeError('Invalid IDX file, file must start with two zero bytes. '
'Found 0x%02x' % zeros)
try:
data_type = DATA_TYPES[data_type]
except KeyError:
raise IdxDecodeError('Unknown data type 0x%02x in IDX file' % data_type)
dimension_sizes = struct.unpack('>' + 'I' * num_dimensions,
fd.read(4 * num_dimensions))
data = array.array(data_type, fd.read())
data.byteswap() # looks like array.array reads data as little endian
expected_items = functools.reduce(operator.mul, dimension_sizes)
if len(data) != expected_items:
raise IdxDecodeError('IDX file has wrong number of items. '
'Expected: %d. Found: %d' % (expected_items, len(data)))
return np.array(data).reshape(dimension_sizes)
def _start(self):
# memoryview act as an recv buffer
# refer https://docs.python.org/3/library/stdtypes.html#memoryview
buff = memoryview(bytearray(RECV_BUFFER_SIZE))
while True:
if not self.conn_rd:
# sleep if there is no connections
time.sleep(0.06)
continue
# blocks until there is socket(s) ready for .recv
# notice: sockets which were closed by remote,
# are also regarded as read-ready by select()
r, w, e = select.select(self.conn_rd, [], [], 0.5)
for s in r: # iter every read-ready or closed sockets
try:
# here, we use .recv_into() instead of .recv()
# recv data directly into the pre-allocated buffer
# to avoid many unnecessary malloc()
# see https://docs.python.org/3/library/socket.html#socket.socket.recv_into
rec_len = s.recv_into(buff, RECV_BUFFER_SIZE)
# agre = "http"
# url = agre + '://' + heads['Host']
# heads = httphead(buff.tobytes().decode('utf-8'))
# logging.info("recv head:{}".format(heads))
except Exception as e:
# unable to read, in most cases, it's due to socket close
self._rd_shutdown(s)
continue
if not rec_len:
# read zero size, closed or shutdowned socket
self._rd_shutdown(s)
continue
try:
# send data, we use `buff[:rec_len]` slice because
# only the front of buff is filled
self.map[s].send(buff[:rec_len])
except Exception as e:
# unable to send, close connection
self._rd_shutdown(s)
continue