python类decode()的实例源码

getnext-pull-whole-mib.py 文件源码 项目:pysnmp 作者: etingof 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def cbRecvFun(transportDispatcher, transportDomain, transportAddress,
              wholeMsg, reqPDU=reqPDU, headVars=headVars):
    """Process GETNEXT responses and request the next row until the walk ends.

    Every message found in ``wholeMsg`` is decoded with ``pMod.Message()``.
    Responses whose request ID differs from that of ``reqPDU`` are ignored.
    For a matching response the returned var-binds are printed; if every
    value in the last row is a ``pMod.Null`` the dispatcher job is marked
    finished, otherwise ``reqPDU`` is re-targeted at the last row, given a
    fresh request ID and the module-level ``reqMsg`` is sent again.

    Returns whatever is left of ``wholeMsg`` once the loop ends.

    Raises ``Exception`` when the response carries an error status other
    than 0 or 2, or when, after sending the follow-up request, more than
    3 seconds have passed since the module-level ``startedAt``.
    """
    global startedAt
    while wholeMsg:
        rspMsg, wholeMsg = decoder.decode(wholeMsg, asn1Spec=pMod.Message())
        rspPDU = pMod.apiMessage.getPDU(rspMsg)
        # Match response to request
        if pMod.apiPDU.getRequestID(reqPDU) != pMod.apiPDU.getRequestID(rspPDU):
            continue
        # Check for SNMP errors reported
        # NOTE(review): status 2 is tolerated -- presumably noSuchName, which
        # an agent may use to signal the end of the MIB; confirm.
        errorStatus = pMod.apiPDU.getErrorStatus(rspPDU)
        if errorStatus and errorStatus != 2:
            raise Exception(errorStatus)
        # Format var-binds table
        varBindTable = pMod.apiPDU.getVarBindTable(reqPDU, rspPDU)
        # Report SNMP table
        for tableRow in varBindTable:
            for name, val in tableRow:
                print('from: %s, %s = %s' % (
                    transportAddress, name.prettyPrint(), val.prettyPrint()
                )
                      )
        # Stop on EOM
        for oid, val in varBindTable[-1]:
            if not isinstance(val, pMod.Null):
                break
        else:
            transportDispatcher.jobFinished(1)
            # The walk is complete: do not issue one more request for a
            # job that has just been declared finished.
            continue

        # Generate request for next row
        pMod.apiPDU.setVarBinds(
            reqPDU, [(x, pMod.null) for x, y in varBindTable[-1]]
        )
        pMod.apiPDU.setRequestID(reqPDU, pMod.getNextRequestID())
        transportDispatcher.sendMessage(
            encoder.encode(reqMsg), transportDomain, transportAddress
        )
        if time() - startedAt > 3:
            raise Exception('Request timed out')
        startedAt = time()
    return wholeMsg
listen-on-ipv4-and-ipv6-interfaces.py 文件源码 项目:pysnmp 作者: etingof 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
    """Print every SNMP notification contained in ``wholeMsg``.

    Messages are decoded one after another using the protocol module that
    matches the version found in each message.  For trap PDUs the var-binds
    are printed; for version ``api.protoVersion1`` the trap header fields
    are printed first.

    Returns the undecoded remainder of ``wholeMsg``, or ``None`` as soon as
    a message with a version missing from ``api.protoModules`` is seen.
    """
    # (format string, getter name) pairs for the version-1 trap header,
    # in the order they are printed.
    v1TrapFields = (
        ('Enterprise: %s', 'getEnterprise'),
        ('Agent Address: %s', 'getAgentAddr'),
        ('Generic Trap: %s', 'getGenericTrap'),
        ('Specific Trap: %s', 'getSpecificTrap'),
        ('Uptime: %s', 'getTimeStamp'),
    )
    while wholeMsg:
        version = int(api.decodeMessageVersion(wholeMsg))
        if version not in api.protoModules:
            print('Unsupported SNMP version %s' % version)
            return
        proto = api.protoModules[version]
        message, wholeMsg = decoder.decode(
            wholeMsg, asn1Spec=proto.Message(),
        )
        print('Notification message from %s:%s: ' % (
            transportDomain, transportAddress
        ))
        pdu = proto.apiMessage.getPDU(message)
        if not pdu.isSameTypeWith(proto.TrapPDU()):
            # Only trap PDUs are reported in detail.
            continue
        if version == api.protoVersion1:
            for template, getterName in v1TrapFields:
                field = getattr(proto.apiTrapPDU, getterName)(pdu)
                print(template % (field.prettyPrint()))
            bindings = proto.apiTrapPDU.getVarBinds(pdu)
        else:
            bindings = proto.apiPDU.getVarBinds(pdu)
        print('Var-binds:')
        for oid, val in bindings:
            print('%s = %s' % (oid.prettyPrint(), val.prettyPrint()))
    return wholeMsg
verdec.py 文件源码 项目:pysnmp 作者: etingof 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def decodeMessageVersion(wholeMsg):
    """Return the version component found at the start of an SNMP message.

    The outer SEQUENCE is decoded non-recursively, then the INTEGER that
    follows it.

    Raises ``ProtocolError`` if the decoded component is an end-of-octets
    marker or if the decoder raises ``PyAsn1Error``.
    """
    def takeSubstrate(asn1Object, substrate, length):
        # Hand back the ASN.1 object plus the first `length` items of the
        # substrate instead of decoding that content.
        return asn1Object, substrate[:length]

    try:
        _, payload = decoder.decode(
            wholeMsg, asn1Spec=univ.Sequence(),
            recursiveFlag=False, substrateFun=takeSubstrate
        )
        version, _ = decoder.decode(
            payload, asn1Spec=univ.Integer(),
            recursiveFlag=False, substrateFun=takeSubstrate
        )
        if not eoo.endOfOctets.isSameTypeWith(version):
            return version
        raise ProtocolError('EOO at SNMP version component')
    except PyAsn1Error:
        raise ProtocolError('Invalid BER at SNMP version component')
drsuapi.py 文件源码 项目:CVE-2017-7494 作者: joxeankoret 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def OidFromAttid(prefixTable, attr):
    """Map an ATTRTYP value to an OID string by means of a prefix table.

    ``attr`` is split into an upper and a lower 16-bit word.  The upper word
    selects the entry of ``prefixTable`` whose ``'ndx'`` equals it; the lower
    word is appended to that entry's prefix as one or two bytes and the
    result is decoded as a BER OBJECT IDENTIFIER.

    Returns the decoded OID converted with ``str()``, or ``None`` when no
    table entry matches the upper word.
    """
    # separate the ATTRTYP into two parts; floor division keeps upperWord an
    # integer (and therefore comparable to 'ndx') even where "/" means true
    # division
    upperWord = attr // 65536
    lowerWord = attr % 65536

    # search in the prefix table to find the upperWord, if found,
    # construct the binary OID by appending lowerWord to the end of
    # found prefix.

    binaryOID = None
    for item in prefixTable:
        if item['ndx'] == upperWord:
            # the slice yields a fresh sequence, so the table entry itself
            # is not modified by the appends below (assuming list-like
            # 'elements' -- TODO confirm)
            binaryOID = item['prefix']['elements'][:item['prefix']['length']]
            if lowerWord < 128:
                binaryOID.append(chr(lowerWord))
            else:
                if lowerWord >= 32768:
                    lowerWord -= 32768
                # two-byte form: high 7 bits with the top bit set, then the
                # low 7 bits
                binaryOID.append(chr(((lowerWord // 128) % 128) + 128))
                binaryOID.append(chr(lowerWord % 128))
            break

    if binaryOID is None:
        return None

    # 0x06 is the tag byte placed in front of the length and the OID bytes
    return str(decoder.decode('\x06' + chr(len(binaryOID)) + ''.join(binaryOID), asn1Spec = univ.ObjectIdentifier())[0])
ldapasn1.py 文件源码 项目:CVE-2017-7494 作者: joxeankoret 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def decodeControlValue(self):
        """Decode ``self['controlValue']`` as a paged-results control value.

        The first two components of the decoded value are stored in
        ``self._size`` and ``self._cookie``; the decoded value is returned.
        """
        pagedValue, _rest = decoder.decode(
            self['controlValue'],
            asn1Spec=SimplePagedResultsControlValue(),
        )
        size, cookie = pagedValue[0], pagedValue[1]
        self._size = size
        self._cookie = cookie
        return pagedValue
ldap.py 文件源码 项目:CVE-2017-7494 作者: joxeankoret 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def recv(self):
        """Read pending data from the socket and decode it into LDAP messages.

        Data is read in chunks of ``REQUEST_SIZE`` until a chunk shorter than
        that arrives.  The buffer is then decoded message by message; when
        the decoder reports an underrun another chunk is read and decoding
        is retried.

        Returns the list of decoded messages and increments
        ``self._messageId``.

        Raises ``LDAPSessionError`` when a message with ``messageID`` 0
        (unsolicited notification) is decoded; the connection is closed
        first if the notification is ``NOTIFICATION_DISCONNECT``.
        """
        REQUEST_SIZE = 8192
        data = ''
        while True:
            chunk = self._socket.recv(REQUEST_SIZE)
            data += chunk
            # A short read is taken to mean nothing more is pending.
            if len(chunk) < REQUEST_SIZE:
                break

        response = []
        while data:
            try:
                message, remaining = decoder.decode(data, asn1Spec=LDAPMessage())
            except SubstrateUnderrunError:
                # We need more data
                data = data + self._socket.recv(REQUEST_SIZE)
                continue

            if message['messageID'] == 0:  # unsolicited notification
                extResp = message['protocolOp']['extendedResp']
                name = extResp['responseName'] or message['responseName']
                notification = KNOWN_NOTIFICATIONS.get(name, "Unsolicited Notification '%s'" % name)
                if name == NOTIFICATION_DISCONNECT:  # Server has disconnected
                    self.close()
                raise LDAPSessionError(
                    error=int(extResp['resultCode']),
                    errorString='%s -> %s: %s' % (notification,
                                                  extResp['resultCode'].prettyPrint(),
                                                  extResp['diagnosticMessage'])
                )
            response.append(message)
            data = remaining

        self._messageId += 1
        return response
krb5-downgrade-asreq.py 文件源码 项目:des_kpt 作者: h1kari 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main(argv):
    """Rewrite the captured request in ``argv[0]`` and emit the result.

    The first four bytes of the input file are kept verbatim; the rest is
    BER-decoded.  Every element of component ``[3][7]`` of the decoded
    structure is replaced by ``univ.Integer(1)`` and the structure is
    re-encoded.
    NOTE(review): going by the script name, ``[3][7]`` is presumably the
    encryption-type list of a Kerberos AS-REQ -- confirm.

    Side effects: writes the unmodified input to ``<infile>.in``, the
    rewritten payload to ``<infile>.out`` and to standard output, then
    deletes ``<infile>``.  Prints a usage line and exits with status 0 when
    ``argv`` is empty.
    """
    try:
        infile = argv[0]
    except IndexError:
        # Parenthesised single-argument form prints the same text whether
        # print is a statement or a function.
        print("usage: ./krb5-downgrade-asreq.py <infile>")
        sys.exit(0)

    # The payload is binary; read it unmodified.
    with open(infile, 'rb') as fin:
        data = fin.read()

    krb_preauth_req, _ = decoder.decode(data[4:])

    for i in range(0, len(krb_preauth_req[3][7])):
        krb_preauth_req[3][7][i] = univ.Integer(1)

    payload_out = data[:4]
    payload_out += encoder.encode(krb_preauth_req)

    # log what we're doing
    with open(infile + ".in", "wb") as fout:
        fout.write(data)

    with open(infile + ".out", "wb") as fout:
        fout.write(payload_out)

    sys.stdout.write(payload_out)
    os.remove(infile)
service_account.py 文件源码 项目:ecodash 作者: Servir-Mekong 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _get_private_key(private_key_pkcs8_text):
  """Build an ``rsa.PrivateKey`` from PEM text holding a PKCS#8 key.

  The 'PRIVATE KEY' PEM section is loaded, decoded as ``PrivateKeyInfo``,
  and its ``privateKey`` component is loaded as a DER-format PKCS#1 key.
  """
  pkcs8_der = rsa.pem.load_pem(private_key_pkcs8_text, 'PRIVATE KEY')
  key_info, _rest = decoder.decode(pkcs8_der, asn1Spec=PrivateKeyInfo())
  pkcs1_der = key_info.getComponentByName('privateKey').asOctets()
  return rsa.PrivateKey.load_pkcs1(pkcs1_der, format='DER')
service_account.py 文件源码 项目:OneClickDTU 作者: satwikkansal 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_private_key(private_key_pkcs8_text):
    """Build an ``rsa.PrivateKey`` from PEM text holding a PKCS#8 key.

    The text is first passed through ``_to_bytes``; the 'PRIVATE KEY' PEM
    section is then loaded, decoded as ``PrivateKeyInfo``, and its
    ``privateKey`` component is loaded as a DER-format PKCS#1 key.
    """
    pem_bytes = _to_bytes(private_key_pkcs8_text)
    pkcs8_der = rsa.pem.load_pem(pem_bytes, 'PRIVATE KEY')
    key_info, _rest = decoder.decode(pkcs8_der, asn1Spec=PrivateKeyInfo())
    pkcs1_der = key_info.getComponentByName('privateKey').asOctets()
    return rsa.PrivateKey.load_pkcs1(pkcs1_der, format='DER')
service_account.py 文件源码 项目:aqua-monitor 作者: Deltares 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_private_key(private_key_pkcs8_text):
    """Turn PKCS#8 PEM text into an ``rsa.PrivateKey``.

    Steps: convert the text with ``_to_bytes``, load the 'PRIVATE KEY' PEM
    section, decode it against ``PrivateKeyInfo`` and load the octets of its
    ``privateKey`` component as a DER-format PKCS#1 key.
    """
    raw_pem = _to_bytes(private_key_pkcs8_text)
    der_bytes = rsa.pem.load_pem(raw_pem, 'PRIVATE KEY')
    private_key_info, _unused = decoder.decode(
        der_bytes, asn1Spec=PrivateKeyInfo())
    inner_key = private_key_info.getComponentByName('privateKey')
    return rsa.PrivateKey.load_pkcs1(inner_key.asOctets(), format='DER')
verdec.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def decodeMessageVersion(wholeMsg):
    """Return the version component found at the start of an SNMP message.

    The outer SEQUENCE is decoded with ``recursiveFlag=0``; the INTEGER is
    then decoded from what that first call returns as its second item.

    Raises ``ProtocolError`` if the decoded component is an end-of-octets
    marker or if the decoder raises ``PyAsn1Error``.
    """
    try:
        _, payload = decoder.decode(
            wholeMsg, asn1Spec=univ.Sequence(), recursiveFlag=0
        )
        version, _ = decoder.decode(
            payload, asn1Spec=univ.Integer(), recursiveFlag=0
        )
        if not eoo.endOfOctets.isSameTypeWith(version):
            return version
        raise ProtocolError('EOO at SNMP version component')
    except PyAsn1Error:
        raise ProtocolError('Invalid BER at SNMP version component')
deref.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def decodeControlValue(self,encodedControlValue):
    """Decode a dereference result control into ``self.derefRes``.

    ``self.derefRes`` becomes a dict keyed by the string form of each
    result's first component.  Each value is a list of
    ``(second component as str, partial attribute dict)`` tuples, where the
    partial attribute dict maps attribute names to ``map(str, values)``.
    """
    decodedValue,_ = decoder.decode(encodedControlValue,asn1Spec=DerefResultControlValue())
    self.derefRes = {}
    for deref_attr,deref_val,deref_vals in decodedValue:
      # The third component may be empty/absent; treat that as no attributes.
      partial_attrs_dict = {}
      for attr_type,attr_vals in deref_vals or []:
        attr_name = str(attr_type)
        partial_attrs_dict[attr_name] = map(str,attr_vals)
      key = str(deref_attr)
      entry = (str(deref_val),partial_attrs_dict)
      if key in self.derefRes:
        self.derefRes[key].append(entry)
      else:
        self.derefRes[key] = [entry]
readentry.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def decodeControlValue(self,encodedControlValue):
    """Decode a ``SearchResultEntry`` control value into ``dn`` and ``entry``.

    ``self.dn`` receives the string form of the first component.
    ``self.entry`` maps the string form of each attribute's first component
    to the list of its values converted with ``str()``.
    """
    searchEntry,_ = decoder.decode(encodedControlValue,asn1Spec=SearchResultEntry())
    self.dn = str(searchEntry[0])
    self.entry = {}
    for attribute in searchEntry[1]:
      attr_name = str(attribute[0])
      self.entry[attr_name] = [str(value) for value in attribute[1]]
syncrepl.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def decodeControlValue(self, encodedControlValue):
        """Decode a ``syncStateValue`` control into state, UUID and cookie.

        Sets ``self.state`` (looked up in the class's ``opnames`` by the
        integer state), ``self.entryUUID`` (string form of the UUID built
        from the ``entryUUID`` component) and ``self.cookie`` (``str`` of
        the ``cookie`` component, or ``None`` when it is absent).
        """
        syncState = decoder.decode(encodedControlValue, asn1Spec = syncStateValue())[0]
        stateCode = syncState.getComponentByName('state')
        entryUuid = UUID(bytes=syncState.getComponentByName('entryUUID'))
        self.cookie = syncState.getComponentByName('cookie')
        self.state = self.__class__.opnames[int(stateCode)]
        self.entryUUID = str(entryUuid)
        if self.cookie is None:
            return
        self.cookie = str(self.cookie)
syncrepl.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def decodeControlValue(self, encodedControlValue):
        """Decode a ``syncDoneValue`` control into cookie and refreshDeletes.

        ``self.cookie`` is the ``cookie`` component converted with ``str()``
        and ``self.refreshDeletes`` the ``refreshDeletes`` component
        converted with ``bool()``; either stays ``None`` when the component
        is ``None``.
        """
        syncDone = decoder.decode(encodedControlValue, asn1Spec = syncDoneValue())[0]
        self.cookie = syncDone.getComponentByName('cookie')
        self.refreshDeletes = syncDone.getComponentByName('refreshDeletes')
        # Normalise only the components that are actually present.
        if self.cookie is not None:
            self.cookie = str(self.cookie)
        if self.refreshDeletes is not None:
            self.refreshDeletes = bool(self.refreshDeletes)
syncrepl.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, encodedMessage):
        """Decode a ``syncInfoValue`` message and keep the component it holds.

        ``newcookie``, ``refreshDelete``, ``refreshPresent`` and
        ``syncIdSet`` all start as ``None``.  The components are checked in
        that order and only the first one that is not ``None`` is stored:

        * ``newcookie`` is stored as a string;
        * the others are stored as a dict with an optional ``'cookie'``
          string plus ``'refreshDone'`` (refresh* components) or
          ``'syncUUIDs'`` and ``'refreshDeletes'`` (``syncIdSet``).
        """
        syncInfo = decoder.decode(encodedMessage, asn1Spec = syncInfoValue())[0]
        self.newcookie = None
        self.refreshDelete = None
        self.refreshPresent = None
        self.syncIdSet = None

        for attr in ('newcookie', 'refreshDelete', 'refreshPresent', 'syncIdSet'):
            comp = syncInfo.getComponentByName(attr)
            if comp is None:
                continue

            if attr == 'newcookie':
                self.newcookie = str(comp)
                return

            val = {}

            cookie = comp.getComponentByName('cookie')
            if cookie is not None:
                val['cookie'] = str(cookie)

            if attr.startswith('refresh'):
                val['refreshDone'] = bool(comp.getComponentByName('refreshDone'))
            elif attr == 'syncIdSet':
                ids = comp.getComponentByName('syncUUIDs')
                val['syncUUIDs'] = [
                    str(UUID(bytes=str(ids.getComponentByPosition(pos))))
                    for pos in range(len(ids))
                ]
                val['refreshDeletes'] = bool(comp.getComponentByName('refreshDeletes'))

            # Only the first component found is recorded.
            setattr(self, attr, val)
            return
drsuapi.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def OidFromAttid(prefixTable, attr):
    """Map an ATTRTYP value to an OID string by means of a prefix table.

    ``attr`` is split into an upper and a lower 16-bit word.  The upper word
    selects the entry of ``prefixTable`` whose ``'ndx'`` equals it; the lower
    word is appended to that entry's prefix as one or two bytes and the
    result is decoded as a BER OBJECT IDENTIFIER.

    Returns the decoded OID converted with ``str()``, or ``None`` when no
    table entry matches the upper word.
    """
    # separate the ATTRTYP into two parts; floor division keeps upperWord an
    # integer (and therefore comparable to 'ndx') even where "/" means true
    # division
    upperWord = attr // 65536
    lowerWord = attr % 65536

    # search in the prefix table to find the upperWord, if found,
    # construct the binary OID by appending lowerWord to the end of
    # found prefix.

    binaryOID = None
    for item in prefixTable:
        if item['ndx'] == upperWord:
            # the slice yields a fresh sequence, so the table entry itself
            # is not modified by the appends below (assuming list-like
            # 'elements' -- TODO confirm)
            binaryOID = item['prefix']['elements'][:item['prefix']['length']]
            if lowerWord < 128:
                binaryOID.append(chr(lowerWord))
            else:
                if lowerWord >= 32768:
                    lowerWord -= 32768
                # two-byte form: high 7 bits with the top bit set, then the
                # low 7 bits
                binaryOID.append(chr(((lowerWord // 128) % 128) + 128))
                binaryOID.append(chr(lowerWord % 128))
            break

    if binaryOID is None:
        return None

    # 0x06 is the tag byte placed in front of the length and the OID bytes
    return str(decoder.decode('\x06' + chr(len(binaryOID)) + ''.join(binaryOID), asn1Spec = univ.ObjectIdentifier())[0])
service_account.py 文件源码 项目:SurfaceWaterTool 作者: Servir-Mekong 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_private_key(private_key_pkcs8_text):
  """Turn PKCS#8 PEM text into an ``rsa.PrivateKey``.

  Loads the 'PRIVATE KEY' PEM section, decodes it against
  ``PrivateKeyInfo`` and loads the octets of its ``privateKey`` component
  as a DER-format PKCS#1 key.
  """
  der_bytes = rsa.pem.load_pem(private_key_pkcs8_text, 'PRIVATE KEY')
  private_key_info, _unused = decoder.decode(
      der_bytes, asn1Spec=PrivateKeyInfo())
  inner_key = private_key_info.getComponentByName('privateKey')
  return rsa.PrivateKey.load_pkcs1(inner_key.asOctets(), format='DER')
pki.py 文件源码 项目:certbot-asa 作者: chrismarget 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def prune_not_ca(self):
        """Remove one certificate whose basicConstraints do not mark it a CA.

        ``self.certs`` is scanned from the last index to the first.  The
        first certificate found with a ``basicConstraints`` extension whose
        decoded value is empty, or whose first component is falsy, is popped
        from the list.

        Returns ``True`` if a certificate was removed, ``False`` otherwise.
        """
        from pyasn1.codec.ber import decoder as d
        for idx in range(len(self.certs) - 1, -1, -1):
            cert = self.certs[idx]
            for ext_no in range(cert.get_extension_count()):
                if cert.get_extension(ext_no).get_short_name() != 'basicConstraints':
                    continue
                constraints = d.decode(cert.get_extension(ext_no).get_data())[0]
                # An empty decoded value counts as "not a CA".
                is_ca = constraints.getComponentByPosition(0) if constraints else False
                if not is_ca:
                    self.certs.pop(idx)
                    return True
        return False
pki.py 文件源码 项目:certbot-asa 作者: chrismarget 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_server_cert(self):
        """Return the first certificate whose basicConstraints are not CA.

        ``self.certs`` is scanned in order.  A certificate is returned when
        it has a ``basicConstraints`` extension whose decoded value is empty
        or whose first component is falsy.  Falls through (returning
        ``None``) when no such certificate exists.
        """
        from pyasn1.codec.ber import decoder as d
        for cert in self.certs:
            for ext_no in range(cert.get_extension_count()):
                if cert.get_extension(ext_no).get_short_name() != 'basicConstraints':
                    continue
                constraints = d.decode(cert.get_extension(ext_no).get_data())[0]
                # An empty decoded value counts as "not a CA".
                is_ca = constraints.getComponentByPosition(0) if constraints else False
                if not is_ca:
                    return cert


问题


面经


文章

微信
公众号

扫码关注公众号