def _parseiesubel_(info,f=_iesubel_):
"""
parse a variable length info element sub element
:param info: packed string, next element starts at index 0
:param f: function to apply to each sub element for further parsing
:returns: list of parsed subelements
"""
opt = []
offset = 0
while len(info) >= 2: # may be flags (0-octet subelements)
sid,slen = struct.unpack_from('=2B',info,offset)
opt.append((sid,f(info[offset+2:offset+2+slen],sid)))
offset += 2 + slen
return opt
#### OPTIONAL SUBELEMENTS -> the sub element id and length have been stripped
python类unpack_from()的实例源码
def _iesubeltfsreq_(s,sid):
""" :returns: parsed tfs request subelement """
ret = s
if sid == std.EID_TFS_SUBELEMENT_TFS:
# there are one or more tclas elements folled by an option tclas
# processing element
ret = {}
while s:
eid,tlen = struct.unpack_from('=2B',s)
if eid == std.EID_TCLAS:
if not 'tclas' in ret: ret['tclas'] = []
ret['tclas'].append(_parseie_(std.EID_TCLAS,s[:tlen]))
s = s[2+tlen:]
elif eid == std.EID_TCLAS_PRO:
s['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO,ret)
# could use a break here but want to make sure
# there are not hanging elements
s = s[3:]
elif sid == std.EID_TFS_SUBELEMENT_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# MSMT Pilot subelements Std Table 8-117
def _iesubelmsmtreqstasta_(s,sid):
""" :returns: parsed subelement of type sta request for sta counters in msmt request """
ret = s
if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
# Std Fig. 8-117
cnt,to,t = struct.unpack_from('=I2H',s)
ts = s[8:]
ret = {'msmt-cnt':cnt,
'trigger-timeout':to,
'sta-cntr-trigger-cond':_stacntrtriggerconds_(t),
'thresholds':{}}
# optional count fields are 4-bytes assuming they are appending in order
for thresh in ['fail','fcs-error','mult-retry','dup','rts-fail','ack-fail','retry-cnt']:
if ret['sta-cntr-trigger-cond'][thresh]:
ret['thresholds'][thresh] = struct.unpack_from('=I',ts)[0]
ts = ts[4:]
elif sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# STA Counter Trigger Conditions Std Fig. 8-118
def _iesubelmsmtreqstarsna_(s,sid):
""" :returns: parsed subelement of type sta request for qos counters in msmt request """
ret = s
if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
# Std Fig. 8-121
cnt,to,t = struct.unpack_from('=I2H',s)
ts = s[8:]
ret = {'msmt-cnt':cnt,
'trigger-timeout':to,
'rsna-cntr-trigger-cond':_rsnacntrtriggerconds_(t),
'thresholds':{}}
# optional count fields are 4-bytes assuming they are appending in order
for thresh in ['cmacicv-err','cmarc-replay','robust-ccmp-replay','tkipicv-err','tkip-replay','ccmp-decrypt','ccmp-replay']:
if ret['sta-cntr-trigger-cond'][thresh]:
ret['thresholds'][thresh] = struct.unpack_from('=I',ts)[0]
ts = ts[4:]
elif sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# RSNA Counter Trigger Conditions Std Fig. 8-122
def _iesubelmsmtreqtx_(s,sid):
""" :returns: parsed tx optional subfield """
ret = s
if sid == std.EID_MSMT_REQ_SUBELEMENT_TX_RPT:
c,ae,ce,d,m,to = struct.unpack_from('=6B',s)
ret = {'trigger-cond':_eidmsmtreqtxtrigger_(c),
'avg-err-thresh':ae,
'cons-err-thresh':ce,
'delay-thresh':_eidmsmtreqtxdelay_(d),
'msmt-cnt':m,
'trigger-timeout':to}
elif sid == std.EID_MSMT_REQ_SUBELEMENT_TX_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# TX TRIGGER CONDITION Std Fig. 8-132
def _iesubelmsmtrptframe_(s,sid):
""" :returns: parsed frame subelement """
ret = s
if sid == std.EID_MSMT_RPT_FRAME_CNT_RPT:
# Fig 8-151, 8-152
ret = []
n = len(s)/19
for i in xrange(n):
vs = struct.unpack_from('=17BH',s,i*19)
ent = {'tx-addr':_hwaddr_(vs[0:6]),
'bssid':_hwaddr_(vs[6:12]),
'phy-type':vs[12],
'avg-rcpi':vs[13],
'last-rsni':vs[14],
'last-rcpi':vs[15],
'antenna-id':vs[16],
'frmae-cnt':vs[17]}
ret.append(ent)
elif sid == std.EID_MSMT_RPT_FRAME_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# MSMT Report subelements for Type STA statistics Std Table 8-89
def _iesubelmsmtrptlci_(s,sid):
""" :returns: parsed LCI optional subelement """
ret = s
if sid == std.EID_MSMT_RPT_LCI_AZIMUTH:
ret = {
'azimuth-rpt':_iesubelmsmtrptlicazimuth_(struct.unpack_from('=H',s)[0])
}
elif sid == std.EID_MSMT_RPT_LCI_ORIGIN:
ret = {'originator':_hwaddr_(struct.unpack('=6B',s))}
elif sid == std.EID_MSMT_RPT_LCI_TARGET:
ret = {'target':_hwaddr_(struct.unpack('=6B',s))}
elif sid == std.EID_MSMT_RPT_LCI_VEND:
ret = _parseie_(std.EID_VEND_SPEC,s)
return ret
# MSMT Report->Azimuth Report fields Std Fig. 8-164
def _iesubelevreqtransistion_(s,sid):
""" :returns: parsed subelements of type transistion in event request """
ret = s
if sid == std.EVENT_REQUEST_TYPE_TRANSITION_TARGET:
ret = {'tgt-bssid':_hwaddr_(struct.unpack_from('=6B',s))}
elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_SOURCE:
ret = {'src-bssid':_hwaddr_(struct.unpack_from('=6B',s))}
elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_TIME_TH:
ret = {'trans-time-threshold':struct.unpack_from('=H',s)[0]}
elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_RESULT:
v = struct.unpack_from('=B',s)[0]
ret = {'match-val':_eidevreqsubelmatchval_(v)}
elif sid == std.EVENT_REQUEST_TYPE_TRANSITION_FREQUENT:
ft,t = struct.unpack_from('=BH',s)
ret = {'freq-transistion-cnt-threahold':ft,'time-intv':t}
return ret
# EVENT REQUEST match value for Type transition Std Fig 8-272
def _parseinfoeldse_(s):
""" :returns: parsed dse location from packed string s """
dse = {}
for n,i,l,x,f in _EID_DSE_FIELDS_:
if n == 'lat-int' or n == 'lon-int' or n == 'alt-int':
dse[n] = int2s(s[i:i+l]+'\x00'*x)
else:
dse[n] = struct.unpack_from(f,s[i:i+l]+'\x00'*x)
# last three fields are byte centric
dei,op,chn = struct.unpack_from('=H2B',s,len(s)-4)
dse['depend-enable-id'] = dei
dse['op-class'] = op
dse['ch-num'] = chn
return dse
# HT Capabilities Info field Std Fig 8-249
# octest are defined as 1|1|2|1|1|1|1|2|1|1|1|1|1|1 see Fig 8-249 for names
# See also Std Table 8-124 for definition of sub fields
def decompile(self, data, ttFont):
sstruct.unpack2(Silf_hdr_format, data, self)
if self.version >= 5.0:
(data, self.scheme) = grUtils.decompress(data)
sstruct.unpack2(Silf_hdr_format_3, data, self)
base = sstruct.calcsize(Silf_hdr_format_3)
elif self.version < 3.0:
self.numSilf = struct.unpack('>H', data[4:6])
self.scheme = 0
self.compilerVersion = 0
base = 8
else:
self.scheme = 0
sstruct.unpack2(Silf_hdr_format_3, data, self)
base = sstruct.calcsize(Silf_hdr_format_3)
silfoffsets = struct.unpack_from(('>%dL' % self.numSilf), data[base:])
for offset in silfoffsets:
s = Silf()
self.silfs.append(s)
s.decompile(data[offset:], ttFont, self.version)
def register_schema(cls, typ, schema, typecode):
if typecode is not None and typ in cls.TYPE_CODES:
if cls.TYPE_CODES[typ] != typecode or cls.OBJ_PACKERS[typecode][2].schema is not schema:
raise ValueError("Registering different types with same typecode %r: %r" % (cls.TYPE_CODES[typ], typ))
return cls.OBJ_PACKERS[typecode][2]
if isinstance(typ, mapped_object_with_schema):
packable = typ
else:
packable = mapped_object_with_schema(schema)
class SchemaBufferProxyProperty(GenericBufferProxyProperty):
typ = packable
if typecode is not None:
cls.TYPE_CODES[typ] = typecode
cls.TYPE_CODES[packable] = typecode
cls.OBJ_PACKERS[typecode] = (packable.pack_into, packable.unpack_from, packable)
TYPES[typ] = packable
TYPES[packable] = packable
PROXY_TYPES[typ] = SchemaBufferProxyProperty
PROXY_TYPES[packable] = SchemaBufferProxyProperty
return packable
def __get__(self, obj, klass):
if obj is None:
return self
elif obj.none_bitmap & self.mask:
return None
if cython.compiled:
pybuf = cython.address(obj.pybuf)
buflen = pybuf.len
assert (obj.offs + self.offs + cython.sizeof(cython.longlong)) <= buflen
offs = obj.offs + cython.cast(cython.p_longlong,
cython.cast(cython.p_uchar, pybuf.buf) + obj.offs + self.offs)[0]
if obj.idmap is not None:
poffs = offs # python version of offs
rv = obj.idmap.get(poffs, poffs) # idmap cannot possibly hold "poffs" for that offset
if rv is not poffs:
return rv
assert offs + cython.sizeof(cython.ushort) <= buflen
else:
poffs = offs = obj.offs + struct.unpack_from('q', obj.buf, obj.offs + self.offs)[0]
rv = self.typ.unpack_from(obj.buf, offs)
if obj.idmap is not None:
obj.idmap[poffs] = rv
return rv
def getter(self, proxy_into = None, no_idmap = False):
schema = self.schema
proxy_class = self.proxy_class
index = self.index
idmap = self.idmap if not no_idmap else None
buf = self.buf
if proxy_class is not None:
proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
else:
proxy_class_new = None
@cython.locals(pos=int)
def getter(pos):
return schema.unpack_from(buf, index[pos], idmap, proxy_class_new, proxy_into)
return getter
def iter_fast(self):
# getter inlined
schema = self.schema
proxy_class = self.proxy_class
index = self.index
idmap = self.idmap
buf = self.buf
if proxy_class is not None:
proxy_class_new = functools.partial(proxy_class.__new__, proxy_class)
else:
proxy_class_new = None
proxy_into = schema.Proxy()
for i in xrange(len(self)):
yield schema.unpack_from(buf, index[i], idmap, proxy_class_new, proxy_into)
def _extract_images(filename):
"""???????????????????
:param filename: ?????
:return: 4??numpy??[index, y, x, depth]? ???np.float32
"""
images = []
print('Extracting {}'.format(filename))
with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
buf = f.read()
index = 0
magic, num_images, rows, cols = struct.unpack_from('>IIII', buf, index)
if magic != 2051:
raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
index += struct.calcsize('>IIII')
for i in range(num_images):
img = struct.unpack_from('>784B', buf, index)
index += struct.calcsize('>784B')
img = np.array(img, dtype=np.float32)
# ????[0,255]???[0,1]
img = np.multiply(img, 1.0 / 255.0)
img = img.reshape(rows, cols, 1)
images.append(img)
return np.array(images, dtype=np.float32)
def _extract_labels(filename, num_classes=10):
"""???????????????
:param filename: ?????
:param num_classes: ??one-hot??????????10?
:return: 2??numpy??[index, num_classes]? ???np.float32
"""
labels = []
print('Extracting {}'.format(filename))
with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
buf = f.read()
index = 0
magic, num_labels = struct.unpack_from('>II', buf, index)
if magic != 2049:
raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
index += struct.calcsize('>II')
for i in range(num_labels):
label = struct.unpack_from('>B', buf, index)
index += struct.calcsize('>B')
label_one_hot = np.zeros(num_classes, dtype=np.float32)
label_one_hot[label[0]] = 1
labels.append(label_one_hot)
return np.array(labels, dtype=np.float32)
def read_image(filename):
f = open(filename, 'rb')
index = 0
buf = f.read()
f.close()
magic, images, rows, columns = struct.unpack_from('>IIII' , buf , index)
index += struct.calcsize('>IIII')
for i in xrange(images):
#for i in xrange(2000):
image = Image.new('L', (columns, rows))
for x in xrange(rows):
for y in xrange(columns):
image.putpixel((y, x), int(struct.unpack_from('>B', buf, index)[0]))
index += struct.calcsize('>B')
print 'save ' + str(i) + 'image'
image.save('./test/' + str(i) + '.png')
def read_label(filename, saveFilename):
f = open(filename, 'rb')
index = 0
buf = f.read()
f.close()
magic, labels = struct.unpack_from('>II' , buf , index)
index += struct.calcsize('>II')
labelArr = [0] * labels
#labelArr = [0] * 2000
for x in xrange(labels):
#for x in xrange(2000):
labelArr[x] = int(struct.unpack_from('>B', buf, index)[0])
index += struct.calcsize('>B')
save = open(saveFilename, 'w')
save.write(','.join(map(lambda x: str(x), labelArr)))
save.write('\n')
save.close()
print 'save labels success'
def parse(self, data, offset=0):
if len(data) < self.size:
raise Exception("Header: Too small size of input data !")
val = unpack_from(self.FORMAT, data, offset)
self.MagicNumber = val[0]
header_crc = val[1]
self.TimeStamp = val[2]
self.DataSize = val[3]
self.LoadAddress = val[4]
self.EntryAddress = val[5]
self.DataCRC = val[6]
self.OsType = val[7]
self.ArchType = val[8]
self.ImageType = val[9]
self.Compression = val[10]
self.Name = val[11].decode('utf-8').strip('\0')
if header_crc != self._crc():
raise Exception("Header: Uncorrect CRC of input data !")
return self.size
def get_img_type(data, offset=0):
""" Help function for extracting image fom raw data
:param data: The raw data as byte array
:param offset: The offset
:return: Image type and offset where image start
"""
while True:
if (offset + Header.SIZE) > len(data):
raise Exception("Error: Not an U-Boot image !")
(header_mn, header_crc,) = unpack_from('!2L', data, offset)
# Check the magic number if is U-Boot image
if header_mn == 0x27051956:
header = bytearray(data[offset:offset+Header.SIZE])
header[4:8] = [0]*4
if header_crc == CRC32(header):
break
offset += 4
(image_type,) = unpack_from('B', data, offset + 30)
return image_type, offset
def uhid_parse_event_from_kernel(event):
assert len(event) == 4380
ev_type = struct.unpack_from('< L', event)[0]
if ev_type == 2:
return struct.unpack_from(UHID_EVENT_FMT_START, event)
elif ev_type == 6:
return struct.unpack_from(UHID_EVENT_FMT_OUTPUT, event)
elif ev_type == 4:
return struct.unpack_from(UHID_EVENT_FMT_OPEN, event)
elif ev_type == 5:
return struct.unpack_from(UHID_EVENT_FMT_CLOSE, event)
elif ev_type == 3:
return struct.unpack_from(UHID_EVENT_FMT_STOP, event)
elif ev_type == 9:
return struct.unpack_from(UHID_EVENT_FMT_GETRPRT, event)
elif ev_type == 13:
return struct.unpack_from(UHID_EVENT_FMT_SETRPRT, event)
else:
raise ValueError('unknown UHID event type from kernel %d' % ev_type)
def _find_firstheader(nsis_file):
firstheader_offset = 0
pos = 0
while True:
chunk = nsis_file.read(32768 if firstheader_offset else 512)
if len(chunk) < _firstheader_pack.size:
return None
if firstheader_offset == 0:
firstheader = FirstHeader._make(
_firstheader_pack.unpack_from(chunk))
firstheader.header_offset = pos
firstheader.data_offset = pos + _firstheader_pack.size
if firstheader.siginfo == FH_SIG and \
firstheader.magics == FH_MAGICS:
# NSIS header found.
return firstheader
pos += len(chunk)
def _extract_header(nsis_file, firstheader):
inflated_data, data_size = inflate_header(nsis_file, firstheader.data_offset)
header = Header._make(_header_pack.unpack_from(inflated_data))
firstheader.header = header
firstheader._raw_header = bytes(inflated_data)
firstheader._raw_header_c_size = data_size
# Parse the block headers.
block_headers = []
for i in range(BLOCKS_COUNT):
header_offset = i * _blockheader_pack.size
block_header = BlockHeader._make(_blockheader_pack.unpack_from(
header.raw_blocks[header_offset:]))
block_headers.append(block_header)
header.blocks = block_headers
# Parse the install types.
header.install_types = [
struct.unpack_from('<I', header.raw_install_types[i:])[0]
for i in range(0, len(header.raw_install_types), 4)]
return header
def _init(self):
self._t1, self._t2, self._t3, self._p1, \
self._p2, self._p3, self._p4, self._p5, \
self._p6, self._p7, self._p8, self._p9, \
_, self._h1 = self._i2c.read_register(self._address, 0x88,
fmt="<HhhHhhhhhhhhBB")
buf = self._i2c.read_register(self._address, 0xE1, amount=7)
self._h6 = struct.unpack_from("<b", buf, 6)[0]
self._h2, self._h3 = struct.unpack("<hB", buf[0:3])
self._h4 = (struct.unpack_from("<b", buf, 3)[0] << 4) | (buf[4] & 0xf)
self._h5 = (struct.unpack_from("<b", buf, 5)[0] << 4) | (buf[4] >> 4)
self._tfine = 0
self._i2c.write(self._address, [0xf4, 0x3f])
super()._init()
def init_aes(shared_secret, nonce):
""" Initialize AES instance
:param hex shared_secret: Shared Secret to use as encryption key
:param int nonce: Random nonce
:return: AES instance and checksum of the encryption key
:rtype: length 2 tuple
"""
" Seed "
ss = unhexlify(shared_secret)
n = struct.pack("<Q", int(nonce))
encryption_key = hashlib.sha512(n + ss).hexdigest()
" Check'sum' "
check = hashlib.sha256(unhexlify(encryption_key)).digest()
check = struct.unpack_from("<I", check[:4])[0]
" AES "
key = unhexlify(encryption_key[0:64])
iv = unhexlify(encryption_key[64:96])
return AES.new(key, AES.MODE_CBC, iv), check
def unpack(self, buffer, _pos):
"""
Unpacks data from a string buffer and sets class members
"""
_pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.OptionID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.AssignedVehicle = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.TimeThreshold = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.TimeTaskCompleted = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
return _pos
def unpack(self, buffer, _pos):
"""
Unpacks data from a string buffer and sets class members
"""
_pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
self.EntityID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
_valid = struct.unpack_from("B", buffer, _pos )[0]
_pos += 1
if _valid:
_series = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
_type = struct.unpack_from(">I", buffer, _pos)[0]
_pos += 4
_version = struct.unpack_from(">H", buffer, _pos)[0]
_pos += 2
from lmcp import LMCPFactory
self.PlanningPosition = LMCPFactory.LMCPFactory().createObject(_series, _version, _type )
_pos = self.PlanningPosition.unpack(buffer, _pos)
else:
self.PlanningPosition = None
self.PlanningHeading = struct.unpack_from(">f", buffer, _pos)[0]
_pos += 4
return _pos
def unpack(self, buffer, _pos):
"""
Unpacks data from a string buffer and sets class members
"""
_pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
boolChar = struct.unpack_from(">B", buffer, _pos)[0]
self.RestartCompletely = True if boolChar == 1 else False
_pos += 1
_valid = struct.unpack_from("B", buffer, _pos )[0]
_pos += 1
if _valid:
_series = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
_type = struct.unpack_from(">I", buffer, _pos)[0]
_pos += 4
_version = struct.unpack_from(">H", buffer, _pos)[0]
_pos += 2
from lmcp import LMCPFactory
self.ReAssign = LMCPFactory.LMCPFactory().createObject(_series, _version, _type )
_pos = self.ReAssign.unpack(buffer, _pos)
else:
self.ReAssign = None
return _pos
def unpack(self, buffer, _pos):
"""
Unpacks data from a string buffer and sets class members
"""
_pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
_arraylen = struct.unpack_from(">H", buffer, _pos )[0]
_arraylen = struct.unpack_from(">H", buffer, _pos )[0]
self.Vehicles = [None] * _arraylen
_pos += 2
if _arraylen > 0:
self.Vehicles = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
_pos += 8 * _arraylen
_arraylen = struct.unpack_from(">H", buffer, _pos )[0]
_arraylen = struct.unpack_from(">H", buffer, _pos )[0]
self.CanceledTasks = [None] * _arraylen
_pos += 2
if _arraylen > 0:
self.CanceledTasks = struct.unpack_from(">" + `_arraylen` + "q", buffer, _pos )
_pos += 8 * _arraylen
return _pos
def unpack(self, buffer, _pos):
"""
Unpacks data from a string buffer and sets class members
"""
_pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
self.VehicleID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.IntialTaskID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.IntialTaskOption = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.DestinationTaskID = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.DestinationTaskOption = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
self.TimeToGo = struct.unpack_from(">q", buffer, _pos)[0]
_pos += 8
return _pos