def test_construct(self):
def _check_iterator(it):
self.assertIsInstance(it, abc.Iterator)
self.assertIsInstance(it, abc.Iterable)
s = struct.Struct('>ibcp')
it = s.iter_unpack(b"")
_check_iterator(it)
it = s.iter_unpack(b"1234567")
_check_iterator(it)
# Wrong bytes length
with self.assertRaises(struct.error):
s.iter_unpack(b"123456")
with self.assertRaises(struct.error):
s.iter_unpack(b"12345678")
# Zero-length struct
s = struct.Struct('>')
with self.assertRaises(struct.error):
s.iter_unpack(b"")
with self.assertRaises(struct.error):
s.iter_unpack(b"12")
python类iter_unpack()的实例源码
def test_construct(self):
def _check_iterator(it):
self.assertIsInstance(it, abc.Iterator)
self.assertIsInstance(it, abc.Iterable)
s = struct.Struct('>ibcp')
it = s.iter_unpack(b"")
_check_iterator(it)
it = s.iter_unpack(b"1234567")
_check_iterator(it)
# Wrong bytes length
with self.assertRaises(struct.error):
s.iter_unpack(b"123456")
with self.assertRaises(struct.error):
s.iter_unpack(b"12345678")
# Zero-length struct
s = struct.Struct('>')
with self.assertRaises(struct.error):
s.iter_unpack(b"")
with self.assertRaises(struct.error):
s.iter_unpack(b"12")
def _unpack_bitplanes(self, data):
"""The values that make up a row of pixels are organized like:
[bp1-row1] [bp2-row1] [bp3-row1] [bp4-row1]...
[bp1-row8] [bp2-row8] [bp3-row8] [bp4-row8]
Returns a list of lists containing bitplane values (bytes).
"""
planes = [[], [], [], []]
data_iter = Struct('c').iter_unpack(data)
for bp in data_iter:
planes[0].extend(*bp)
planes[1].extend(*next(data_iter))
planes[2].extend(*next(data_iter))
planes[3].extend(*next(data_iter))
return planes
def toarray(self):
"""Uses 8x8 or 16x16 Tile data to create an array of the tile's data.
Returns an array.
"""
interleaved_tile = self.interleave_subtiles()
tile_fmt = interleaved_tile.dimensions * 'c'
tile_iter = iter_unpack(tile_fmt, interleaved_tile.unpack())
tiles = [subtile for subtile in tile_iter]
return np.array(tiles)
def _interleave(self, subtile1, subtile2):
"""Interleaves two 8x8 tiles like
[subtile1-row1] [subtile2-row1] ...
[subtile1-row16] [subtile2-row16]
Returns bytes()
"""
interleaved = []
interleave_fmt = Struct(4 * 'c')
left_iter = interleave_fmt.iter_unpack(subtile1)
right_iter = interleave_fmt.iter_unpack(subtile2)
for i in left_iter:
right_next = next(right_iter)
interleaved.extend([*i, *right_next])
return interleaved
def _condense(img):
"""PNG image is returned as 3 bands representing RGB on each plane.
Flattens into 1 plane array with each (R,G,B) value represented as RGB.
"""
im = [band.tobytes() for band in img.split()]
band_fmt = 'c'
band_iter = [struct.iter_unpack(band_fmt, split_im) for split_im in im]
comb = []
for val in band_iter[0]:
val_2 = next(band_iter[1])
val_3 = next(band_iter[2])
comb.append(b''.join([*val, *val_2, *val_3]))
return comb
def format_address(data, datatype):
"""Given a answer packet for an A or AAAA query, return the string
representation of the address
"""
if datatype == RR_TYPE_A:
return '.'.join([str(a) for a in data])
elif datatype == RR_TYPE_AAAA:
data = list(struct.iter_unpack("!H", data))
return ":".join(["%x" % a for a in data])
else:
return None
def test_arbitrary_buffer(self):
s = struct.Struct('>IB')
b = bytes(range(1, 11))
it = s.iter_unpack(memoryview(b))
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)
def test_length_hint(self):
lh = operator.length_hint
s = struct.Struct('>IB')
b = bytes(range(1, 16))
it = s.iter_unpack(b)
self.assertEqual(lh(it), 3)
next(it)
self.assertEqual(lh(it), 2)
next(it)
self.assertEqual(lh(it), 1)
next(it)
self.assertEqual(lh(it), 0)
self.assertRaises(StopIteration, next, it)
self.assertEqual(lh(it), 0)
def test_module_func(self):
# Sanity check for the global struct.iter_unpack()
it = struct.iter_unpack('>IB', bytes(range(1, 11)))
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)
def _read_output(self):
fifo = open(self.path, "rb")
while True:
data = fifo.read(2 * self.cavaconfig["general"]["bars"])
sample = [i[0] / 65535 for i in struct.iter_unpack("H", data)]
if sample:
GLib.idle_add(self.data_handler, sample)
else:
break
fifo.close()
GLib.idle_add(self._on_stop)
def test_iterate(self):
s = struct.Struct('>IB')
b = bytes(range(1, 16))
it = s.iter_unpack(b)
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertEqual(next(it), (0x0b0c0d0e, 15))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)
def test_arbitrary_buffer(self):
s = struct.Struct('>IB')
b = bytes(range(1, 11))
it = s.iter_unpack(memoryview(b))
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)
def test_length_hint(self):
lh = operator.length_hint
s = struct.Struct('>IB')
b = bytes(range(1, 16))
it = s.iter_unpack(b)
self.assertEqual(lh(it), 3)
next(it)
self.assertEqual(lh(it), 2)
next(it)
self.assertEqual(lh(it), 1)
next(it)
self.assertEqual(lh(it), 0)
self.assertRaises(StopIteration, next, it)
self.assertEqual(lh(it), 0)
def test_module_func(self):
# Sanity check for the global struct.iter_unpack()
it = struct.iter_unpack('>IB', bytes(range(1, 11)))
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)
def iter_unpack(raw):
"""Yield successive 24-sized chunks from message."""
if OLD:
return chunks(raw)
else:
return struct.iter_unpack(EVENT_FORMAT, raw)
# long, long, unsigned short, unsigned short, int
def _do_iter(self):
if self.read_size:
read_size = EVENT_SIZE * self.read_size
else:
read_size = EVENT_SIZE
data = self._get_data(read_size)
if not data:
return
evdev_objects = iter_unpack(data)
events = [self._make_event(*event) for event in evdev_objects]
return events
# pylint: disable=too-many-arguments
def iter_unpack(raw):
"""Yield successive 24-sized chunks from message."""
if OLD:
return chunks(raw)
else:
return struct.iter_unpack(EVENT_FORMAT, raw)
# long, long, unsigned short, unsigned short, int
def _do_iter(self):
if self.read_size:
read_size = EVENT_SIZE * self.read_size
else:
read_size = EVENT_SIZE
data = self._get_data(read_size)
if not data:
return
evdev_objects = iter_unpack(data)
events = [self._make_event(*event) for event in evdev_objects]
return events
# pylint: disable=too-many-arguments
def unpack(self):
"""Unpacks a 4bpp planar tile.
Returns a byte() of pixel values.
"""
tile_fmt = Struct(32 * 'c')
tile_iter = tile_fmt.iter_unpack(self._tile_data)
tiles = [self._bitplanes_to_tile(b''.join(tile)) for tile in tile_iter]
return b''.join(tiles)
def pack(self, data):
"""Converts pixel values into 4bpp and packs it for Tile use.
Returns bytes()
"""
tile_fmt = Struct(32 * 'c')
tile_iter = tile_fmt.iter_unpack(data)
bitplane_values = [self._tile_to_bitplanes(b''.join(tile)) for tile in tile_iter]
return b''.join(bitplane_values)
def _tile_to_bitplanes(self, data):
bitplanes = []
row_fmt = Struct(8 * 'c')
row_iter = row_fmt.iter_unpack(data)
bitplanes = [self._pixel_row_to_4bpp(row) for row in row_iter]
return b''.join(bitplanes)
def deinterleave_subtiles(self):
tile_fmt = Struct(64 * 'c')
tile_iter = tile_fmt.iter_unpack(self._tile_data)
subtiles = []
for subtile in tile_iter:
subtiles.extend(self._deinterleave(b''.join(subtile)))
deinterleaved = [subtiles[0], subtiles[2], subtiles[1], subtiles[3]]
return Tile(self._tile_addr, b''.join(deinterleaved), self._tile_dimensions)
def _deinterleave(self, data):
deinterleaved = [[], []]
deinterleave_fmt = Struct(4 * 'c')
deinterleave_iter = deinterleave_fmt.iter_unpack(data)
for i in deinterleave_iter:
deinterleaved[0].extend([*i])
deinterleaved[1].extend([*next(deinterleave_iter)])
return [b''.join(data) for data in deinterleaved]
def toarray(self):
"""Unpacks the tiles and fills in pixel color.
Returns an array.
"""
pic_array = []
tiles = self.tiles2d()
for tile_row in tiles:
row = []
for tile in tile_row:
interleaved_tile = tile.interleave_subtiles()
tile_fmt = interleaved_tile.dimensions * 'c'
tile_iter = struct.iter_unpack(tile_fmt, interleaved_tile.unpack())
subtiles = [subtile for subtile in tile_iter]
row.append(self._colortile(subtiles))
pic_array.append(row)
array_rows = []
for row in np.array(pic_array):
array_rows.append(np.concatenate(row, axis=1))
assembled = np.concatenate(array_rows, axis=0)
return assembled
def get_accessor(self, idx):
accessor = self.root['accessors'][idx]
buffer_view = self.get_buffer_view(accessor['bufferView'])
component_type = accessor["componentType"]
type_size = accessor["type"]
if component_type == 5126:
fmt = "<f"
elif component_type == 5123:
fmt = "<H"
elif component_type == 5125:
fmt = "<I"
else:
raise ValueError("Unknown component type: %s" % component_type)
values = [i[0] for i in struct.iter_unpack(fmt, buffer_view)]
result = values
if type_size == "VEC3":
result = [Vector(values[i:i+3]) for i in range(0, len(values), 3)]
elif type_size == "VEC2":
result = [tuple(values[i:i+2]) for i in range(0, len(values), 2)]
return result
def test_iterate(self):
s = struct.Struct('>IB')
b = bytes(range(1, 16))
it = s.iter_unpack(b)
self.assertEqual(next(it), (0x01020304, 5))
self.assertEqual(next(it), (0x06070809, 10))
self.assertEqual(next(it), (0x0b0c0d0e, 15))
self.assertRaises(StopIteration, next, it)
self.assertRaises(StopIteration, next, it)