def load_pkcs1_openssl_der(cls, keyfile):
'''Loads a PKCS#1 DER-encoded public key file from OpenSSL.
@param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
@return: a PublicKey object
'''
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
python类ObjectIdentifier()的实例源码
def _snmpFormatValue(v):
_type=type(v)
if _type is NoSuchObject:
res="?????? ???????? OID"
elif sum([issubclass(_type,baseType) for baseType in [TimeTicks,Integer,Integer32,Gauge32,Counter32,Counter64,Unsigned32]]):
res=int(v)
elif sum([issubclass(_type,baseType) for baseType in [ObjectIdentity,ObjectIdentifier,IpAddress]]):
res=str(v.prettyPrint())
elif sum([issubclass(_type,baseType) for baseType in [OctetString]]):
pp=v.prettyPrint()
s=str(v)
# sometimes prettyPrint returns hex values instead of string because of trailing 00 byte
# if we got not a string - try to remove trailing 00
# if we got a string after it - use this value
if pp!=s:
if len(v)>0:
if v[len(v)-1]==0:
v2=v[:len(v)-1]
pp2=v2.prettyPrint()
s2=str(v2)
if pp2==s2:
v=v2
res=v.prettyPrint()
else:
res="??????????? ??? ?????? "+str(_type)
# if issubclass(_type,OctetString):
# print("v=",v)
# # print("res=",res)
# print("pp=",v.prettyPrint())
# print("bytes=",v.asOctets())
# print("bytes-decode=",v.asOctets().decode('iso-8859-1'))
# print("str=",str(v))
# print("hex=",binascii.hexlify(v.asOctets()).decode("utf8"))
return res
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
'''Loads a PKCS#1 DER-encoded public key file from OpenSSL.
@param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
@return: a PublicKey object
'''
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:rtype: Bytes
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def oidToMibName(mibView, oid):
if not isinstance(oid, tuple):
oid = tuple(univ.ObjectIdentifier(oid))
_oid, label, suffix = mibView.getNodeNameByOid(oid)
modName, symName, __suffix = mibView.getNodeLocation(_oid)
mibNode, = mibView.mibBuilder.importSymbols(
modName, symName
)
if hasattr(mibNode, 'createTest'): # table column
__modName, __symName, __s = mibView.getNodeLocation(_oid[:-1])
rowNode, = mibView.mibBuilder.importSymbols(__modName, __symName)
return (symName, modName), rowNode.getIndicesFromInstId(suffix)
elif not suffix: # scalar
return (symName, modName), suffix
elif suffix == (0,): # scalar
return (symName, modName), __scalarSuffix
else:
raise NoSuchObjectError(
str='No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % (
univ.ObjectIdentifier(oid), univ.ObjectIdentifier(mibNode.name), modName, symName)
)
# Value
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
def load_pkcs1_openssl_der(cls, keyfile):
'''Loads a PKCS#1 DER-encoded public key file from OpenSSL.
@param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
@return: a PublicKey object
'''
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
securityblob.py 文件源码
项目:plugin.video.streamondemand-pureita
作者: orione7
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:rtype: Bytes
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:rtype: Bytes
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def oidToMibName(mibView, oid):
if not isinstance(oid, tuple):
oid = tuple(univ.ObjectIdentifier(oid))
_oid, label, suffix = mibView.getNodeNameByOid(oid)
modName, symName, __suffix = mibView.getNodeLocation(_oid)
mibNode, = mibView.mibBuilder.importSymbols(
modName, symName
)
if hasattr(mibNode, 'createTest'): # table column
__modName, __symName, __s = mibView.getNodeLocation(_oid[:-1])
rowNode, = mibView.mibBuilder.importSymbols(__modName, __symName)
return (symName, modName), rowNode.getIndicesFromInstId(suffix)
elif not suffix: # scalar
return (symName, modName), suffix
elif suffix == (0,): # scalar
return (symName, modName), __scalarSuffix
else:
raise NoSuchObjectError(
str='No MIB registered that defines %s object, closest known parent is %s (%s::%s)' % (univ.ObjectIdentifier(oid), univ.ObjectIdentifier(mibNode.name), modName, symName)
)
# Value
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def load_pkcs1_openssl_der(cls, keyfile):
'''Loads a PKCS#1 DER-encoded public key file from OpenSSL.
@param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
@return: a PublicKey object
'''
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
def load_pkcs1_openssl_der(cls, keyfile):
"""Loads a PKCS#1 DER-encoded public key file from OpenSSL.
:param keyfile: contents of a DER-encoded file that contains the public
key, from OpenSSL.
:return: a PublicKey object
"""
from rsa.asn1 import OpenSSLPubKey
from pyasn1.codec.der import decoder
from pyasn1.type import univ
(keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey())
if keyinfo['header']['oid'] != univ.ObjectIdentifier('1.2.840.113549.1.1.1'):
raise TypeError("This is not a DER-encoded OpenSSL-compatible public key")
return cls._load_pkcs1_der(keyinfo['key'][1:])
def ssh_gss_oids(self, mode="client"):
"""
This method returns a single OID, because we only support the
Kerberos V5 mechanism.
:param str mode: Client for client mode and server for server mode
:return: A byte sequence containing the number of supported
OIDs, the length of the OID and the actual OID encoded with
DER
:note: In server mode we just return the OID length and the DER encoded
OID.
"""
OIDs = self._make_uint32(1)
krb5_OID = encoder.encode(ObjectIdentifier(self._krb5_mech))
OID_len = self._make_uint32(len(krb5_OID))
if mode == "server":
return OID_len + krb5_OID
return OIDs + OID_len + krb5_OID
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)
def generateNegotiateSecurityBlob(ntlm_data):
mech_token = univ.OctetString(ntlm_data).subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))
mech_types = MechTypeList().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
mech_types.setComponentByPosition(0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10'))
n = NegTokenInit().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0))
n.setComponentByName('mechTypes', mech_types)
n.setComponentByName('mechToken', mech_token)
nt = NegotiationToken()
nt.setComponentByName('negTokenInit', n)
ct = ContextToken()
ct.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
ct.setComponentByName('innerContextToken', nt)
return encoder.encode(ct)