def encode_8(bytes, key, terminator):
"""
Encode the bytecode with the given 8-bit XOR key.
:type bytes: str
:param bytes: Bytecode to encode.
:type key: str
:param key: 8-bit XOR key.
:type terminator: str
:param terminator: 8-bit terminator.
:rtype: str
:return: Encoded bytecode.
"""
if not bytes.endswith(terminator):
bytes += terminator
fmt = "B" * len(bytes)
unpack = struct.unpack
pad = unpack("B", key) * len(bytes)
bytes = unpack(fmt, bytes)
bytes = [ bytes[i] ^ pad[i] for i in xrange(len(bytes)) ]
return struct.pack(fmt, *bytes)
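# A minimal usage sketch for encode_8 (not from the original source;
# Python 2, where str is a byte string, and made-up key/terminator bytes):
stub = "\x31\xc0\x50\x68"
encoded = encode_8(stub, "\xab", "\x00")
# XOR is its own inverse, so XORing with the key again recovers the
# original bytecode plus the appended terminator byte.
decoded = "".join(chr(ord(c) ^ 0xab) for c in encoded)
assert decoded == stub + "\x00"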
def decrypt_file(key, in_filename, out_filename=None, chunksize=24*1024):
    # Strip the extension (e.g. '.crypt') to recover the original filename
if not out_filename:
out_filename = os.path.splitext(in_filename)[0]
with open(in_filename, 'rb') as infile:
origsize = struct.unpack('<Q', infile.read(struct.calcsize('Q')))[0]
iv = infile.read(16)
decryptor = AES.new(key, AES.MODE_CBC, iv)
with open(out_filename, 'wb') as outfile:
while True:
chunk = infile.read(chunksize)
if len(chunk) == 0:
break
outfile.write(decryptor.decrypt(chunk))
# Truncate file to original size
outfile.truncate(origsize)
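# A hedged sketch (not from the original source) of the writer this
# decryptor expects: the original size packed as '<Q', then a 16-byte IV,
# then CBC ciphertext padded to the 16-byte AES block size. It assumes the
# same os/struct/Crypto.Cipher.AES imports used by decrypt_file above.
def encrypt_file(key, in_filename, out_filename=None, chunksize=64*1024):
    if not out_filename:
        out_filename = in_filename + '.crypt'
    iv = os.urandom(16)
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    filesize = os.path.getsize(in_filename)
    with open(in_filename, 'rb') as infile, open(out_filename, 'wb') as outfile:
        outfile.write(struct.pack('<Q', filesize))
        outfile.write(iv)
        while True:
            chunk = infile.read(chunksize)
            if len(chunk) == 0:
                break
            if len(chunk) % 16 != 0:
                # pad the final chunk; the stored size lets the reader
                # truncate the padding away after decryption
                chunk += b' ' * (16 - len(chunk) % 16)
            outfile.write(encryptor.encrypt(chunk))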
def parse_default(field, ftype, fdefault):
if not (ftype == 'bool' and fdefault == 'true'):
try:
fdefault = literal_eval(fdefault.rstrip('LDF'))
except (ValueError, SyntaxError):
fdefault = None
if type(fdefault) is int:
if ftype[0] != 'u' and ftype[:5] != 'fixed':
if fdefault >> 63:
fdefault = c_long(fdefault).value
elif fdefault >> 31 and ftype[-2:] != '64':
fdefault = c_int(fdefault).value
else:
fdefault &= (1 << int(ftype[-2:])) - 1
if ftype == 'float' and abs(fdefault) >> 23:
fdefault = unpack('=f', pack('=i', fdefault))[0]
elif ftype == 'double' and abs(fdefault) >> 52:
fdefault = unpack('=d', pack('=q', fdefault))[0]
if fdefault:
field.default_value = str(fdefault)
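# The pack/unpack pair above reinterprets an integer's raw bits as an
# IEEE-754 value instead of converting it numerically; a standalone check:
from struct import pack, unpack
bits = 0x40490fdb  # bit pattern of the 32-bit float closest to pi
assert abs(unpack('=f', pack('=i', bits))[0] - 3.1415927) < 1e-6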
def parse_bgzf_header(f):
cur_pos = f.tell()
    header_fmt = "<BBBBIBBH"  # gzip header fields are little-endian
d = f.read(12)
    # read returns fewer than 12 bytes only at EOF (or on a truncated file)
    if len(d) < 12:
        return None
header = struct.unpack(header_fmt, d)
# Check for a valid gzip header
if header[0] != 31 or header[1] != 139:
raise Exception("Not a valid gzip header")
xlen = header[7]
bsize = get_bsize(f, f.tell(), xlen)
next_pos = cur_pos + bsize + 1
f.seek(next_pos)
return next_pos
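# get_bsize is called above but not shown; a plausible sketch, assuming the
# standard BGZF extra subfield (SI1=66, SI2=67) whose 16-bit payload BSIZE
# is defined as the total block size minus one (hence the +1 above):
def get_bsize(f, offset, xlen):
    f.seek(offset)
    extra = f.read(xlen)
    while len(extra) >= 4:
        si1, si2, slen = struct.unpack("<BBH", extra[:4])
        if si1 == 66 and si2 == 67:
            return struct.unpack("<H", extra[4:6])[0]
        extra = extra[4 + slen:]
    raise Exception("BGZF BSIZE subfield not found")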
def get_user_hashes(user_key, hbootkey):
samaddr = user_key.space
rid = int(user_key.Name, 16)
V = None
for v in values(user_key):
if v.Name == 'V':
V = samaddr.read(v.Data.value, v.DataLength.value)
if not V: return None
hash_offset = unpack("<L", V[0x9c:0x9c+4])[0] + 0xCC
    lm_exists = unpack("<L", V[0x9c+4:0x9c+8])[0] == 20
    nt_exists = unpack("<L", V[0x9c+16:0x9c+20])[0] == 20
    enc_lm_hash = V[hash_offset+4:hash_offset+20] if lm_exists else ""
    nt_off = hash_offset + (24 if lm_exists else 8)
    enc_nt_hash = V[nt_off:nt_off+16] if nt_exists else ""
return decrypt_hashes(rid, enc_lm_hash, enc_nt_hash, hbootkey)
def decode(self, offset):
"""Decode a section of the data section starting at offset
Arguments:
offset -- the location of the data structure to decode
"""
new_offset = offset + 1
(ctrl_byte,) = struct.unpack(b'!B', self._buffer[offset:new_offset])
type_num = ctrl_byte >> 5
# Extended type
if not type_num:
(type_num, new_offset) = self._read_extended(new_offset)
(size, new_offset) = self._size_from_ctrl_byte(
ctrl_byte, new_offset, type_num)
return self._type_decoder[type_num](self, size, new_offset)
def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num):
size = ctrl_byte & 0x1f
if type_num == 1:
return size, offset
bytes_to_read = 0 if size < 29 else size - 28
new_offset = offset + bytes_to_read
size_bytes = self._buffer[offset:new_offset]
# Using unpack rather than int_from_bytes as it is about 200 lookups
# per second faster here.
if size == 29:
size = 29 + struct.unpack(b'!B', size_bytes)[0]
elif size == 30:
size = 285 + struct.unpack(b'!H', size_bytes)[0]
elif size > 30:
size = struct.unpack(
b'!I', size_bytes.rjust(4, b'\x00'))[0] + 65821
return size, new_offset
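# A worked check of the size encoding above (values made up): size bits of
# 29 mean one extra byte follows, and the real size is 29 plus that byte.
ctrl_byte = 0x5d  # type_num 2 (UTF-8 string), size bits 29
assert (ctrl_byte >> 5, ctrl_byte & 0x1f) == (2, 29)
assert 29 + struct.unpack(b'!B', b'\x07')[0] == 36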
def _read_node(self, node_number, index):
base_offset = node_number * self._metadata.node_byte_size
record_size = self._metadata.record_size
if record_size == 24:
offset = base_offset + index * 3
node_bytes = b'\x00' + self._buffer[offset:offset + 3]
elif record_size == 28:
(middle,) = struct.unpack(
b'!B', self._buffer[base_offset + 3:base_offset + 4])
if index:
middle &= 0x0F
else:
middle = (0xF0 & middle) >> 4
offset = base_offset + index * 4
node_bytes = byte_from_int(
middle) + self._buffer[offset:offset + 3]
elif record_size == 32:
offset = base_offset + index * 4
node_bytes = self._buffer[offset:offset + 4]
else:
raise InvalidDatabaseError(
'Unknown record size: {0}'.format(record_size))
return struct.unpack(b'!I', node_bytes)[0]
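# A worked example of the 28-bit record layout above (made-up node bytes,
# with Python 3's bytes([...]) standing in for the byte_from_int helper):
# the two records share byte 3, the left one taking its high nibble.
buf = b'\x01\x02\x03\x4a\x05\x06\x07'
middle = struct.unpack(b'!B', buf[3:4])[0]
left = struct.unpack(b'!I', bytes([(0xF0 & middle) >> 4]) + buf[0:3])[0]
right = struct.unpack(b'!I', bytes([middle & 0x0F]) + buf[4:7])[0]
assert (left, right) == (0x04010203, 0x0A050607)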
def grayscale(self):
""" Convert the image into a (24-bit) grayscale one, using the Y'UV method. """
# http://en.wikipedia.org/wiki/YUV
Wr = 0.299
Wb = 0.114
Wg = 0.587
mod_bitmap = ""
f = StringIO(self.bitmap_data)
for row_num in xrange(0, self.height):
for pix in xrange(0, self.width):
            pixel = struct.unpack("3B", f.read(3))  # BMP stores pixels as BGR
out_pix = chr(int(Wr * pixel[2] + Wg * pixel[1] + Wb * pixel[0]))
mod_bitmap += out_pix * 3
mod_bitmap += chr(0x00) * self.padding_size
f.seek(self.padding_size, 1)
self.bitmap_data = mod_bitmap
return self
def _receiveMsg(self):
""" Receive OSC message from a socket and decode.
If an error occurs, None is returned, else the message.
"""
# get OSC packet size from stream which is prepended each transmission
chunk = self._receive(4)
    if chunk is None:
print("SERVER: Socket has been closed.")
return None
# extract message length from big endian unsigned long (32 bit)
slen = struct.unpack(">L", chunk)[0]
# receive the actual message
chunk = self._receive(slen)
    if chunk is None:
print("SERVER: Socket has been closed.")
return None
# decode OSC data and dispatch
msg = decodeOSC(chunk)
    if msg is None:
raise OSCError("SERVER: Message decoding failed.")
return msg
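# _receive is called above but not shown; a minimal sketch of the usual
# loop-until-complete helper, assuming the connected socket lives on
# self.socket (name assumed), since a bare recv may return short reads:
def _receive(self, count):
    """ Receive exactly `count` bytes from self.socket, or None on EOF. """
    data = b''
    while len(data) < count:
        chunk = self.socket.recv(count - len(data))
        if not chunk:
            return None  # peer closed the connection
        data += chunk
    return data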
def _receiveMsgWithTimeout(self):
""" Receive OSC message from a socket and decode.
If an error occurs, None is returned, else the message.
"""
# get OSC packet size from stream which is prepended each transmission
chunk = self._receiveWithTimeout(4)
if not chunk:
return None
# extract message length from big endian unsigned long (32 bit)
slen = struct.unpack(">L", chunk)[0]
# receive the actual message
chunk = self._receiveWithTimeout(slen)
if not chunk:
return None
# decode OSC content
msg = decodeOSC(chunk)
    if msg is None:
raise OSCError("CLIENT: Message decoding failed.")
return msg
def from_key_val_list(value):
"""Take an object and test to see if it can be represented as a
    dictionary. If it can, return an OrderedDict; e.g.,
::
>>> from_key_val_list([('key', 'val')])
OrderedDict([('key', 'val')])
>>> from_key_val_list('string')
ValueError: need more than 1 value to unpack
>>> from_key_val_list({'key': 'val'})
OrderedDict([('key', 'val')])
:rtype: OrderedDict
"""
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
return OrderedDict(value)
def addressInNetwork(self, ip, cidr):
    # the ip can be the empty string ('') in cases where the connection
    # is made via a web proxy. in these cases the sensor cannot report
    # the true remote IP as DNS resolution happens on the web proxy (and
    # not the endpoint)
    if '' == ip:
        return False
    try:
        net, bits = cidr.split('/')
        # unpack both addresses big-endian so the netmask can be built as
        # the top `bits` bits of the 32-bit value
        ipaddr = struct.unpack('>L', socket.inet_aton(ip))[0]
        netaddr = struct.unpack('>L', socket.inet_aton(net))[0]
        netmask = (0xFFFFFFFF << (32 - int(bits))) & 0xFFFFFFFF
        return ipaddr & netmask == netaddr & netmask
    except (ValueError, socket.error, struct.error):
        return False
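# Standalone check of the prefix-mask logic used above (made-up addresses):
def _in_net(ip, net, bits):
    ipaddr = struct.unpack('>L', socket.inet_aton(ip))[0]
    netaddr = struct.unpack('>L', socket.inet_aton(net))[0]
    mask = (0xFFFFFFFF << (32 - bits)) & 0xFFFFFFFF
    return ipaddr & mask == netaddr & mask
assert _in_net('10.16.2.3', '10.16.0.0', 12)
assert not _in_net('10.32.2.3', '10.16.0.0', 12)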
def file_scanlines(self, infile):
"""
Generates boxed rows in flat pixel format, from the input file
`infile`. It assumes that the input file is in a "Netpbm-like"
binary format, and is positioned at the beginning of the first
pixel. The number of pixels to read is taken from the image
dimensions (`width`, `height`, `planes`) and the number of bytes
per value is implied by the image `bitdepth`.
"""
# Values per row
vpr = self.width * self.planes
row_bytes = vpr
if self.bitdepth > 8:
assert self.bitdepth == 16
row_bytes *= 2
fmt = '>%dH' % vpr
def line():
return array('H', struct.unpack(fmt, infile.read(row_bytes)))
else:
def line():
scanline = array('B', infile.read(row_bytes))
return scanline
for y in range(self.height):
yield line()
def chunklentype(self):
"""Reads just enough of the input to determine the next
chunk's length and type, returned as a (*length*, *type*) pair
where *type* is a string. If there are no more chunks, ``None``
is returned.
"""
x = self.file.read(8)
if not x:
return None
if len(x) != 8:
raise FormatError(
'End of file whilst reading chunk length and type.')
    length, type = struct.unpack('!I4s', x)
    if length > 2**31 - 1:
        raise FormatError('Chunk %s is too large: %d.' % (type, length))
    return length, type
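# A worked example of the 8-byte chunk header parsed above: a 4-byte
# big-endian length followed by the 4-byte type tag (IHDR's length is 13).
length, ctype = struct.unpack('!I4s', b'\x00\x00\x00\x0dIHDR')
assert (length, ctype) == (13, b'IHDR')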
def _process_tRNS(self, data):
# http://www.w3.org/TR/PNG/#11tRNS
self.trns = data
if self.colormap:
if not self.plte:
warnings.warn("PLTE chunk is required before tRNS chunk.")
else:
if len(data) > len(self.plte)/3:
# Was warning, but promoted to Error as it
# would otherwise cause pain later on.
raise FormatError("tRNS chunk is too long.")
else:
if self.alpha:
raise FormatError(
"tRNS chunk is not valid with colour type %d." %
self.color_type)
try:
self.transparent = \
struct.unpack("!%dH" % self.color_planes, data)
except struct.error:
raise FormatError("tRNS chunk has incorrect length.")
def parse_policy(raw_data):
"""Parse policy data."""
policy = {}
raw_int = _raw_to_int(raw_data)
policy['domain_id'] = DOMAINS_REV[raw_int[3] & 0x0F]
policy['enabled'] = bool(raw_int[3] & 0x10)
policy['per_domain_enabled'] = bool(raw_int[3] & 0x20)
policy['global_enabled'] = bool(raw_int[3] & 0x40)
policy['created_by_nm'] = not bool(raw_int[3] & 0x80)
policy['policy_trigger'] = TRIGGERS_REV[raw_int[4] & 0x0F]
policy['power_policy'] = bool(raw_int[4] & 0x10)
power_correction = CPU_CORRECTION_REV[raw_int[4] & 0x60]
policy['cpu_power_correction'] = power_correction
policy['storage'] = STORAGE_REV[raw_int[4] & 0x80]
policy['action'] = ACTIONS_REV[raw_int[5] & 0x01]
policy['power_domain'] = POWER_DOMAIN_REV[raw_int[5] & 0x80]
policy_values = struct.unpack('<HIHH', bytearray(raw_int[6:]))
policy_names = ('target_limit', 'correction_time', 'trigger_limit',
'reporting_period')
_add_to_dict(policy, policy_values, policy_names)
return policy
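# A worked example of the byte-3 bit fields decoded above (value made up):
# 0x5a -> domain bits 0x0A, enabled, per-domain disabled, global enabled,
# created_by_nm True (bit 7 clear).
b3 = 0x5a
assert (b3 & 0x0F, bool(b3 & 0x10), bool(b3 & 0x20),
        bool(b3 & 0x40), not bool(b3 & 0x80)) == (0x0A, True, False, True, True)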
def parse_capabilities(raw_data):
"""Parse capabilities data."""
capabilities = {}
raw_int = _raw_to_int(raw_data)
capabilities['max_policies'] = raw_int[3]
capabilities_values = struct.unpack('<HHIIHH', bytearray(
raw_int[4:20]))
capabilities_names = ('max_limit_value', 'min_limit_value',
'min_correction_time', 'max_correction_time',
'min_reporting_period', 'max_reporting_period')
_add_to_dict(capabilities, capabilities_values, capabilities_names)
capabilities['domain_id'] = DOMAINS_REV[raw_int[20] & 0x0F]
power_domain = POWER_DOMAIN_REV[raw_int[20] & 0x80]
capabilities['power_domain'] = power_domain
return capabilities
def __init__(self, *args, **kwargs):
if args:
self.type = struct.unpack('>I', args[0][:4])[0]
        # check if it's an event data record that uses archival
        # timestamps and whether the archival flag has been set
        if (self.type in ARCHIVAL_RCD_TYPES
                and config.test_bit(Struct.get_flags(), 23)
                and 'reserved' not in self._field_names_):
            self._fields_.extend([('timestamp', 'uint32', 0), ('reserved', 'uint32', 0)])
            self._field_names_.extend(['timestamp', 'reserved'])
            self._field_format_.update({'timestamp': 'I', 'reserved': 'I'})
        else:
            # The extended field definitions persist on the class (via its
            # metaclass), so without this cleanup every event parsed after
            # the first ARCHIVAL_RCD record would also be parsed as if it
            # had the 'timestamp' and 'reserved' fields.
            self._fields_ = [f for f in self._fields_ if f[0] not in ['timestamp', 'reserved']]
            self._field_names_ = [f for f in self._field_names_ if f not in ['timestamp', 'reserved']]
            for k in ['timestamp', 'reserved']:
                self._field_format_.pop(k, None)
super(EventData, self).__init__(*args, **kwargs)
self._unpack_data()
async def __parse_header(self):
self.header_length, = struct.unpack('<I', await self.buffer.read(4))
self.header_chunk_count, = struct.unpack('<I', await self.buffer.read(4))
self.header_chunks = dict()
self.header = dict()
# Save header data from binary.
for nr in range(self.header_chunk_count):
chunk_id, = struct.unpack('<I', await self.buffer.read(4))
chunk_size, = struct.unpack('<I', await self.buffer.read(4))
        self.header_chunks[chunk_id] = chunk_size & ~0x80000000  # top bit marks a "heavy" chunk
# Parse all header chunks.
for chunk_id, chunk_size in self.header_chunks.items():
self.strings.reset()
self.header.update(await self.__parse_chunk(chunk_id, chunk_size))
return self.header
def to_rain(cls, val):
if val is None:
return cls.new(typi.null, 0, 0, cls.null)
elif val is False:
return cls.new(typi.bool, 0, 0, cls.null)
elif val is True:
return cls.new(typi.bool, 0, 1, cls.null)
elif isinstance(val, int):
return cls.new(typi.int, 0, val, cls.null)
elif isinstance(val, float):
raw = struct.pack('d', val)
intrep = struct.unpack('Q', raw)[0]
return cls.new(typi.float, 0, intrep, cls.null)
elif isinstance(val, str):
str_p = ct.create_string_buffer(val.encode('utf-8'))
cls._saves_.append(str_p)
return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)
raise Exception("Can't convert value {!r} to Rain".format(val))
def _check_crc (self, address, binary):
'''
Compares the CRC of the local binary to the one calculated by the
bootloader.
'''
# Check the CRC
crc_data = self._get_crc_internal_flash(address, len(binary))
# Now interpret the returned bytes as the CRC
crc_bootloader = struct.unpack('<I', crc_data[0:4])[0]
# Calculate the CRC locally
crc_function = crcmod.mkCrcFun(0x104c11db7, initCrc=0, xorOut=0xFFFFFFFF)
crc_loader = crc_function(binary, 0)
if crc_bootloader != crc_loader:
        raise TockLoaderException('Error: CRC check failed. Expected: 0x{:08x}, Got: 0x{:08x}'.format(crc_loader, crc_bootloader))
else:
print('CRC check passed. Binaries successfully loaded.')
def _checksum (self, buffer):
'''
Calculate the TBF header checksum.
'''
# Add 0s to the end to make sure that we are multiple of 4.
padding = len(buffer) % 4
if padding != 0:
padding = 4 - padding
buffer += bytes([0]*padding)
    # Loop through the buffer one 32-bit little-endian word at a time
checksum = 0
for i in range(0, len(buffer), 4):
checksum ^= struct.unpack('<I', buffer[i:i+4])[0]
return checksum
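# A quick check of the word-XOR checksum above: two little-endian words
# 0x04030201 and 0x08070605 XOR to 0x0C040404.
words = b'\x01\x02\x03\x04\x05\x06\x07\x08'
assert (struct.unpack('<I', words[0:4])[0] ^
        struct.unpack('<I', words[4:8])[0]) == 0x0C040404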
def __init__(self, file, align=True, bigendian=True, inclheader=False):
import struct
self.closed = False
self.align = align # whether to align to word (2-byte) boundaries
if bigendian:
strflag = '>'
else:
strflag = '<'
self.file = file
self.chunkname = file.read(4)
if len(self.chunkname) < 4:
raise EOFError
try:
self.chunksize = struct.unpack(strflag+'L', file.read(4))[0]
except struct.error:
raise EOFError
if inclheader:
self.chunksize = self.chunksize - 8 # subtract header
self.size_read = 0
try:
self.offset = self.file.tell()
except (AttributeError, IOError):
self.seekable = False
else:
self.seekable = True
def py_suffix_importer(filename, finfo, fqname):
file = filename[:-3] + _suffix
t_py = long(finfo[8])
t_pyc = _timestamp(file)
code = None
if t_pyc is not None and t_pyc >= t_py:
f = open(file, 'rb')
if f.read(4) == imp.get_magic():
t = struct.unpack('<I', f.read(4))[0]
if t == t_py:
code = marshal.load(f)
f.close()
if code is None:
file = filename
code = _compile(file, t_py)
return 0, code, { '__file__' : file }
def scan_opcodes(self, co,
unpack = struct.unpack):
# Scan the code, and yield 'interesting' opcode combinations
# Version for Python 2.4 and older
code = co.co_code
names = co.co_names
consts = co.co_consts
while code:
c = code[0]
if c in STORE_OPS:
oparg, = unpack('<H', code[1:3])
yield "store", (names[oparg],)
code = code[3:]
continue
if c == LOAD_CONST and code[3] == IMPORT_NAME:
oparg_1, oparg_2 = unpack('<xHxH', code[:6])
yield "import", (consts[oparg_1], names[oparg_2])
code = code[6:]
continue
if c >= HAVE_ARGUMENT:
code = code[3:]
else:
code = code[1:]
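# A standalone illustration of the '<xHxH' pattern matched above: skip a
# 1-byte opcode, read its 16-bit argument, and repeat (the opcode bytes
# here are Python 2.7's LOAD_CONST/IMPORT_NAME, purely for illustration).
import struct
raw = b'\x64\x01\x00\x6c\x02\x00'
assert struct.unpack('<xHxH', raw) == (1, 2)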