def prettyIn(self, value):
    """Coerce *value* to a ``unicode`` string (Python 2 flavour).

    ``unicode`` input is returned unchanged. ``str`` input and the octets of
    a ``univ.OctetString`` are decoded with ``self.encoding``. A tuple or
    list is treated as a sequence of character codes: it is joined into a
    ``str`` and fed back through this method. Anything else is passed to
    ``unicode()``.

    Raises:
        error.PyAsn1Error: when decoding fails with ``UnicodeDecodeError``
            or ``LookupError``.
    """
    try:
        if isinstance(value, unicode):
            return value
        if isinstance(value, str):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # Pack the integer codes into a byte string; the recursive call
            # then decodes it through the ``str`` branch above.
            packed = ''.join([chr(code) for code in value])
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return unicode(value)
    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
# Example source code for the Python class OctetString()
def prettyIn(self, value):
    """Coerce *value* to a ``str`` (Python 3 flavour).

    ``str`` input is returned unchanged. ``bytes`` input and the octets of a
    ``univ.OctetString`` are decoded with ``self.encoding``. A tuple or list
    is converted with ``bytes()`` and fed back through this method. Anything
    else is passed to ``str()``.

    Raises:
        error.PyAsn1Error: when decoding fails with ``UnicodeDecodeError``
            or ``LookupError``.
    """
    try:
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # A sequence of integers becomes bytes; the recursive call then
            # decodes it through the ``bytes`` branch above.
            packed = bytes(value)
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return str(value)
    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
    """Produce key material for encryption using a fresh salt.

    The salt is the current value of ``self._localInt`` split into eight
    octets, most significant first. The counter is then advanced, wrapping
    from ``0xffffffffffffffff`` back to ``0``.

    Returns:
        The tuple returned by ``self.__getDecryptionKey`` with the salt
        octets appended as one more element.
    """
    counter = self._localInt
    salt = [counter >> shift & 0xff for shift in range(56, -8, -8)]

    # Step the counter so the next call gets a different salt.
    self._localInt = 0 if counter == 0xffffffffffffffff else counter + 1

    keyMaterial = self.__getDecryptionKey(
        privKey, snmpEngineBoots, snmpEngineTime, salt
    )
    return keyMaterial + (univ.OctetString(salt).asOctets(),)
def __getDecryptionKey(self, privKey, snmpEngineBoots,
                       snmpEngineTime, salt):
    """Return the cipher key and initialization vector as octets.

    The IV is the low 32 bits of ``snmpEngineBoots`` and of
    ``snmpEngineTime`` (each as four octets, most significant first)
    followed by the items of *salt*, which must be a list.

    Returns:
        Tuple ``(key, iv)`` where ``key`` is the first ``self.keySize``
        items of *privKey* and both elements are ``asOctets()`` results.
    """
    boots = int(snmpEngineBoots)
    engineTime = int(snmpEngineTime)

    prefix = [
        word >> shift & 0xff
        for word in (boots, engineTime)
        for shift in (24, 16, 8, 0)
    ]
    iv = prefix + salt

    return privKey[:self.keySize].asOctets(), univ.OctetString(iv).asOctets()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with DES in CBC mode.

    Args:
        encryptKey: key material handed to ``self.__getEncryptionKey``.
        privParameters: three-item sequence ``(snmpEngineBoots,
            snmpEngineTime, salt)``; only the first item is used.
        dataToEncrypt: octets to encrypt.

    Returns:
        Tuple ``(ciphertext, salt)``, each wrapped in ``univ.OctetString``.

    Raises:
        error.StatusInformation: with ``errind.encryptionError`` when the
            ``DES`` name is ``None``.
    """
    if DES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _oldSalt = privParameters

    # 8.3.1.1
    # NOTE(review): this call passes two arguments and unpacks
    # (key, salt, iv); presumably it targets this class's own helper rather
    # than a three-argument variant -- confirm.
    cipherKey, freshSalt, initVector = self.__getEncryptionKey(
        encryptKey, engineBoots
    )

    # 8.3.1.2
    privParameters = univ.OctetString(freshSalt)

    # 8.1.1.2
    cipher = DES.new(cipherKey, DES.MODE_CBC, initVector)
    # Zero-pad to a multiple of 8 octets; input that is already aligned
    # still gains a full block of 8 zero octets.
    padLength = 8 - len(dataToEncrypt) % 8
    padded = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    encrypted = cipher.encrypt(padded)

    # 8.3.1.3 & 4
    return univ.OctetString(encrypted), privParameters
# 8.2.4.2
def hashPassphrase(passphrase, hashFunc):
    """Hash *passphrase*, repeated cyclically, with the given hash factory.

    The passphrase octets are treated as an endless repeating stream, and
    16384 consecutive 64-octet blocks of it (1 MiB in total) are fed to the
    hasher.

    An empty passphrase used to raise ``ZeroDivisionError``. It is now
    handled by hashing empty blocks, so the result equals the digest of no
    input.

    Args:
        passphrase: value accepted by ``univ.OctetString``.
        hashFunc: zero-argument callable returning an object with
            ``update()`` and ``digest()`` methods.

    Returns:
        The value of ``digest()`` after all blocks have been fed.
    """
    passphrase = univ.OctetString(passphrase).asOctets()
    # noinspection PyDeprecation,PyCallingNonCallable
    hasher = hashFunc()

    # The ring buffer holds whole copies of the passphrase and is longer
    # than 64 octets, so a block wraps around its end at most once.
    # Guard the division so an empty passphrase gives an empty buffer.
    repeats = 64 // len(passphrase) + 1 if passphrase else 1
    ringBuffer = passphrase * repeats
    ringBufferLen = len(ringBuffer)

    mark = 0
    for _ in range(16384):
        end = mark + 64
        if end < ringBufferLen:
            block = ringBuffer[mark:end]
        else:
            # Block runs off the end: finish it from the buffer start.
            end -= ringBufferLen
            block = ringBuffer[mark:ringBufferLen] + ringBuffer[0:end]
        hasher.update(block)
        mark = end

    return hasher.digest()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with triple DES in CBC mode.

    Args:
        encryptKey: key material handed to ``self.__getEncryptionKey``.
        privParameters: three-item sequence ``(snmpEngineBoots,
            snmpEngineTime, salt)``; only the first item is used.
        dataToEncrypt: octets to encrypt.

    Returns:
        Tuple ``(ciphertext, salt)``, each wrapped in ``univ.OctetString``.

    Raises:
        error.StatusInformation: with ``errind.encryptionError`` when the
            ``DES3`` name is ``None``.
    """
    if DES3 is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _oldSalt = privParameters

    cipherKey, freshSalt, initVector = self.__getEncryptionKey(
        encryptKey, engineBoots
    )

    cipher = DES3.new(cipherKey, DES3.MODE_CBC, initVector)
    privParameters = univ.OctetString(freshSalt)

    # Zero-pad to a multiple of 8 octets; input that is already aligned
    # still gains a full block of 8 zero octets.
    padLength = 8 - len(dataToEncrypt) % 8
    padded = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    encrypted = cipher.encrypt(padded)

    return univ.OctetString(encrypted), privParameters
# 5.1.1.3
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
# securityblob.py -- file source
# Project: plugin.video.streamondemand-pureita
# Author: orione7
# Project source
# File source
# Views: 27
# Favorites: 0
# Likes: 0
# Comments: 0
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
    """Produce key material for encryption using a fresh salt.

    The salt is the current value of ``self._localInt`` split into eight
    octets, most significant first. The counter is then advanced, wrapping
    from ``0xffffffffffffffff`` back to ``0``.

    Returns:
        The tuple returned by ``self.__getDecryptionKey`` with the salt
        octets appended as one more element.
    """
    counter = self._localInt
    salt = [counter >> shift & 0xff for shift in range(56, -8, -8)]

    # Step the counter so the next call gets a different salt.
    self._localInt = 0 if counter == 0xffffffffffffffff else counter + 1

    keyMaterial = self.__getDecryptionKey(
        privKey, snmpEngineBoots, snmpEngineTime, salt
    )
    return keyMaterial + (univ.OctetString(salt).asOctets(),)
def __getDecryptionKey(self, privKey, snmpEngineBoots,
                       snmpEngineTime, salt):
    """Return the cipher key and initialization vector as octets.

    The IV is the low 32 bits of ``snmpEngineBoots`` and of
    ``snmpEngineTime`` (each as four octets, most significant first)
    followed by the items of *salt*, which must be a list.

    Returns:
        Tuple ``(key, iv)`` where ``key`` is the first ``self.keySize``
        items of *privKey* and both elements are ``asOctets()`` results.
    """
    boots = int(snmpEngineBoots)
    engineTime = int(snmpEngineTime)

    prefix = [
        word >> shift & 0xff
        for word in (boots, engineTime)
        for shift in (24, 16, 8, 0)
    ]
    iv = prefix + salt

    return privKey[:self.keySize].asOctets(), univ.OctetString(iv).asOctets()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with DES in CBC mode.

    Args:
        encryptKey: key material handed to ``self.__getEncryptionKey``.
        privParameters: three-item sequence ``(snmpEngineBoots,
            snmpEngineTime, salt)``; only the first item is used.
        dataToEncrypt: octets to encrypt.

    Returns:
        Tuple ``(ciphertext, salt)``, each wrapped in ``univ.OctetString``.

    Raises:
        error.StatusInformation: with ``errind.encryptionError`` when the
            ``DES`` name is ``None``.
    """
    if DES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _oldSalt = privParameters

    # 8.3.1.1
    # NOTE(review): this call passes two arguments and unpacks
    # (key, salt, iv); presumably it targets this class's own helper rather
    # than a three-argument variant -- confirm.
    cipherKey, freshSalt, initVector = self.__getEncryptionKey(
        encryptKey, engineBoots
    )

    # 8.3.1.2
    privParameters = univ.OctetString(freshSalt)

    # 8.1.1.2
    cipher = DES.new(cipherKey, DES.MODE_CBC, initVector)
    # Zero-pad to a multiple of 8 octets; input that is already aligned
    # still gains a full block of 8 zero octets.
    padLength = 8 - len(dataToEncrypt) % 8
    padded = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    encrypted = cipher.encrypt(padded)

    # 8.3.1.3 & 4
    return univ.OctetString(encrypted), privParameters
# 8.2.4.2
def hashPassphraseMD5(passphrase):
    """Hash *passphrase*, repeated cyclically, with MD5.

    The passphrase octets are treated as an endless repeating stream, and
    16384 consecutive 64-octet blocks of it (1 MiB in total) are fed to
    ``md5``. An empty passphrase yields an empty ring buffer, so only empty
    blocks are hashed.

    Returns:
        The raw MD5 digest.
    """
    octets = univ.OctetString(passphrase).asOctets()
    digest = md5()

    # Enough whole copies of the passphrase to exceed one 64-octet block.
    copies = 64 // len(octets) + 1 if octets else 1
    ring = octets * copies
    ringLen = len(ring)

    start = 0
    for _ in range(16384):
        stop = start + 64
        if stop < ringLen:
            chunk = ring[start:stop]
        else:
            # Block runs off the end: finish it from the buffer start.
            stop -= ringLen
            chunk = ring[start:ringLen] + ring[0:stop]
        digest.update(chunk)
        start = stop

    return digest.digest()
def hashPassphraseSHA(passphrase):
    """Hash *passphrase*, repeated cyclically, with SHA-1.

    The passphrase octets are treated as an endless repeating stream, and
    16384 consecutive 64-octet blocks of it (1 MiB in total) are fed to
    ``sha1``.

    An empty passphrase used to raise ``ZeroDivisionError``. It is now
    handled the way ``hashPassphraseMD5`` handles it: the ring buffer is
    empty and only empty blocks are hashed.

    Returns:
        The raw SHA-1 digest.
    """
    passphrase = univ.OctetString(passphrase).asOctets()
    md = sha1()

    # The ring buffer holds whole copies of the passphrase and is longer
    # than 64 octets, so a block wraps around its end at most once.
    # Guard the division so an empty passphrase gives an empty buffer.
    repeats = 64 // len(passphrase) + 1 if passphrase else 1
    ringBuffer = passphrase * repeats
    ringBufferLen = len(ringBuffer)

    mark = 0
    for _ in range(16384):
        end = mark + 64
        if end < ringBufferLen:
            block = ringBuffer[mark:end]
        else:
            # Block runs off the end: finish it from the buffer start.
            end -= ringBufferLen
            block = ringBuffer[mark:ringBufferLen] + ringBuffer[0:end]
        md.update(block)
        mark = end

    return md.digest()
def test_aes_encryption_consistency(self):
    """Check that AES decryption undoes AES encryption for each message.

    Every entry of ``self.TEST_MSGS`` is wrapped in an ``OctetString``,
    encoded, encrypted with a random 16-byte key and IV, decrypted again and
    compared with the encoded input.
    """
    for message in self.TEST_MSGS:
        # Build a simple ASN.1 structure to serve as the plaintext.
        encoded = encode(OctetString(message))

        # Fresh random IV and key for every message.
        iv = urandom(16)
        key = urandom(16)

        # The name-mangled attributes reach the private methods of Crypto.
        encrypted = self.crypto_obj._Crypto__encrypt_with_aes(encoded, key, iv)
        decrypted = self.crypto_obj._Crypto__decrypt_with_aes(encrypted, key, iv)

        self.assertEqual(encoded, decrypted)
def prettyIn(self, value):
    """Coerce *value* to a ``unicode`` string (Python 2 flavour).

    ``unicode`` input is returned unchanged. ``str`` input and the octets of
    a ``univ.OctetString`` are decoded with ``self.encoding``. A tuple or
    list is treated as a sequence of character codes: it is joined into a
    ``str`` and fed back through this method. Anything else is passed to
    ``unicode()``.

    Raises:
        error.PyAsn1Error: when decoding fails with ``UnicodeDecodeError``
            or ``LookupError``.
    """
    try:
        if isinstance(value, unicode):
            return value
        if isinstance(value, str):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # Pack the integer codes into a byte string; the recursive call
            # then decodes it through the ``str`` branch above.
            packed = ''.join([chr(code) for code in value])
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return unicode(value)
    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
def prettyIn(self, value):
    """Coerce *value* to a ``str`` (Python 3 flavour).

    ``str`` input is returned unchanged. ``bytes`` input and the octets of a
    ``univ.OctetString`` are decoded with ``self.encoding``. A tuple or list
    is converted with ``bytes()`` and fed back through this method. Anything
    else is passed to ``str()``.

    Raises:
        error.PyAsn1Error: when decoding fails with ``UnicodeDecodeError``
            or ``LookupError``.
    """
    try:
        if isinstance(value, str):
            return value
        if isinstance(value, bytes):
            return value.decode(self.encoding)
        if isinstance(value, (tuple, list)):
            # A sequence of integers becomes bytes; the recursive call then
            # decodes it through the ``bytes`` branch above.
            packed = bytes(value)
            return self.prettyIn(packed)
        if isinstance(value, univ.OctetString):
            return value.asOctets().decode(self.encoding)
        return str(value)
    except (UnicodeDecodeError, LookupError):
        raise error.PyAsn1Error(
            "Can't decode string '%s' with codec %s" % (value, self.encoding)
        )
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def probeContext(transportDomain, transportAddress, contextName):
    """Yield candidate path-like keys, from most to least specific.

    The candidate parts are, in order: *contextName*, the dotted form of
    *transportDomain*, and an address part that depends on which transport's
    ``domainName`` prefixes *transportDomain*:

    * ``udp`` -- ``transportAddress[0]``;
    * ``udp6`` -- ``transportAddress[0]`` as a string with ``:`` replaced
      by ``_``;
    * ``unix`` -- *transportAddress* itself.

    Falsy parts are dropped and the rest are stringified. The parts are then
    joined with the OS path separator, normalised with
    ``os.path.normpath``, converted to ``/`` separators and yielded as
    ``rfc1902.OctetString(...).asOctets()``. Each further item drops the
    last remaining part.
    """
    def hasPrefix(transport):
        # True when transportDomain starts with the transport's domain name.
        prefix = transport.domainName
        return transportDomain[:len(prefix)] == prefix

    parts = [contextName, '.'.join(str(subId) for subId in transportDomain)]

    if hasPrefix(udp):
        parts.append(transportAddress[0])
    elif udp6 and hasPrefix(udp6):
        parts.append(str(transportAddress[0]).replace(':', '_'))
    elif unix and hasPrefix(unix):
        parts.append(transportAddress)

    parts = [str(part) for part in parts if part]

    # Walk from the full list of parts down to just the first one.
    for depth in range(len(parts), 0, -1):
        joined = os.path.sep.join(parts[:depth])
        normalised = os.path.normpath(joined).replace(os.path.sep, '/')
        yield rfc1902.OctetString(normalised).asOctets()
# main script body starts here
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def generateNegotiateSecurityBlob(ntlm_data):
    """Wrap *ntlm_data* in a ``ContextToken`` and return its encoding.

    The data is placed as the ``mechToken`` of a ``NegTokenInit`` whose
    ``mechTypes`` list holds a single object identifier. That structure is
    stored as the ``negTokenInit`` choice of a ``NegotiationToken``, which in
    turn becomes the ``innerContextToken`` of the ``ContextToken``.

    Returns:
        Whatever ``encoder.encode`` produces for the finished token.
    """
    def contextTag(tagFormat, tagId):
        # Every explicit tag used below is in the context-specific class.
        return tag.Tag(tag.tagClassContext, tagFormat, tagId)

    tokenField = univ.OctetString(ntlm_data).subtype(
        explicitTag=contextTag(tag.tagFormatSimple, 2)
    )

    mechList = MechTypeList().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    # NOTE(review): presumably the NTLMSSP mechanism OID -- confirm.
    mechList.setComponentByPosition(
        0, univ.ObjectIdentifier('1.3.6.1.4.1.311.2.2.10')
    )

    initToken = NegTokenInit().subtype(
        explicitTag=contextTag(tag.tagFormatConstructed, 0)
    )
    initToken.setComponentByName('mechTypes', mechList)
    initToken.setComponentByName('mechToken', tokenField)

    negotiation = NegotiationToken()
    negotiation.setComponentByName('negTokenInit', initToken)

    outer = ContextToken()
    # NOTE(review): presumably the SPNEGO mechanism OID -- confirm.
    outer.setComponentByName('thisMech', univ.ObjectIdentifier('1.3.6.1.5.5.2'))
    outer.setComponentByName('innerContextToken', negotiation)
    return encoder.encode(outer)
def __getEncryptionKey(self, privKey, snmpEngineBoots, snmpEngineTime):
    """Produce key material for encryption using a fresh salt.

    The salt is the current value of ``self._localInt`` split into eight
    octets, most significant first. The counter is then advanced, wrapping
    from ``0xffffffffffffffff`` back to ``0``.

    Returns:
        The tuple returned by ``self.__getDecryptionKey`` with the salt
        octets appended as one more element.
    """
    counter = self._localInt
    salt = [counter >> shift & 0xff for shift in range(56, -8, -8)]

    # Step the counter so the next call gets a different salt.
    self._localInt = 0 if counter == 0xffffffffffffffff else counter + 1

    keyMaterial = self.__getDecryptionKey(
        privKey, snmpEngineBoots, snmpEngineTime, salt
    )
    return keyMaterial + (univ.OctetString(salt).asOctets(),)
def __getDecryptionKey(self, privKey, snmpEngineBoots,
                       snmpEngineTime, salt):
    """Return the cipher key and initialization vector as octets.

    The IV is the low 32 bits of ``snmpEngineBoots`` and of
    ``snmpEngineTime`` (each as four octets, most significant first)
    followed by the items of *salt*, which must be a list.

    Returns:
        Tuple ``(key, iv)`` where ``key`` is the first ``self.keySize``
        items of *privKey* and both elements are ``asOctets()`` results.
    """
    boots = int(snmpEngineBoots)
    engineTime = int(snmpEngineTime)

    prefix = [
        word >> shift & 0xff
        for word in (boots, engineTime)
        for shift in (24, 16, 8, 0)
    ]
    iv = prefix + salt

    return privKey[:self.keySize].asOctets(), univ.OctetString(iv).asOctets()
def encryptData(self, encryptKey, privParameters, dataToEncrypt):
    """Encrypt *dataToEncrypt* with DES in CBC mode.

    Args:
        encryptKey: key material handed to ``self.__getEncryptionKey``.
        privParameters: three-item sequence ``(snmpEngineBoots,
            snmpEngineTime, salt)``; only the first item is used.
        dataToEncrypt: octets to encrypt.

    Returns:
        Tuple ``(ciphertext, salt)``, each wrapped in ``univ.OctetString``.

    Raises:
        error.StatusInformation: with ``errind.encryptionError`` when the
            ``DES`` name is ``None``.
    """
    if DES is None:
        raise error.StatusInformation(
            errorIndication=errind.encryptionError
        )

    engineBoots, _engineTime, _oldSalt = privParameters

    # 8.3.1.1
    # NOTE(review): this call passes two arguments and unpacks
    # (key, salt, iv); presumably it targets this class's own helper rather
    # than a three-argument variant -- confirm.
    cipherKey, freshSalt, initVector = self.__getEncryptionKey(
        encryptKey, engineBoots
    )

    # 8.3.1.2
    privParameters = univ.OctetString(freshSalt)

    # 8.1.1.2
    cipher = DES.new(cipherKey, DES.MODE_CBC, initVector)
    # Zero-pad to a multiple of 8 octets; input that is already aligned
    # still gains a full block of 8 zero octets.
    padLength = 8 - len(dataToEncrypt) % 8
    padded = dataToEncrypt + univ.OctetString((0,) * padLength).asOctets()
    encrypted = cipher.encrypt(padded)

    # 8.3.1.3 & 4
    return univ.OctetString(encrypted), privParameters
# 8.2.4.2
def hashPassphraseMD5(passphrase):
    """Hash *passphrase*, repeated cyclically, with MD5.

    The passphrase octets are treated as an endless repeating stream, and
    16384 consecutive 64-octet blocks of it (1 MiB in total) are fed to
    ``md5``. An empty passphrase yields an empty ring buffer, so only empty
    blocks are hashed.

    Returns:
        The raw MD5 digest.
    """
    octets = univ.OctetString(passphrase).asOctets()
    digest = md5()

    # Enough whole copies of the passphrase to exceed one 64-octet block.
    copies = 64 // len(octets) + 1 if octets else 1
    ring = octets * copies
    ringLen = len(ring)

    start = 0
    for _ in range(16384):
        stop = start + 64
        if stop < ringLen:
            chunk = ring[start:stop]
        else:
            # Block runs off the end: finish it from the buffer start.
            stop -= ringLen
            chunk = ring[start:ringLen] + ring[0:stop]
        digest.update(chunk)
        start = stop

    return digest.digest()
def hashPassphraseSHA(passphrase):
    """Hash *passphrase*, repeated cyclically, with SHA-1.

    The passphrase octets are treated as an endless repeating stream, and
    16384 consecutive 64-octet blocks of it (1 MiB in total) are fed to
    ``sha1``.

    An empty passphrase used to raise ``ZeroDivisionError``. It is now
    handled the way ``hashPassphraseMD5`` handles it: the ring buffer is
    empty and only empty blocks are hashed.

    Returns:
        The raw SHA-1 digest.
    """
    passphrase = univ.OctetString(passphrase).asOctets()
    md = sha1()

    # The ring buffer holds whole copies of the passphrase and is longer
    # than 64 octets, so a block wraps around its end at most once.
    # Guard the division so an empty passphrase gives an empty buffer.
    repeats = 64 // len(passphrase) + 1 if passphrase else 1
    ringBuffer = passphrase * repeats
    ringBufferLen = len(ringBuffer)

    mark = 0
    for _ in range(16384):
        end = mark + 64
        if end < ringBufferLen:
            block = ringBuffer[mark:end]
        else:
            # Block runs off the end: finish it from the buffer start.
            end -= ringBufferLen
            block = ringBuffer[mark:ringBufferLen] + ringBuffer[0:end]
        md.update(block)
        mark = end

    return md.digest()
def set(self, *oidvalues):
    """Issue an SNMP SET for the given ``(oid, value)`` pairs.

    Each *oid* may be:

    * a tuple containing at least one string -- passed to
      ``cmdgen.MibVariable(*oid)``; *value* must be an ``int``, ``str`` or
      ``bool``;
    * a tuple without strings -- used as is; unless its last element equals
      ``0``, *value* must be a ``univ.Integer``, ``univ.OctetString`` or
      ``univ.ObjectIdentifier``;
    * a string -- resolved through ``nodeid()``; unless it ends with
      ``".0"``, *value* must be one of the same three ``univ`` types.

    Pairs whose *oid* is neither a tuple nor a string are silently skipped.

    Returns:
        ``SnmpVarBinds`` wrapping the varbinds returned by ``setCmd``.

    Raises:
        AssertionError: if ``self.alive`` is not ``True`` or a value fails
            the type checks above (asserts are stripped under
            ``python -O``).
        SnmpError: if ``setCmd`` reports an error indication or status;
            ``self.__set_error`` is called first with the same details.
    """
    assert self.alive is True

    translated = []
    for oid, value in oidvalues:
        if isinstance(oid, tuple):
            if any(isinstance(part, str) for part in oid):
                # Symbolic tuple: translate through cmdgen.MibVariable;
                # the value must then be a plain Python type.
                assert isinstance(value, (int, str, bool))
                target = cmdgen.MibVariable(*oid)
            else:
                # Numeric tuple: value must be a rfc1902/pyasn1 type.
                if not oid[-1] == 0:
                    assert isinstance(
                        value,
                        (univ.Integer, univ.OctetString, univ.ObjectIdentifier)
                    )
                target = oid
        elif isinstance(oid, str):
            # String oid: assume nodeid lookup; value must be a
            # rfc1902/pyasn1 type if the oid is not a scalar.
            if not oid.endswith(".0"):
                assert isinstance(
                    value,
                    (univ.Integer, univ.OctetString, univ.ObjectIdentifier)
                )
            target = nodeid(oid)
        else:
            continue
        translated.append((target, value))

    generator = cmdgen.CommandGenerator()
    error_indication, error_status, error_index, varbinds = generator.setCmd(  # pylint: disable=W0612
        self.auth,
        cmdgen.UdpTransportTarget(
            (self.host, self.port), timeout=self.timeout, retries=self.retries
        ),
        *translated
    )

    if error_indication or error_status:
        self.__set_error(error_indication, error_status, error_index, varbinds)
        raise SnmpError(
            "SNMP set command on %s of oid values %r failed" % (self.host, translated),
            error_indication, error_status, error_index, varbinds
        )
    return SnmpVarBinds(varbinds)