def setUp_with_dest_unreach(self):
    """Prepare a destination-unreachable ICMP fixture and its expected bytes.

    Stores the ``icmp.dest_unreach`` payload, the ``icmp.icmp`` object built
    from it, and ``self.buf``: the packed header followed by the serialized
    payload, with the checksum computed over that buffer written at offset 2.
    Reads ``self.csum``, which must already be set by the caller.
    """
    payload = b'abc'
    self.unreach_mtu = 10
    self.unreach_data = payload
    self.unreach_data_len = len(payload)
    self.data = icmp.dest_unreach(
        data_len=self.unreach_data_len,
        mtu=self.unreach_mtu,
        data=self.unreach_data,
    )

    self.type_ = icmp.ICMP_DEST_UNREACH
    self.code = icmp.ICMP_HOST_UNREACH_CODE
    self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

    # Expected wire image: header fields first, then the payload bytes.
    header = struct.pack(
        icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
    self.buf = bytearray(header)
    self.buf += self.data.serialize()

    # Checksum is taken over the whole buffer and patched in as a
    # network-order 16-bit value at byte offset 2.
    self.csum_calc = packet_utils.checksum(self.buf)
    struct.pack_into('!H', self.buf, 2, self.csum_calc)
# Example source code using Python's pack_into()
def yw_proc(data, isEncrypt, key=None, head=None, validator=None):
    """Run a blob through ``YWCipher`` and return the processed bytes.

    The last eight bytes of ``data`` are treated as a trailer: a
    little-endian unsigned CRC32 (``data[-8:-4]``) followed by the
    little-endian cipher seed (``data[-4:]``).  Everything before the trailer
    is passed through ``YWCipher(seed, 0x1000).encrypt`` and the trailer is
    appended after it.

    Args:
        data: Bytes-like input, including the 8-byte trailer.
        isEncrypt: When false, the stored CRC32 is checked against the body
            before processing.  When true, the CRC32 field of the output
            trailer is rewritten with the checksum of the processed body.
        key: Unused; kept for interface compatibility.
        head: Unused; kept for interface compatibility.
        validator: Unused; kept for interface compatibility.

    Returns:
        The processed data as ``bytes``, or ``None`` when ``isEncrypt`` is
        false and the stored CRC32 does not match the body.

    Raises:
        struct.error: If ``data`` is too short to contain the trailer.
    """
    body = data[:-8]
    stored_crc32 = struct.unpack("<I", data[-8:-4])[0]
    seed = struct.unpack("<I", data[-4:])[0]

    # Mask to an unsigned 32-bit value: crc32() is signed on some Python
    # versions, which would never equal the "<I" value and cannot be packed.
    if not isEncrypt and (binascii.crc32(body) & 0xFFFFFFFF) != stored_crc32:
        logging.error("Checksum does not match")
        return None

    out = bytearray()
    out += YWCipher(seed, 0x1000).encrypt(body)
    out += data[-8:]

    if isEncrypt:
        fresh_crc32 = binascii.crc32(out[:-8]) & 0xFFFFFFFF
        # The CRC field is the first word of the trailer just appended, so
        # address it relative to the end of the output buffer.
        struct.pack_into("<I", out, len(out) - 8, fresh_crc32)
    return bytes(out)
def allThornyan(file, code=72463062):
    """Overwrite the id of every occupied Yo-kai slot and save a copy.

    The file is read whole; 240 fixed-size records of 0x5C bytes starting at
    offset 0x1D08 are inspected.  A record whose little-endian int32 at
    record offset 4 is zero is left untouched; every other record gets that
    field replaced by ``code``.  The result is written to
    ``file + ".edited.yw"``.

    Args:
        file: Path of the save file to read.
        code: Id written into each occupied slot.

    Raises:
        struct.error: If the file is too short to hold all 240 records.
        FileExistsError: If the output file already exists (mode ``"xb"``).
    """
    slot_base = 0x1D08
    slot_size = 0x5C
    slot_count = 240
    slot_format = "<4x i 56x 5b 5B 5b 5x B 3x 2x H"

    print("=== Yo-kai ===")
    # Context manager so the input handle is closed deterministically.
    with open(file, "rb") as src:
        data = bytearray(src.read())

    for index in range(slot_count):
        # Derive the offset from the index so that skipping an empty slot
        # still advances to the next record instead of re-reading this one.
        pos = slot_base + index * slot_size
        params = struct.unpack(slot_format, data[pos:pos + slot_size])
        if params[0] == 0:
            continue
        print("{} -> {}".format(
            youkai.get(params[0], "(unknown)"),
            youkai.get(code, "(unknown)"),
        ))
        struct.pack_into("<i", data, pos + 4, code)

    with open(file + ".edited.yw", "xb") as f:
        f.write(data)
def save(self):
    """Write the edited save to ``self.path``, keeping the old file as ``.bak``.

    Starts from a copy of ``self.original`` and patches in the game-data
    header, play time, last saved chapter, player name, money and
    experience at their fixed offsets before replacing the file on disk.
    """
    blob = bytearray(self.original)
    blob[0x4:0x10] = self.gamedata_header

    def put_u32(offset, value):
        # Little-endian unsigned 32-bit field written in place.
        struct.pack_into("<L", blob, offset, value)

    put_u32(0x00024, self.time_played)
    put_u32(0x0002C, self.last_saved_chapter)

    # The name field is 70 bytes: at most 68 bytes of UTF-16-LE text,
    # zero-padded to the full width.
    encoded_name = self.name.encode("utf-16-le")[:68]
    blob[0x34:0x7A] = encoded_name.ljust(70, b"\0")

    put_u32(0x3056C, self.money)
    put_u32(0x3871C, self.experience)

    # Move the current file aside first, then write the new contents.
    backup_path = self.path + ".bak"
    shutil.move(self.path, backup_path)
    with open(self.path, "wb") as out:
        out.write(blob)
def transactionI2C(self, port, device_address, data_to_send, send_size, data_received, receive_size):
    '''
        Answer a simulated I2C transaction for the Euler-angle registers.

        The first byte of ``data_to_send`` selects the register. For the
        heading, pitch and roll LSB addresses the matching attribute of
        ``self`` is multiplied by 900.0, truncated to ``int`` and written
        into ``data_received`` at offset 0 as a little-endian signed 16-bit
        value. Any other register leaves ``data_received`` untouched.
        ``port``, ``device_address`` and ``send_size`` are not used.

        :returns: ``receive_size``, unchanged
    '''
    register = data_to_send[0]
    angle_registers = (
        (BNO055.BNO055_EULER_H_LSB_ADDR, 'heading'),
        (BNO055.BNO055_EULER_P_LSB_ADDR, 'pitch'),
        (BNO055.BNO055_EULER_R_LSB_ADDR, 'roll'),
    )
    for address, attribute in angle_registers:
        if register == address:
            scaled = int(getattr(self, attribute) * 900.0)
            struct.pack_into('<h', data_received, 0, scaled)
    return receive_size
def test_pack_into_fn(self):
test_string = 'Reykjavik rocks, eow!'
writable_buf = array.array('c', ' '*100)
fmt = '21s'
pack_into = lambda *args: struct.pack_into(fmt, *args)
# Test without offset.
pack_into(writable_buf, 0, test_string)
from_buf = writable_buf.tostring()[:len(test_string)]
self.assertEqual(from_buf, test_string)
# Test with offset.
pack_into(writable_buf, 10, test_string)
from_buf = writable_buf.tostring()[:len(test_string)+10]
self.assertEqual(from_buf, test_string[:10] + test_string)
# Go beyond boundaries.
small_buf = array.array('c', ' '*10)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
test_string)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
test_string)
def test_pack_into_fn(self):
test_string = b'Reykjavik rocks, eow!'
writable_buf = array.array('b', b' '*100)
fmt = '21s'
pack_into = lambda *args: struct.pack_into(fmt, *args)
# Test without offset.
pack_into(writable_buf, 0, test_string)
from_buf = writable_buf.tobytes()[:len(test_string)]
self.assertEqual(from_buf, test_string)
# Test with offset.
pack_into(writable_buf, 10, test_string)
from_buf = writable_buf.tobytes()[:len(test_string)+10]
self.assertEqual(from_buf, test_string[:10] + test_string)
# Go beyond boundaries.
small_buf = array.array('b', b' '*10)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
test_string)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
test_string)
def test_trailing_counter(self):
store = array.array('b', b' '*100)
# format lists containing only count spec should result in an error
self.assertRaises(struct.error, struct.pack, '12345')
self.assertRaises(struct.error, struct.unpack, '12345', '')
self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)
# Format lists with trailing count spec should result in an error
self.assertRaises(struct.error, struct.pack, 'c12345', 'x')
self.assertRaises(struct.error, struct.unpack, 'c12345', 'x')
self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
'x')
self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
0)
# Mixed format tests
self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs')
self.assertRaises(struct.error, struct.unpack, '14s42',
'spam and eggs')
self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
'spam and eggs')
self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
def test_pack_into_fn(self):
test_string = 'Reykjavik rocks, eow!'
writable_buf = array.array('c', ' '*100)
fmt = '21s'
pack_into = lambda *args: struct.pack_into(fmt, *args)
# Test without offset.
pack_into(writable_buf, 0, test_string)
from_buf = writable_buf.tostring()[:len(test_string)]
self.assertEqual(from_buf, test_string)
# Test with offset.
pack_into(writable_buf, 10, test_string)
from_buf = writable_buf.tostring()[:len(test_string)+10]
self.assertEqual(from_buf, test_string[:10] + test_string)
# Go beyond boundaries.
small_buf = array.array('c', ' '*10)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
test_string)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
test_string)
def _pack_array(values, definition, fmt_string, num_bytes):
"""
Pack an array item defined by a dictionary entry into a bytearray
Args:
values (tuple): 1d or 2d tuple of values to pack
definition (dict): definition of the data item
fmt_string (str): struct.unpack format string for an element
num_bytes (int): number of bytes of each element
"""
packet_part = bytearray()
if definition['DIM_2_SIZE']:
for dim1 in values:
for value in dim1:
buff = bytearray(num_bytes)
struct.pack_into(fmt_string, buff, 0, value)
packet_part += buff
else:
for value in values:
buff = bytearray(num_bytes)
struct.pack_into(fmt_string, buff, 0, value)
packet_part += buff
return packet_part
def mai_set_time(self, gpstime):
    """
    Send the "set GPS time" command that sets the ADACS clock.

    Arguments:
    gpstime - GPS time is a linear count of seconds elapsed since 0h Jan 6, 1980.
    """
    packet_size = 40  # always
    checksum_offset = 38
    sync_word = 0xEB90  # TODO: the Pumpkin comments contain some confusing statements about endianness
    set_gps_time_id = 0x44  # set GPS time

    # Sync word, command id and time are packed little-endian from byte 0;
    # the rest of the packet stays zero.
    packet = bytearray(packet_size)
    struct.pack_into('<HBL', packet, 0, sync_word, set_gps_time_id, gpstime)

    # Data checksum (this is different than the packet checksum): low 16
    # bits of the sum of all packet bytes, stored in the last two bytes.
    struct.pack_into('<H', packet, checksum_offset, sum(packet) & 0xFFFF)

    Send.send_bus_cmd(BusCommands.MAI_CMD, packet)
def _transmitMsg(self, msg):
    """Send an OSC message over a streaming socket.

    The message's binary form is preceded on the wire by its length as a
    4-byte big-endian unsigned integer.

    Args:
        msg: The ``OSCMessage`` (or subclass) instance to send.

    Returns:
        True if both the length prefix and the message were transmitted,
        False if either ``self._transmit`` call reported failure or the
        socket raised a broken-pipe error.

    Raises:
        TypeError: If ``msg`` is not an ``OSCMessage`` instance.
        socket.error: Any socket error other than ``EPIPE`` is re-raised.
    """
    if not isinstance(msg, OSCMessage):
        raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

    try:
        binary = msg.getBinary()
        # prepend length of packet before the actual message (big endian).
        # struct.pack builds the prefix directly; array('c') and tostring()
        # are not available on Python 3.
        len_big_endian = struct.pack(">L", len(binary))
        if self._transmit(len_big_endian) and self._transmit(binary):
            return True
        return False
    except socket.error as e:
        # ``e.errno`` instead of ``e[0]``: exceptions are not subscriptable
        # on Python 3.
        if e.errno == errno.EPIPE:  # broken pipe
            return False
        raise
def assemble_int_data(self, int_data, global_offset):
    """Build the integer data section and register each variable.

    For every entry of ``int_data`` whose ``'val'`` is not None, each value
    is appended as a big-endian signed 32-bit word followed by one
    ``0xffffffff`` terminator word.  Every word written advances the running
    offset by 32 (presumably bits per 4-byte word; confirm against the
    consumer).  Entries with ``'val'`` of None contribute no data and are
    registered with offset None and length 32.

    Each entry is registered through ``self.add_var`` under the name
    ``'@' + var['name']``.

    Returns:
        Tuple ``(data_section, global_offset)``: the assembled bytes and
        the offset after the last word written.
    """
    terminator = struct.pack('>I', 0xffffffff)
    data_section = bytes()

    for var in int_data:
        values = var['val']
        if values is None:
            offset, length = None, 32
        else:
            offset = global_offset
            words = [struct.pack('>i', number) for number in values]
            words.append(terminator)
            data_section += b''.join(words)
            length = 32 * len(words)
            global_offset += length
        self.add_var(SGSConfig.VT_INT, '@'+var['name'], offset, length)

    return (data_section, global_offset)
def assemble_int_data(self, int_data, global_offset):
    """Serialize integer variables and record where each one starts.

    A variable with a non-None ``'val'`` is written as a run of big-endian
    signed 32-bit words, one per value, closed by a ``0xffffffff`` word.
    The running offset grows by 32 for every word written (looks like a bit
    offset; confirm with the reader of this section).  A variable with
    ``'val'`` of None writes nothing and is recorded with offset None and
    length 32.  All variables are reported to ``self.add_var`` as
    ``'@' + var['name']``.

    Returns:
        ``(data_section, global_offset)``: the bytes produced and the
        updated running offset.
    """
    scratch = ctypes.create_string_buffer(4)

    def word(fmt, number):
        # Pack into the shared 4-byte scratch buffer and snapshot it.
        struct.pack_into(fmt, scratch, 0, number)
        return scratch.raw

    section = bytes()
    for var in int_data:
        values = var['val']
        start = None
        size = 32
        if values is not None:
            start = global_offset
            for number in values:
                section += word('>i', number)
                global_offset += 32
            # Terminator word closing this variable's run.
            section += word('>I', 0xffffffff)
            global_offset += 32
            size = global_offset - start
        self.add_var(SGSConfig.VT_INT, '@'+var['name'], start, size)
    return (section, global_offset)
def test_pack_into_fn(self):
test_string = b'Reykjavik rocks, eow!'
writable_buf = array.array('b', b' '*100)
fmt = '21s'
pack_into = lambda *args: struct.pack_into(fmt, *args)
# Test without offset.
pack_into(writable_buf, 0, test_string)
from_buf = writable_buf.tobytes()[:len(test_string)]
self.assertEqual(from_buf, test_string)
# Test with offset.
pack_into(writable_buf, 10, test_string)
from_buf = writable_buf.tobytes()[:len(test_string)+10]
self.assertEqual(from_buf, test_string[:10] + test_string)
# Go beyond boundaries.
small_buf = array.array('b', b' '*10)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 0,
test_string)
self.assertRaises((ValueError, struct.error), pack_into, small_buf, 2,
test_string)
def test_trailing_counter(self):
store = array.array('b', b' '*100)
# format lists containing only count spec should result in an error
self.assertRaises(struct.error, struct.pack, '12345')
self.assertRaises(struct.error, struct.unpack, '12345', '')
self.assertRaises(struct.error, struct.pack_into, '12345', store, 0)
self.assertRaises(struct.error, struct.unpack_from, '12345', store, 0)
# Format lists with trailing count spec should result in an error
self.assertRaises(struct.error, struct.pack, 'c12345', 'x')
self.assertRaises(struct.error, struct.unpack, 'c12345', 'x')
self.assertRaises(struct.error, struct.pack_into, 'c12345', store, 0,
'x')
self.assertRaises(struct.error, struct.unpack_from, 'c12345', store,
0)
# Mixed format tests
self.assertRaises(struct.error, struct.pack, '14s42', 'spam and eggs')
self.assertRaises(struct.error, struct.unpack, '14s42',
'spam and eggs')
self.assertRaises(struct.error, struct.pack_into, '14s42', store, 0,
'spam and eggs')
self.assertRaises(struct.error, struct.unpack_from, '14s42', store, 0)
def _transmitMsg(self, msg):
    """Send an OSC message over a streaming socket.

    The binary form of ``msg`` is sent after a 4-byte big-endian unsigned
    length prefix.

    Args:
        msg: The ``OSCMessage`` (or subclass) instance to send.

    Returns:
        True when the prefix and the message were both transmitted; False
        when ``self._transmit`` reported failure for either part or the
        socket raised a broken-pipe error.

    Raises:
        TypeError: If ``msg`` is not an ``OSCMessage`` instance.
        socket.error: Any socket error other than ``EPIPE`` is re-raised.
    """
    if not isinstance(msg, OSCMessage):
        raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

    try:
        binary = msg.getBinary()
        # prepend length of packet before the actual message (big endian).
        # Packed directly with struct.pack: the 'c' array type code and
        # array.tostring() no longer exist on Python 3.
        len_big_endian = struct.pack(">L", len(binary))
        if self._transmit(len_big_endian) and self._transmit(binary):
            return True
        return False
    except socket.error as e:
        # Exceptions cannot be indexed on Python 3; read the error number
        # from the ``errno`` attribute instead of ``e[0]``.
        if e.errno == errno.EPIPE:  # broken pipe
            return False
        raise
def serialize(self):
    """Return this record as bytes: header, source addresses, aux data.

    Side effects on ``self``: a zero ``num`` is replaced by the number of
    sources, ``aux`` is zero-padded to a multiple of four bytes, and a zero
    ``aux_len`` is replaced by the padded aux length in 4-byte words.  The
    corresponding header fields in the output are patched to match.
    """
    to_bin = addrconv.ipv4.text_to_bin

    header = struct.pack(self._PACK_STR, self.type_, self.aux_len,
                         self.num, to_bin(self.address))
    buf = bytearray(header)
    for src in self.srcs:
        buf.extend(struct.pack('4s', to_bin(src)))

    if self.num == 0:
        # Source count was left unset: derive it and patch bytes 2-3.
        self.num = len(self.srcs)
        struct.pack_into('!H', buf, 2, self.num)

    if self.aux is None:
        return six.binary_type(buf)

    remainder = len(self.aux) % 4
    if remainder:
        # Pad the auxiliary data up to the next 4-byte boundary.
        self.aux += bytearray(4 - remainder)
        self.aux = six.binary_type(self.aux)
    buf.extend(self.aux)

    if self.aux_len == 0:
        # Aux length was left unset: store it in 4-byte words at byte 1.
        self.aux_len = len(self.aux) // 4
        struct.pack_into('!B', buf, 1, self.aux_len)
    return six.binary_type(buf)
def serialize(self, payload, prev):
    """Build the header as a bytearray of ``len(self)`` bytes.

    Side effects on ``self``: a zero ``total_length`` is replaced by
    ``header_length * 4 + len(payload)``, and ``csum`` is set to the
    checksum of the finished header.  ``prev`` is not used.

    Returns:
        The header bytearray, with options copied in after the fixed part
        and the checksum written at byte offset 10.
    """
    length = len(self)
    hdr = bytearray(length)

    version_ihl = self.version << 4 | self.header_length
    flags_offset = self.flags << 13 | self.offset
    if self.total_length == 0:
        self.total_length = self.header_length * 4 + len(payload)

    # The checksum field is packed as zero here and patched in below.
    fields = (
        version_ihl, self.tos, self.total_length, self.identification,
        flags_offset, self.ttl, self.proto, 0,
        addrconv.ipv4.text_to_bin(self.src),
        addrconv.ipv4.text_to_bin(self.dst),
    )
    struct.pack_into(ipv4._PACK_STR, hdr, 0, *fields)

    if self.option:
        fixed_len = ipv4._MIN_LEN
        option_len = len(self.option)
        assert (length - fixed_len) >= option_len
        hdr[fixed_len:fixed_len + option_len] = self.option

    self.csum = packet_utils.checksum(hdr)
    struct.pack_into('!H', hdr, 10, self.csum)
    return hdr
def serialize(self):
    """Return this record as bytes: header, source addresses, aux data.

    Side effects on ``self``: a zero ``num`` is replaced by the number of
    sources, ``aux`` is zero-padded to a multiple of four bytes, and a zero
    ``aux_len`` is replaced by the padded aux length in 4-byte words.  The
    corresponding header fields in the output are patched to match.
    """
    to_bin = addrconv.ipv6.text_to_bin

    header = struct.pack(self._PACK_STR, self.type_, self.aux_len,
                         self.num, to_bin(self.address))
    buf = bytearray(header)
    for src in self.srcs:
        buf.extend(struct.pack('16s', to_bin(src)))

    if self.num == 0:
        # Source count was left unset: derive it and patch bytes 2-3.
        self.num = len(self.srcs)
        struct.pack_into('!H', buf, 2, self.num)

    if self.aux is None:
        return six.binary_type(buf)

    remainder = len(self.aux) % 4
    if remainder:
        # Pad the auxiliary data up to the next 4-byte boundary.
        self.aux += bytearray(4 - remainder)
        self.aux = six.binary_type(self.aux)
    buf.extend(self.aux)

    if self.aux_len == 0:
        # Aux length was left unset: store it in 4-byte words at byte 1.
        self.aux_len = len(self.aux) // 4
        struct.pack_into('!B', buf, 1, self.aux_len)
    return six.binary_type(buf)