def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
python类ObjectIdentifier()的实例源码
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:rtype: Bytes
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def oidToMibName(mibView, oid):
if not isinstance(oid, tuple):
oid = tuple(univ.ObjectIdentifier(oid))
_oid, label, suffix = mibView.getNodeNameByOid(oid)
modName, symName, __suffix = mibView.getNodeLocation(_oid)
mibNode, = mibView.mibBuilder.importSymbols(
modName, symName
)
if hasattr(mibNode, 'createTest'): # table column
__modName, __symName, __s = mibView.getNodeLocation(_oid[:-1])
rowNode, = mibView.mibBuilder.importSymbols(__modName, __symName)
return (symName, modName), rowNode.getIndicesFromInstId(suffix)
elif not suffix: # scalar
return (symName, modName), suffix
elif suffix == (0,): # scalar
return (symName, modName), __scalarSuffix
else:
raise NoSuchObjectError(
str='No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % (univ.ObjectIdentifier(oid), univ.ObjectIdentifier(mibNode.name), modName, symName)
)
# Value
def is_letsencrypt_cert(ee_cert):
'''Return True if ee_cert was issued by Let's Encrypt.
Args:
ee_cert (EndEntityCert)
'''
organization_name_oid = ObjectIdentifier(value='2.5.4.10')
issuer = ee_cert.tbscert.pyasn1['issuer']
if issuer:
for rdn in issuer['rdnSequence']:
for item in rdn:
if item.getComponentByName('type') == organization_name_oid:
organisation_name = str(item.getComponentByName('value'))
organisation_name = string_without_prefix('\x13\r',
organisation_name)
if organisation_name == "Let's Encrypt":
return True
return False
def tbscert_without_ct_extensions(tbscert):
'''Return pyasn1_modules.rfc5280.TBSCertificate instance `cert_pyasn1`
without sctlist extension (OID 1.3.6.1.4.1.11129.2.4.3) and
poison extension (OID 1.3.6.1.4.1.11129.2.4.2), if any.
'''
sctlist_oid = ObjectIdentifier(value='1.3.6.1.4.1.11129.2.4.2')
poison_oid = ObjectIdentifier(value='1.3.6.1.4.1.11129.2.4.3')
ct_oids = [sctlist_oid, poison_oid]
extensions = tbscert['extensions']
without_ct_extensions = extensions.subtype()
for extension in extensions:
if extension['extnID'] not in ct_oids:
without_ct_extensions.append(extension)
copy = copy_pyasn1_instance(tbscert)
copy['extensions'] = without_ct_extensions
return copy
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
:rtype: bytes
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
'''Loads a PKCS#1 DER-encoded public key file from OpenSSL.
@param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
@return: a PublicKey object
'''
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:rtype: Bytes
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def set(self, *oidvalues):
assert self.alive is True
oidvalues_trans = []
for oid, value in oidvalues:
if isinstance(oid, tuple):
has_str = False
for entry in oid:
if isinstance(entry, str):
has_str = True
break
if has_str: # if oid is a tuple containing strings, assume translation using cmdgen.MibVariable.
# value must then be a Python type
assert isinstance(value, int) or isinstance(value, str) or isinstance(value, bool)
oidvalues_trans.append((cmdgen.MibVariable(*oid), value))
else:
# value must be a rfc1902/pyasn1 type
if not oid[-1] == 0:
assert isinstance(value, univ.Integer) or isinstance(value, univ.OctetString) or isinstance(value, univ.ObjectIdentifier)
oidvalues_trans.append((oid, value))
elif isinstance(oid, str): # if oid is a string, assume nodeid lookup
# value must then be a rfc1902/pyasn1 type, if oid is not a scalar
if not oid.endswith(".0"):
assert isinstance(value, univ.Integer) or isinstance(value, univ.OctetString) or isinstance(value, univ.ObjectIdentifier)
oidvalues_trans.append((nodeid(oid), value))
(error_indication, error_status, error_index, varbinds) = \
cmdgen.CommandGenerator().setCmd(self.auth, cmdgen.UdpTransportTarget((self.host, self.port), timeout=self.timeout, retries=self.retries), *oidvalues_trans) # pylint: disable=W0612
if error_indication or error_status:
self.__set_error(error_indication, error_status, error_index, varbinds)
raise SnmpError("SNMP set command on %s of oid values %r failed" % (self.host, oidvalues_trans), error_indication, error_status, error_index, varbinds)
return SnmpVarBinds(varbinds)
def __get_json(self, keytype=str):
json = {}
for key, value in list(self.get_dict().items()):
if isinstance(value, univ.OctetString):
value = str(value)
elif isinstance(value, univ.Integer):
value = int(value) # pylint:disable=R0204
elif isinstance(value, univ.ObjectIdentifier):
value = str(value)
else:
raise AssertionError("Unknown type %s encountered for oid %s" % (value.__class__.__name__, key))
json[keytype(key)] = value
return json
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _OID(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def _buildOid(*components):
output = []
for x in tuple(components):
if isinstance(x, univ.ObjectIdentifier):
output.extend(list(x))
else:
output.append(int(x))
return univ.ObjectIdentifier(output)
def OidFromAttid(prefixTable, attr):
# separate the ATTRTYP into two parts
upperWord = attr / 65536
lowerWord = attr % 65536
# search in the prefix table to find the upperWord, if found,
# construct the binary OID by appending lowerWord to the end of
# found prefix.
binaryOID = None
for j, item in enumerate(prefixTable):
if item['ndx'] == upperWord:
binaryOID = item['prefix']['elements'][:item['prefix']['length']]
if lowerWord < 128:
binaryOID.append(chr(lowerWord))
else:
if lowerWord >= 32768:
lowerWord -= 32768
binaryOID.append(chr(((lowerWord/128) % 128)+128))
binaryOID.append(chr(lowerWord%128))
break
if binaryOID is None:
return None
return str(decoder.decode('\x06' + chr(len(binaryOID)) + ''.join(binaryOID), asn1Spec = univ.ObjectIdentifier())[0])
def test_read_system_oid(self):
self.assertEqual(
univ.ObjectIdentifier(baytech_mrp27.sBTA),
self.pdu.oid_mapping[sysObjectID].value
)