python类unpack_from()的实例源码

TaskProgress.py 文件源码 项目:salty 作者: GaloisInc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members.

        After the base class has consumed its own fields, the following are
        read in order, all big-endian: ResponseID (8-byte signed int),
        TaskID (8-byte signed int), PercentComplete (4-byte float), then a
        2-byte unsigned element count followed by that many 8-byte signed
        ints for EntitiesEngaged.

        :param buffer: buffer holding the packed message
        :param _pos: offset in buffer at which unpacking starts
        :returns: offset of the first byte after the fields read here
        """
        # the base class reads its part first and reports where it stopped
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.ResponseID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        self.PercentComplete = struct.unpack_from(">f", buffer, _pos)[0]
        _pos += 4
        # element count of the array that follows (read once)
        _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
        self.EntitiesEngaged = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            # "%d" formatting replaces the Python-2-only backtick repr; the
            # resulting format string (e.g. ">3q") is unchanged.
            # NOTE: a non-empty array is stored as the tuple returned by
            # struct.unpack_from, an empty one as an empty list.
            self.EntitiesEngaged = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
            _pos += 8 * _arraylen
        return _pos
AssignmentCoordination.py 文件源码 项目:salty 作者: GaloisInc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members.

        Reads CoordinatedAutomationRequestID (big-endian 8-byte signed int)
        and a one-byte presence flag. When the flag is non-zero, a
        series (8 bytes) / type (4 bytes) / version (2 bytes) header follows;
        it is handed to LMCPFactory to create the PlanningState object, which
        then unpacks itself from the buffer. Otherwise PlanningState is None.

        :param buffer: buffer holding the packed message
        :param _pos: offset in buffer at which unpacking starts
        :returns: offset of the first byte after the fields read here
        """
        offset = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        (self.CoordinatedAutomationRequestID,) = struct.unpack_from(">q", buffer, offset)
        offset += 8
        (present,) = struct.unpack_from("B", buffer, offset)
        offset += 1

        if not present:
            self.PlanningState = None
            return offset

        # header of the nested object: series id, type id, version
        series_id, type_id, version = struct.unpack_from(">qIH", buffer, offset)
        offset += 14
        from lmcp import LMCPFactory
        self.PlanningState = LMCPFactory.LMCPFactory().createObject(series_id, version, type_id)
        return self.PlanningState.unpack(buffer, offset)
TaskComplete.py 文件源码 项目:salty 作者: GaloisInc 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def unpack(self, buffer, _pos):
        """
        Unpacks data from a string buffer and sets class members.

        After the base class has consumed its own fields, the following are
        read in order, all big-endian: TaskID (8-byte signed int), a 2-byte
        unsigned element count followed by that many 8-byte signed ints for
        EntitiesInvolved, then TimeTaskCompleted (8-byte signed int).

        :param buffer: buffer holding the packed message
        :param _pos: offset in buffer at which unpacking starts
        :returns: offset of the first byte after the fields read here
        """
        # the base class reads its part first and reports where it stopped
        _pos = LMCPObject.LMCPObject.unpack(self, buffer, _pos)
        self.TaskID = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        # element count of the array that follows (read once)
        _arraylen = struct.unpack_from(">H", buffer, _pos)[0]
        self.EntitiesInvolved = [None] * _arraylen
        _pos += 2
        if _arraylen > 0:
            # "%d" formatting replaces the Python-2-only backtick repr; the
            # resulting format string (e.g. ">3q") is unchanged.
            # NOTE: a non-empty array is stored as the tuple returned by
            # struct.unpack_from, an empty one as an empty list.
            self.EntitiesInvolved = struct.unpack_from(">%dq" % _arraylen, buffer, _pos)
            _pos += 8 * _arraylen
        self.TimeTaskCompleted = struct.unpack_from(">q", buffer, _pos)[0]
        _pos += 8
        return _pos
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def calc_chksums(buf):
    """Return the ``(unsigned, signed)`` checksums of a member's header.

       Every byte of the header is summed except the 8-byte chksum field
       (offsets 148-155), which is counted as if it held eight spaces.
       According to the GNU tar sources, some tars (Sun and NeXT) compute
       the sum over signed chars, which gives a different result when a
       byte has its high bit set, so both variants are calculated.
    """
    # eight ASCII spaces standing in for the chksum field: 8 * 0x20 == 256
    blank_field = 8 * ord(" ")
    sums = []
    for code in ("B", "b"):  # unsigned bytes first, then signed bytes
        header_bytes = struct.unpack_from("148{0}8x356{0}".format(code), buf)
        sums.append(blank_field + sum(header_bytes))
    return tuple(sums)
remote.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def listen(self):
        """
        Listen to socket.

        Loops until reading fails. Each iteration reads an 8-byte header
        (two little-endian unsigned 32-bit values: body size and handle),
        then ``size`` bytes of body, decodes the body with ``loads`` and
        schedules ``self.handle_payload`` as a task on ``self.event_loop``.

        A ``Fault`` raised while decoding is forwarded to the handler; an
        ``ExpatError`` is reported through ``handle_exception`` and the
        message is skipped. On ``ConnectionResetError`` a critical message
        is logged and the process exits with code 10. Any other exception
        is reported through ``handle_exception`` and re-raised.

        This must be a coroutine function (``async def``) because the body
        awaits the stream reader; a plain ``def`` is a SyntaxError.
        """
        try:
            while True:
                head = await self.reader.readexactly(8)
                size, handle = struct.unpack_from('<LL', head)
                body = await self.reader.readexactly(size)
                data = method = fault = None

                try:
                    data, method = loads(body, use_builtin_types=True)
                except Fault as e:
                    # keep the fault so the payload handler can deal with it
                    fault = e
                except ExpatError as e:
                    # See #121 for this solution.
                    handle_exception(exception=e, module_name=__name__, func_name='listen', extra_data={'body': body})
                    continue

                # a single-element result is unwrapped before dispatch
                if data and len(data) == 1:
                    data = data[0]

                self.event_loop.create_task(self.handle_payload(handle, method, data, fault))
        except ConnectionResetError as e:
            logger.critical(
                'Connection with the dedicated server has been closed, we will now close down the subprocess! {}'.format(str(e))
            )
            # When the connection has been reset, we will close the controller process so it can be restarted by the god
            # process. Exit code 10 gives the information to the god process.
            exit(10)
        except Exception as e:
            handle_exception(exception=e, module_name=__name__, func_name='listen')
            raise
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _iesubelfte_(s,sid):
    """
     parse an optional subelement of the FTE element
     :param s: packed subelement data
     :param sid: subelement id
     :returns: dict of parsed fields, or s unchanged for reserved/unknown ids
    """
    if sid == std.EID_FTE_RSRV:
        return s

    if sid == std.EID_FTE_PMK_R1:
        # a 6-octet id, zero padded to 8 bytes so it reads as one 64-bit int
        return {'r1kh-id':struct.unpack_from('=Q',s+'\x00\x00')[0]}

    if sid == std.EID_FTE_GTK:
        # Std Fig. 8-237 Key Info|Key Len|RSC|Wrapped Key
        #                       2|      1|  8|      24-40
        keyinfo,keylen,rsc = struct.unpack_from('=HBQ',s)
        return {'key-info':{'key-id':bits.leastx(2,keyinfo),
                            'rsrv':bits.mostx(2,keyinfo)},
                'key-leng':keylen,
                'rsc':rsc,
                'wrapped-key':binascii.hexlify(s[struct.calcsize('=HBQ'):])}

    if sid == std.EID_FTE_PMK_R0:
        # variable length 1-48 octets
        return {'r0kh-id':binascii.hexlify(s)}

    if sid == std.EID_FTE_IGTK:
        # Std Fig 8-239 Key ID|IPN|Key Length|Wrapped Key
        #                    2|  6|         1|         24
        keyid = struct.unpack_from('=H',s)[0]
        # the 6-octet IPN is zero padded to 8 bytes before unpacking
        ipn = struct.unpack_from('=Q',s[2:8]+'\x00\x00')[0]
        keylen = struct.unpack_from('=B',s,8)[0]
        return {'key-id':keyid,
                'ipn':ipn,
                'key-len':keylen,
                'wrapped-key':binascii.hexlify(s[9:])}

    return s

# Diagnostic Report/Request optional subelements Std Table 8-143 & figures commented below
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _iesubelfmsreq_(s,sid):
    """
     parse an fms request subelement
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed fms request subelement, or s unchanged for unknown ids
    """
    ret = s
    if sid == std.EID_FMS_REQ_SUBELEMENT_FMS: # Std Fig. 8-327
        # Note: the 4-byte rate identification is defined in 8.4.1.32
        # as 1|1|2
        di,mi,m,i,r = struct.unpack_from('=4BH',s)
        rem = s[6:]
        ret = {'delv-intv':di,
               'max-delv-intv':mi,
               'rate-ident':{'mask':_rateidmask_(m),
                             'mcs-index':i,
                             'rate':r}}

        # there are one or more tclas elements followed by an optional tclas
        # processing element. Each is laid out as id(1)|length(1)|body(length)
        while rem:
            eid,tlen = struct.unpack_from('=2B',rem)
            # _parseie_ is given the element body only, without the 2-byte
            # id/length header (as in the vendor-specific call below, where
            # the id is passed separately from the data)
            body = rem[2:2+tlen]
            if eid == std.EID_TCLAS:
                if not 'tclas' in ret: ret['tclas'] = []
                ret['tclas'].append(_parseie_(std.EID_TCLAS,body))
            elif eid == std.EID_TCLAS_PRO:
                # no break here so that any hanging elements are still consumed
                ret['tclas-pro'] = _parseie_(std.EID_TCLAS_PRO,body)
            # advance the byte cursor (not the result dict) past this element,
            # whatever its id, so an unknown element cannot stall the loop
            rem = rem[2+tlen:]
    elif sid == std.EID_FMS_REQ_SUBELEMENT_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

# FMS Response subelements Std Table 8-159 & figures commented below
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqcl_(s,sid):
    """
     parse a channel load subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_CL_RPT:
        # Std fig. 8-110: two single-byte fields
        condition,refval = struct.unpack_from('=2B',s)
        return {'rpt-condition':condition,'ch-load-ref-val':refval}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_CL_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# MSMT Request subelements for type Noise Histogram Std Table 8-62 and figures below
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqnh_(s,sid):
    """
     parse a noise histogram subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_NH_RPT:
        # Std fig. 8-112: two single-byte fields
        condition,anpi = struct.unpack_from('=2B',s)
        return {'rpt-condition':condition,'anpi-ref-val':anpi}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_NH_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# MSMT Request subelements for type Beacon Std Table 8-65 and figures below
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqbeacon_(s,sid):
    """
     parse a beacon subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    ret = s
    if sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_SSID:
        ret = {'ssid':_iesubelssid_(s)}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_BRI:
        # Std Fig. 8-114
        r = struct.unpack_from('=B',s)[0]
        # for conditions 5..10 the threshold byte goes through int2s
        # (presumably a signed interpretation - confirm against int2s),
        # otherwise it is read as a plain unsigned byte
        if 5 <= r <= 10: t = int2s(s[1])
        else: t = struct.unpack_from('=B',s,1)[0]
        ret = {'rpt-condition':r,
               'threshold':t}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_RPT:
        # Std Table 8-67
        # unpack_from always returns a tuple; take the single value so
        # rpt-detail is an int like every other one-byte field here
        ret = {'rpt-detail':struct.unpack_from('=B',s)[0]}
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_REQ:
        # same as Std 8.4.2.13
        ret = _parseie_(std.EID_REQUEST,s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_AP_CH_RPT:
        # same as Std 8.4.2.38
        ret = _parseie_(std.EID_AP_CH_RPT,s)
    elif sid == std.EID_MSMT_REQ_SUBELEMENT_BEACON_VEND:
        ret = _parseie_(std.EID_VEND_SPEC,s)
    return ret

#### NOTE: next three could probably be combined

# MSMT Request subelements for type Frame Request Std Table 8-68 and figures below
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqlci_(s,sid):
    """
     parse an optional lci subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed lci optional subfield, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_AZIMUTH: # std Fig. 8-124
        azimuth = struct.unpack_from('=B',s)[0]
        return {'azimuth-req':_eidmsmtreqlciazimuth_(azimuth)}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_REQUESTING:
        # six address bytes are handed to _hwaddr_ as a tuple
        return {'originator-mac':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_TARGET:
        return {'target-mac':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LCI_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# LCI AZIMUTH REQUEST AZIMUTH REQUEST FIELD Std Fig. 8-125
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqmcastdiag_(s,sid):
    """
     parse a mcast diag subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_TRIGGER:
        # three single-byte fields
        condition,timeout,delay = struct.unpack_from('=3B',s)
        trigger = {'trigger-condition': condition,
                   'inactivity-timeout': timeout,
                   'reactivation-delay': delay}
        return {'mcast-trigger-rpt':trigger}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_MCAST_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# MSMT Request subelements for type Location civic request Std Table 8-79
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _iesubelmsmtreqloccivic_(s,sid):
    """
     parse a location civic subelement of a msmt request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_ORIGIN:
        # six address bytes are handed to _hwaddr_ as a tuple
        return {'originator':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_TARGET:
        return {'target':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_CIVIC_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# MSMT Request subelements for Type Location Id Std Table 8-80
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _iesubelmsmtrptsta_(s,sid):
    """
     parse an optional STA subelement
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed STA optional subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_RPT_STA_STAT_REASON:
        # single-byte reason field
        (reason,) = struct.unpack_from('=B',s)
        return {'reason':reason}
    if sid == std.EID_MSMT_RPT_STA_STAT_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# MSMT Report subelements for Type LCI Std Table 8-90
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _iesubelmsmtrptlocid_(s,sid):
    """
     parse a location id subelement
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_ORIGIN:
        # six address bytes are handed to _hwaddr_ as a tuple
        return {'originator':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_TARGET:
        return {'target':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EID_MSMT_REQ_SUBELEMENT_LOC_ID_VEND:
        return _parseie_(std.EID_VEND_SPEC,s)
    return s

# EVENT REQUEST subelements for Type transition Std 8.4.2.69.2
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _iesubelevreqp2p_(s,sid):
    """
     parse a P2P link subelement of an event request
     :param s: packed subelement data
     :param sid: subelement id
     :returns: parsed subelement, or s unchanged for unknown ids
    """
    if sid == std.EVENT_REQUEST_TYPE_P2P_PEER:
        # six address bytes are handed to _hwaddr_ as a tuple
        return {'peer-addr':_hwaddr_(struct.unpack_from('=6B',s))}
    if sid == std.EVENT_REQUEST_TYPE_P2P_CH_NUM:
        # TODO: make this a single function -> it appears multiple times
        opclass,chnum = struct.unpack_from('=2B',s)
        return {'op-class':opclass,'ch-num':chnum}
    return s

# EVENT REQUEST subelements for Type Vend Std 8.4.2.69.5
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _parselcirpt_(s):
    """
     parse the lci location fields listed in _EID_MSMT_RPT_LCI_FIELDS_
     :param s: packed lci data
     :returns: dict of parsed lci location elements keyed by field name
    """
    # fields decoded through int2s because they are 2's complement
    twos = ('lat-int','lon-int','alt-int')
    lci = {}
    # each table entry is (name, start index, length, pad count, struct format)
    for name,start,length,pad,fmt in _EID_MSMT_RPT_LCI_FIELDS_:
        padding = '\x00'*pad
        if name in twos:
            # NOTE(review): only one byte (start:start+1) is sliced here while
            # the other branch slices 'length' bytes - confirm this is intended
            lci[name] = int2s(s[start:start+1]+padding)
        else:
            # NOTE(review): the tuple returned by unpack_from is stored as-is
            lci[name] = struct.unpack_from(fmt,s[start:start+length]+padding)
    return lci

# MSMT Report->MCast Diagn report reason field Std Fig. 8-188
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _parsesuitesel_(s):
    """
     parse a suite selector
     :param s: packed string, first four bytes are oui(3)|suite type(1)
     :returns: dict with the dash-separated oui and the suite type
    """
    o1,o2,o3,suite = struct.unpack_from('=4B',s)
    # _hwaddr_ output has its ':' separators swapped for '-'
    oui = _hwaddr_((o1,o2,o3)).replace(':','-')
    return {'oui':oui,'suite-type':suite}

# RSN capabilities of the RSNE Std Fig 8-188
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _parsetimeval_(s):
    """ :returns: a parsed time value from packed string s """
    tval = struct.unpack_from('=H5BHB',s)
    return {'year':tval[0],
            'month':tval[1],
            'day':tval[2],
            'hours':tval[3],
            'minutes':tval[4],
            'seconds':tval[5],
            'milliseconds':tval[6],
            'rsrv':tval[7]}

################################################################################
#### CTRL Frames Std 8.3.1
################################################################################
_mpdu.py 文件源码 项目:itamae 作者: wraith-wireless 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _tkip_(f,m):
    """
     parse tkip data from frame f into the mpdu dict m. On success sets
     m['l3-crypt'], advances m['offset'] by the tkip iv length and adds the
     mic and icv lengths to m['stripped']. Any exception is recorded in
     m['err'] instead of being raised
     :param f: frame
     :param m: mpdu dict
    """
    try:
        base = m['offset']
        keyid = struct.unpack_from('=B',f,base+_TKIP_KEY_BYTE_)[0]

        # iv fields are indexed relative to the current offset
        iv = {'tsc1':f[base+_TKIP_TSC1_BYTE_],
              'wep-seed':f[base+_TKIP_WEPSEED_BYTE_],
              'tsc0':f[base+_TKIP_TSC0_BYTE_],
              'key-id':{'rsrv':bits.leastx(_TKIP_EXT_IV_,keyid),
                        'ext-iv':bits.midx(_TKIP_EXT_IV_,_TKIP_EXT_IV_LEN_,keyid),
                        'key-id':bits.mostx(_TKIP_EXT_IV_+_TKIP_EXT_IV_LEN_,keyid)}}
        extiv = {'tsc2':f[base+_TKIP_TSC2_BYTE_],
                 'tsc3':f[base+_TKIP_TSC3_BYTE_],
                 'tsc4':f[base+_TKIP_TSC4_BYTE_],
                 'tsc5':f[base+_TKIP_TSC5_BYTE_]}

        # mic and icv are sliced off the end of the frame, mic first
        trailer = _TKIP_MIC_LEN_ + _TKIP_ICV_LEN_
        m['l3-crypt'] = {'type':'tkip',
                         'iv':iv,
                         'ext-iv':extiv,
                         'mic':f[-trailer:-_TKIP_ICV_LEN_],
                         'icv':f[-_TKIP_ICV_LEN_:]}
        m['offset'] += _TKIP_IV_LEN_
        m['stripped'] += trailer
    except Exception as e:
        m['err'].append(('l3-crypt.tkip',"parsing {0}".format(e)))

#### CCMP Std 11.4.3.2
# <MAC HDR>|CCMP HDR|DATA|MIC|FCS
# bytes var|       8| >=1|  8|  4
# where the CCMP Header is defined
#    PN0|PN1|RSRV|RSRV|EXT IV|KeyID|PN2|PN3|PN4|PN5
# bits 8|  8|   8|   5|     1|    2|  8|  8|  8|  8


问题


面经


文章

微信
公众号

扫码关注公众号