python类pack_into()的实例源码

test_icmp.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp_with_dest_unreach(self):
        """Prepare a Destination Unreachable fixture plus its expected bytes.

        Stores the ICMP object in ``self.ic`` and the hand-built reference
        buffer (header + serialized payload, checksum patched in) in
        ``self.buf``.  Reads ``self.csum``, which must already be set.
        """
        payload = b'abc'
        self.unreach_mtu = 10
        self.unreach_data = payload
        self.unreach_data_len = len(payload)
        self.data = icmp.dest_unreach(
            data_len=self.unreach_data_len,
            mtu=self.unreach_mtu,
            data=self.unreach_data)

        self.type_ = icmp.ICMP_DEST_UNREACH
        self.code = icmp.ICMP_HOST_UNREACH_CODE
        self.ic = icmp.icmp(self.type_, self.code, self.csum, self.data)

        # Reference encoding: fixed header first, then the message body.
        header = struct.pack(
            icmp.icmp._PACK_STR, self.type_, self.code, self.csum)
        self.buf = bytearray(header)
        self.buf.extend(self.data.serialize())

        # The checksum is computed over the whole buffer and written at
        # byte offset 2 as a network-order unsigned short.
        self.csum_calc = packet_utils.checksum(self.buf)
        struct.pack_into('!H', self.buf, 2, self.csum_calc)
yw_save.py 文件源码 项目:yw_save 作者: togenyan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def yw_proc(data, isEncrypt, key=None, head=None, validator=None):
    """Run ``data`` through YWCipher, keeping its 8-byte trailer.

    The trailer is two little-endian uint32 values: a CRC32 of the body
    followed by the cipher seed.  When ``isEncrypt`` is false the stored CRC
    is verified first and ``None`` is returned (after logging an error) on a
    mismatch.  When ``isEncrypt`` is true the CRC field of the result is
    recomputed over the transformed body.

    ``key``, ``head`` and ``validator`` are accepted but not used here.

    Returns the processed data as ``bytes``, or ``None`` on checksum failure.
    """
    total = len(data)
    stored_crc = struct.unpack("<I", data[-8:-4])[0]
    seed = struct.unpack("<I", data[-4:])[0]

    if not isEncrypt and binascii.crc32(data[:-8]) != stored_crc:
        logging.error("Checksum does not match")
        return None

    cipher = YWCipher(seed, 0x1000)
    result = bytearray()
    result += cipher.encrypt(data[:-8])
    result += data[-8:]  # trailer is carried over verbatim

    if isEncrypt:
        # Refresh the CRC so it matches the newly produced body.
        fresh_crc = binascii.crc32(result[:-8])
        struct.pack_into("<I", result, total - 8, fresh_crc)
    return bytes(result)
dump.py 文件源码 项目:yw_save 作者: togenyan 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def allThornyan(file, code=72463062):
    """Rewrite every occupied Yo-kai slot of a save file to the id ``code``.

    The file is read as 240 consecutive slots of 0x5C bytes starting at
    offset 0x1D08.  A slot whose id (the int32 at slot offset 4) is zero is
    treated as empty and left untouched; every other slot gets its id
    replaced by ``code``.  The result is written to ``file + ".edited.yw"``;
    the input file itself is not modified.

    Args:
        file: path of the save file to read.
        code: id written into every occupied slot.

    Raises:
        FileExistsError: if the ``.edited.yw`` output already exists
            (it is opened in exclusive-create mode).
        struct.error: if the file is too short to hold all 240 slots.
    """
    table_start = 0x1D08
    slot_size = 0x5C
    slot_fmt = "<4x i 56x 5b 5B 5b 5x B 3x 2x H"

    print("=== Yo-kai ===")
    # Use a context manager so the input handle is closed deterministically.
    with open(file, "rb") as src:
        data = bytearray(src.read())

    for i in range(240):
        # Derive the offset from the slot index.  The previous running
        # counter was not advanced when an empty slot hit `continue`, so
        # every later iteration re-read that same empty slot.
        pos = table_start + i * slot_size
        params = struct.unpack(slot_fmt, data[pos:pos + slot_size])
        if params[0] == 0:
            continue
        # `youkai` is a module-level lookup defined elsewhere in the file;
        # presumably it maps ids to display names.
        print("{} -> {}".format(
                youkai.get(params[0], "(unknown)"),
                youkai.get(code, "(unknown)"),
            )
        )
        struct.pack_into("<i", data, pos + 4, code)

    with open(file + ".edited.yw", "xb") as f:
        f.write(data)
N_A_editor.py 文件源码 项目:Nier-Automata-editor 作者: CensoredUsername 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save(self):
        """Write the edited fields into a copy of the save and store it.

        Starts from ``self.original``, patches the header, play time,
        chapter, player name, money and experience at their fixed offsets,
        renames the existing file at ``self.path`` to ``self.path + ".bak"``
        and finally writes the patched image to ``self.path``.
        """
        put_u32 = struct.Struct("<L").pack_into
        image = bytearray(self.original)

        image[0x4:0x10] = self.gamedata_header

        put_u32(image, 0x00024, self.time_played)
        put_u32(image, 0x0002C, self.last_saved_chapter)

        # The name field is 70 bytes of UTF-16-LE: at most 68 bytes of text,
        # zero-filled so that at least one NUL code unit always follows.
        encoded_name = self.name.encode("utf-16-le")[:68]
        image[0x34:0x7A] = encoded_name.ljust(70, b"\0")

        put_u32(image, 0x3056C, self.money)
        put_u32(image, 0x3871C, self.experience)

        # Keep the previous file as a backup before replacing it.
        shutil.move(self.path, self.path + ".bak")

        with open(self.path, "wb") as out_file:
            out_file.write(image)
bno055_sim.py 文件源码 项目:pysteamworks 作者: thedropbears 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def transactionI2C(self, port, device_address, data_to_send, send_size, data_received, receive_size):
        '''
            Answer a register read by filling ``data_received`` in place.

            The first byte of ``data_to_send`` selects the register.  For the
            Euler heading, pitch and roll registers the matching attribute of
            this object is multiplied by 900.0, truncated to an int and
            written as a little-endian signed 16-bit value at offset 0 of
            ``data_received``.  Any other register leaves the buffer as is.

            :returns: ``receive_size``, unchanged
        '''

        def reply(angle):
            struct.pack_into('<h', data_received, 0, int(angle * 900.0))

        register = data_to_send[0]
        # Independent checks rather than elif: each register is compared
        # on its own, exactly as the caller-visible behaviour requires.
        if register == BNO055.BNO055_EULER_H_LSB_ADDR:
            reply(self.heading)
        if register == BNO055.BNO055_EULER_P_LSB_ADDR:
            reply(self.pitch)
        if register == BNO055.BNO055_EULER_R_LSB_ADDR:
            reply(self.roll)

        return receive_size
test_struct.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_pack_into_fn(self):
        """Exercise the module-level struct.pack_into() with a '21s' format."""
        payload = 'Reykjavik rocks, eow!'
        target = array.array('c', ' '*100)
        fmt = '21s'

        def pack_into(*args):
            return struct.pack_into(fmt, *args)

        # Write at the start of the buffer and read it back.
        pack_into(target, 0, payload)
        self.assertEqual(target.tostring()[:len(payload)], payload)

        # Write again at offset 10; bytes 0-9 still hold the beginning of
        # the first write, so the prefix is payload[:10].
        pack_into(target, 10, payload)
        self.assertEqual(target.tostring()[:len(payload)+10],
                         payload[:10] + payload)

        # A 10-byte buffer cannot hold 21 bytes at any offset.
        tiny = array.array('c', ' '*10)
        for offset in (0, 2):
            self.assertRaises((ValueError, struct.error), pack_into, tiny,
                              offset, payload)
test_struct.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_pack_into_fn(self):
        """Check struct.pack_into() writes, offsets and bounds errors."""
        message = b'Reykjavik rocks, eow!'
        size = len(message)
        buf = array.array('b', b' '*100)
        fmt = '21s'

        def pack_into(*args):
            return struct.pack_into(fmt, *args)

        # No offset: the buffer must start with the message.
        pack_into(buf, 0, message)
        self.assertEqual(buf.tobytes()[:size], message)

        # Offset 10: the first ten bytes are left over from the write above.
        pack_into(buf, 10, message)
        self.assertEqual(buf.tobytes()[:size+10], message[:10] + message)

        # Writing past the end of a too-small buffer must fail.
        undersized = array.array('b', b' '*10)
        with self.assertRaises((ValueError, struct.error)):
            pack_into(undersized, 0, message)
        with self.assertRaises((ValueError, struct.error)):
            pack_into(undersized, 2, message)
test_struct.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_trailing_counter(self):
        """A format ending in a bare repeat count must raise struct.error."""
        store = array.array('b', b' '*100)

        cases = (
            # format consisting only of a count
            (struct.pack, ('12345',)),
            (struct.unpack, ('12345', '')),
            (struct.pack_into, ('12345', store, 0)),
            (struct.unpack_from, ('12345', store, 0)),
            # valid code followed by a trailing count
            (struct.pack, ('c12345', 'x')),
            (struct.unpack, ('c12345', 'x')),
            (struct.pack_into, ('c12345', store, 0, 'x')),
            (struct.unpack_from, ('c12345', store, 0)),
            # counted code followed by a trailing count
            (struct.pack, ('14s42', 'spam and eggs')),
            (struct.unpack, ('14s42', 'spam and eggs')),
            (struct.pack_into, ('14s42', store, 0, 'spam and eggs')),
            (struct.unpack_from, ('14s42', store, 0)),
        )
        for func, args in cases:
            self.assertRaises(struct.error, func, *args)
test_struct.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_pack_into_fn(self):
        """Verify struct.pack_into() on a character array, with and without offset."""
        text = 'Reykjavik rocks, eow!'
        text_len = len(text)
        dest = array.array('c', ' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # Pack at offset 0, then at offset 10, checking the buffer prefix
        # after each step.  After the second step the first ten bytes are
        # still the head of the first write.
        pack_into(dest, 0, text)
        first_read = dest.tostring()[:text_len]
        self.assertEqual(first_read, text)

        pack_into(dest, 10, text)
        second_read = dest.tostring()[:text_len+10]
        self.assertEqual(second_read, text[:10] + text)

        # Both attempts overflow the 10-byte destination.
        short_dest = array.array('c', ' '*10)
        expected = (ValueError, struct.error)
        self.assertRaises(expected, pack_into, short_dest, 0, text)
        self.assertRaises(expected, pack_into, short_dest, 2, text)
core_cmd_tlm.py 文件源码 项目:OverviewOne 作者: SpaceVR-O1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _pack_array(values, definition, fmt_string, num_bytes):
    """
    Pack an array item defined by a dictionary entry into a bytearray

    Args:
        values (tuple): 1d or 2d tuple of values to pack
        definition (dict): definition of the data item
        fmt_string (str): struct.unpack format string for an element
        num_bytes (int): number of bytes of each element

    """
    packet_part = bytearray()
    if definition['DIM_2_SIZE']:
        for dim1 in values:
            for value in dim1:
                buff = bytearray(num_bytes)
                struct.pack_into(fmt_string, buff, 0, value)
                packet_part += buff
    else:
        for value in values:
            buff = bytearray(num_bytes)
            struct.pack_into(fmt_string, buff, 0, value)
            packet_part += buff
    return packet_part
hardware.py 文件源码 项目:OverviewOne 作者: SpaceVR-O1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def mai_set_time(self, gpstime):
        """
        Sets the ADACS clock.

        Builds a 40-byte command (sync word, command id, time, checksum in
        the last two bytes) and sends it as a MAI bus command.

        Arguments:
            gpstime - GPS time is a linear count of seconds elapsed since 0h Jan 6, 1980.
        """

        DATA_LEN = 40  # the command is always this long
        SYNC_WORD = 0xEB90  # TODO: the Pumpkin comments contain some confusing statements about endianness
        CMD_SET_GPS_TIME = 0x44

        packet = bytearray(DATA_LEN)

        # Header and time, little-endian: uint16, uint8, uint32.
        struct.pack_into('<HBL', packet, 0, SYNC_WORD, CMD_SET_GPS_TIME,
                         gpstime)

        # Data checksum (not the packet checksum): low 16 bits of the byte
        # sum; the checksum bytes themselves are still zero at this point.
        checksum = sum(packet[0:DATA_LEN]) & 0xFFFF
        struct.pack_into('<H', packet, 38, checksum)

        Send.send_bus_cmd(BusCommands.MAI_CMD, packet)
OSC.py 文件源码 项目:LiveScript 作者: rekliner 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        The message is framed as a 4-byte big-endian length followed by the
        binary message itself.

        Returns True if both the length prefix and the message were
        transmitted, False if a transmit call reported failure or the socket
        raised a broken-pipe error.

        Raises TypeError if 'msg' is not an OSCMessage, and re-raises any
        socket error other than EPIPE.
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            # prepend length of packet before the actual message (big endian).
            # struct.pack yields the same four bytes as packing into a
            # temporary char array, and works on Python 2 and 3 alike.
            len_big_endian = struct.pack(">L", len(binary))
            if self._transmit(len_big_endian) and self._transmit(binary):
                return True
            return False
        except socket.error as e:
            # 'as' form and .errno work on Python 2.6+ and 3; indexing the
            # exception object does not work on Python 3.
            if e.errno == errno.EPIPE: # broken pipe
                return False
            # bare raise keeps the original traceback
            raise
sgs_assembler.py 文件源码 项目:inshack-2017 作者: HugoDelval 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def assemble_int_data(self, int_data, global_offset):
        """Build the integer data section and register each variable.

        For every entry with a non-None 'val', each value is emitted as a
        big-endian signed 32-bit word followed by one 0xffffffff word, and
        the running offset advances by 32 per emitted word.  Entries whose
        'val' is None emit nothing and are registered with offset None and
        length 32.  Every entry is registered through ``self.add_var`` under
        the name '@' + var['name'].

        Returns a tuple ``(data_section, global_offset)`` with the emitted
        bytes and the offset after the last emitted word.
        """
        scratch = ctypes.create_string_buffer(4)
        words = []
        for var in int_data:
            values = var['val']
            if values is None:
                start, size = None, 32
            else:
                start = global_offset
                size = 0
                for v in values:
                    struct.pack_into('>i', scratch, 0, v)
                    words.append(scratch.raw)
                    global_offset += 32
                    size += 32
                # terminating word after the values
                struct.pack_into('>I', scratch, 0, 0xffffffff)
                words.append(scratch.raw)
                global_offset += 32
                size += 32
            self.add_var(SGSConfig.VT_INT, '@'+var['name'], start, size)
        return (b''.join(words), global_offset)
sgs_assembler.py 文件源码 项目:inshack-2017 作者: HugoDelval 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def assemble_int_data(self, int_data, global_offset):
        """Encode integer variables into a data section.

        Each variable with values contributes one big-endian 32-bit word per
        value plus a closing 0xffffffff word; the offset grows by 32 for
        each word.  A variable without values ('val' is None) contributes no
        bytes and is recorded with offset None and length 32.  All variables
        are recorded via ``self.add_var`` as '@' + name.

        Returns ``(data_section, global_offset)``.
        """
        WORD = 32
        word_buf = ctypes.create_string_buffer(4)
        section = bytes()
        for entry in int_data:
            values = entry['val']
            var_offset = None
            var_length = WORD
            if values is not None:
                var_offset = global_offset
                var_length = 0
                for number in values:
                    struct.pack_into('>i', word_buf, 0, number)
                    section += word_buf
                    global_offset += WORD
                    var_length += WORD
                # closing marker word
                struct.pack_into('>I', word_buf, 0, 0xffffffff)
                section += word_buf
                global_offset += WORD
                var_length += WORD
            self.add_var(SGSConfig.VT_INT, '@'+entry['name'], var_offset,
                         var_length)
        return (section, global_offset)
test_struct.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_pack_into_fn(self):
        """struct.pack_into(): plain write, offset write, out-of-bounds write."""
        data = b'Reykjavik rocks, eow!'
        arena = array.array('b', b' '*100)
        fmt = '21s'
        pack_into = lambda *args: struct.pack_into(fmt, *args)

        # (offset, expected prefix of the buffer after packing there).
        # The second expectation relies on the first write still occupying
        # bytes 0-9.
        steps = (
            (0, data),
            (10, data[:10] + data),
        )
        for offset, expected in steps:
            pack_into(arena, offset, data)
            seen = arena.tobytes()[:len(data)+offset]
            self.assertEqual(seen, expected)

        # Go beyond boundaries.
        cramped = array.array('b', b' '*10)
        for offset in (0, 2):
            self.assertRaises((ValueError, struct.error), pack_into, cramped,
                              offset, data)
test_struct.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_trailing_counter(self):
        """Formats that end with a repeat count are rejected with struct.error."""
        scratch = array.array('b', b' '*100)
        fails = self.assertRaises
        err = struct.error

        # A count with no format code at all.
        fails(err, struct.pack, '12345')
        fails(err, struct.unpack, '12345', '')
        fails(err, struct.pack_into, '12345', scratch, 0)
        fails(err, struct.unpack_from, '12345', scratch, 0)

        # A format code followed by a dangling count.
        fails(err, struct.pack, 'c12345', 'x')
        fails(err, struct.unpack, 'c12345', 'x')
        fails(err, struct.pack_into, 'c12345', scratch, 0, 'x')
        fails(err, struct.unpack_from, 'c12345', scratch, 0)

        # A counted format code followed by a dangling count.
        fails(err, struct.pack, '14s42', 'spam and eggs')
        fails(err, struct.unpack, '14s42', 'spam and eggs')
        fails(err, struct.pack_into, '14s42', scratch, 0, 'spam and eggs')
        fails(err, struct.unpack_from, '14s42', scratch, 0)
OSC.py 文件源码 项目:game_engine_evertims 作者: EVERTims 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _transmitMsg(self, msg):
        """Send an OSC message over a streaming socket.

        The message is sent as a 4-byte big-endian length prefix followed by
        the binary message.

        Returns True if both parts were transmitted, False if a transmit
        call reported failure or the socket raised a broken-pipe error.

        Raises TypeError if 'msg' is not an OSCMessage, and re-raises any
        socket error other than EPIPE.
        """
        if not isinstance(msg, OSCMessage):
            raise TypeError("'msg' argument is not an OSCMessage or OSCBundle object")

        try:
            binary = msg.getBinary()
            # prepend length of packet before the actual message (big endian).
            # The 'c' array typecode and tostring() exist only on Python 2;
            # struct.pack produces the same four bytes on every version.
            len_big_endian = struct.pack(">L", len(binary))
            if self._transmit(len_big_endian) and self._transmit(binary):
                return True
            return False
        except socket.error as e:
            # Exceptions cannot be indexed on Python 3; use .errno instead.
            if e.errno == errno.EPIPE: # broken pipe
                return False
            # bare raise keeps the original traceback
            raise
igmp.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def serialize(self):
        """Encode this record as bytes.

        Layout: the fixed part packed with ``_PACK_STR``, then one 4-byte
        address per entry of ``self.srcs``, then ``self.aux`` (if any) padded
        with zero bytes to a multiple of four.

        Side effects: a zero ``self.num`` is replaced by the number of
        sources, a zero ``self.aux_len`` by the padded aux length in 32-bit
        words, and ``self.aux`` itself is replaced by its padded form.
        """
        record = bytearray(struct.pack(self._PACK_STR, self.type_,
                           self.aux_len, self.num,
                           addrconv.ipv4.text_to_bin(self.address)))
        for source in self.srcs:
            record.extend(struct.pack('4s', addrconv.ipv4.text_to_bin(source)))

        if self.num == 0:
            # Source count was left unset: derive it and patch bytes 2-3.
            self.num = len(self.srcs)
            struct.pack_into('!H', record, 2, self.num)

        if self.aux is None:
            return six.binary_type(record)

        remainder = len(self.aux) % 4
        if remainder:
            self.aux += bytearray(4 - remainder)
            self.aux = six.binary_type(self.aux)
        record.extend(self.aux)
        if self.aux_len == 0:
            # Aux length was left unset: derive it and patch byte 1.
            self.aux_len = len(self.aux) // 4
            struct.pack_into('!B', record, 1, self.aux_len)
        return six.binary_type(record)
ipv4.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def serialize(self, payload, prev):
        """Encode the IPv4 header and return it as a bytearray.

        A zero ``self.total_length`` is filled in from the header length and
        ``len(payload)``.  The header is packed with a zero checksum, options
        (if any) are copied in after the fixed part, and the checksum is then
        computed over the whole header, stored in ``self.csum`` and written
        at byte offset 10.  ``prev`` is not used here.
        """
        header_size = len(self)
        header = bytearray(header_size)

        if self.total_length == 0:
            self.total_length = self.header_length * 4 + len(payload)

        version_ihl = self.version << 4 | self.header_length
        flags_offset = self.flags << 13 | self.offset
        src_bin = addrconv.ipv4.text_to_bin(self.src)
        dst_bin = addrconv.ipv4.text_to_bin(self.dst)
        struct.pack_into(ipv4._PACK_STR, header, 0, version_ihl, self.tos,
                         self.total_length, self.identification,
                         flags_offset, self.ttl, self.proto, 0,
                         src_bin, dst_bin)

        if self.option:
            # Options must fit in the space after the fixed-size part.
            option_size = len(self.option)
            assert (header_size - ipv4._MIN_LEN) >= option_size
            header[ipv4._MIN_LEN:ipv4._MIN_LEN + option_size] = self.option

        self.csum = packet_utils.checksum(header)
        struct.pack_into('!H', header, 10, self.csum)
        return header
icmpv6.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def serialize(self):
        """Return the binary form of this record.

        The output is the fixed part packed with ``_PACK_STR``, followed by
        a 16-byte address for every entry in ``self.srcs`` and, when present,
        ``self.aux`` zero-padded to a 4-byte boundary.

        As a side effect, ``self.num`` and ``self.aux_len`` are filled in
        when they are zero, and ``self.aux`` is replaced by its padded value.
        """
        group = addrconv.ipv6.text_to_bin
        out = bytearray(struct.pack(self._PACK_STR, self.type_,
                        self.aux_len, self.num,
                        group(self.address)))
        for address in self.srcs:
            out.extend(struct.pack('16s', group(address)))

        # Fill in the source count at bytes 2-3 if the caller left it at 0.
        if self.num == 0:
            self.num = len(self.srcs)
            struct.pack_into('!H', out, 2, self.num)

        if self.aux is not None:
            pad = len(self.aux) % 4
            if pad:
                self.aux += bytearray(4 - pad)
                self.aux = six.binary_type(self.aux)
            out.extend(self.aux)
            # Fill in the aux length (in 4-byte words) at byte 1 if unset.
            if self.aux_len == 0:
                self.aux_len = len(self.aux) // 4
                struct.pack_into('!B', out, 1, self.aux_len)
        return six.binary_type(out)


问题


面经


文章

微信
公众号

扫码关注公众号