def _ip2num(self, ip):
"""??IP?????"""
ip_num = -1
try:
# ?IP???INT/LONG ??
ip_num = socket.ntohl(struct.unpack("I",socket.inet_aton(str(ip)))[0])
except:
pass
finally:
return ip_num
# Example source code using Python's socket.ntohl()
def ip2num(self, ip):
    """Return the integer value of a dotted-quad IPv4 address.

    Args:
        ip: IPv4 address such as ``10.10.10.11``; it is converted with
            ``str()`` before parsing.

    Returns:
        The address as an integer in host byte order, e.g. ``168430091``
        for ``10.10.10.11``.

    No exception is caught here; errors raised while parsing propagate to
    the caller.
    """
    packed = socket.inet_aton(str(ip))
    (raw_word,) = struct.unpack('I', packed)
    return socket.ntohl(raw_word)
def ip_to_num(ip):
    """Return the host-byte-order integer for a dotted-quad IPv4 address.

    ``ip`` is converted with ``str()`` before it is parsed.
    """
    raw_word = struct.unpack("I", socket.inet_aton(str(ip)))[0]
    return socket.ntohl(raw_word)
def decrypt(self, text, appid):
    """Decrypt a Base64-encoded ciphertext and extract the XML payload.

    @param text: Base64-encoded ciphertext
    @param appid: application id expected to follow the XML payload
    @return: ``(0, xml_content)`` on success, otherwise ``(error_code, None)``
        where ``error_code`` is one of the ``ierror.WXBizMsgCrypt_*`` values
    """
    try:
        # The first 16 bytes of the key are passed as the third argument
        # (presumably the IV of an AES-CBC cipher -- confirm against self.mode).
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        plain_text = cryptor.decrypt(base64.b64decode(text))
    except Exception:
        return ierror.WXBizMsgCrypt_DecryptAES_Error, None
    try:
        # Slice instead of index so this works for Python 2 ``str`` and
        # Python 3 ``bytes`` alike (indexing bytes yields an int on Python 3).
        pad = ord(plain_text[-1:])
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        # The next 4 bytes hold the payload length as an unsigned 32-bit
        # integer in network byte order.
        xml_len = struct.unpack("!I", content[:4])[0]
        xml_content = content[4:xml_len + 4]
        from_appid = content[xml_len + 4:]
    except Exception:
        return ierror.WXBizMsgCrypt_IllegalBuffer, None
    expected_appid = appid
    if (isinstance(from_appid, bytes) and not isinstance(appid, bytes)
            and hasattr(appid, "encode")):
        # On Python 3 the decrypted id is bytes; a text appid would never
        # compare equal without encoding it first.
        expected_appid = appid.encode("utf-8")
    if from_appid != expected_appid:
        return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
    return 0, xml_content
def reverse(self, val):
    """Convert ``val`` from network to host byte order based on ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    if self.size == 16:
        return socket.ntohs(val)
    if self.size == 32:
        return socket.ntohl(val)
    return val
def testNtoH(self):
    """Check that the byte-order converters are their own inverse.

    Only the lower 16 or 32 bits (depending on the function) are compared.
    Each converter must also reject ``1 << 34`` with ``OverflowError``.
    """
    bit_widths = {socket.htonl: 32, socket.ntohl: 32,
                  socket.htons: 16, socket.ntohs: 16}
    samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
    for convert, width in bit_widths.items():
        low_bits = (1 << width) - 1
        for sample in samples:
            self.assertEqual(sample & low_bits,
                             convert(convert(sample & low_bits)) & low_bits)
        # An all-ones value looks the same in either byte order.
        all_ones_swapped = convert(low_bits)
        self.assertEqual(all_ones_swapped & low_bits, low_bits)
        self.assertRaises(OverflowError, convert, 1 << 34)
def testNtoHErrors(self):
    """Check that the byte-order converters accept small positive integers
    and raise ``OverflowError`` for negative ones."""
    accepted = [ 1, 2, 3, 1, 2, 3 ]
    rejected = [ -1, -2, -3, -1, -2, -3 ]
    converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
    for value in accepted:
        for convert in converters:
            convert(value)
    for value in rejected:
        for convert in converters:
            self.assertRaises(OverflowError, convert, value)
def decrypt(self, text, appid):
    """Decrypt a Base64-encoded ciphertext and return the XML payload.

    @param text: Base64-encoded ciphertext
    @param appid: application id expected to follow the XML payload
    @return: the XML payload extracted from the decrypted buffer
    @raise DecryptAESError: if building the cipher, Base64 decoding or
        decryption fails
    @raise IllegalBuffer: if the decrypted buffer cannot be parsed
    @raise ValidateAppIDError: if the embedded id differs from ``appid``
    """
    try:
        cipher = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        encrypted = base64.b64decode(text)
        plain_text = cipher.decrypt(encrypted)
    except Exception as err:
        raise DecryptAESError(err)
    try:
        # The last byte is the padding length; Python 2 yields a one-char
        # string there, Python 3 already yields an int.
        pad = ord(plain_text[-1]) if six.PY2 else plain_text[-1]
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        (raw_len,) = struct.unpack("I", content[:4])
        xml_len = socket.ntohl(raw_len)
        payload_end = xml_len + 4
        xml_content = content[4:payload_end]
        from_appid = content[payload_end:]
    except Exception as err:
        raise IllegalBuffer(err)
    if from_appid == appid:
        return xml_content
    raise ValidateAppIDError()
def decrypt(self, text, appid):
    """Decrypt a Base64-encoded ciphertext and extract the XML payload.

    @param text: Base64-encoded ciphertext
    @param appid: application id expected to follow the XML payload
    @return: ``(0, xml_content)`` on success, otherwise ``(error_code, None)``
        where ``error_code`` is one of the ``ierror.WXBizMsgCrypt_*`` values
    """
    try:
        # The first 16 bytes of the key are passed as the third argument
        # (presumably the IV of an AES-CBC cipher -- confirm against self.mode).
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        plain_text = cryptor.decrypt(base64.b64decode(text))
    except Exception:
        return ierror.WXBizMsgCrypt_DecryptAES_Error, None
    try:
        # Slice instead of index so this works for Python 2 ``str`` and
        # Python 3 ``bytes`` alike (indexing bytes yields an int on Python 3).
        pad = ord(plain_text[-1:])
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        # The next 4 bytes hold the payload length as an unsigned 32-bit
        # integer in network byte order.
        xml_len = struct.unpack("!I", content[:4])[0]
        xml_content = content[4:xml_len + 4]
        from_appid = content[xml_len + 4:]
    except Exception:
        return ierror.WXBizMsgCrypt_IllegalBuffer, None
    expected_appid = appid
    if (isinstance(from_appid, bytes) and not isinstance(appid, bytes)
            and hasattr(appid, "encode")):
        # On Python 3 the decrypted id is bytes; a text appid would never
        # compare equal without encoding it first.
        expected_appid = appid.encode("utf-8")
    if from_appid != expected_appid:
        return ierror.WXBizMsgCrypt_ValidateAppid_Error, None
    return 0, xml_content
def decrypt(self, text, appid):
    """Decrypt a Base64-encoded ciphertext and extract the XML payload.

    @param text: Base64-encoded ciphertext
    @param appid: application id expected to follow the XML payload
    @return: ``(0, xml_content)`` on success, otherwise ``(error_code, None)``
        with one of the ``WXBizMsgCrypt_*`` error codes
    """
    try:
        cipher = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        encrypted = base64.b64decode(text)
        plain_text = cipher.decrypt(encrypted)
    except Exception:
        return WXBizMsgCrypt_DecryptAES_Error, None
    try:
        # The last byte of the decrypted buffer is the padding length.
        pad = plain_text[-1]
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        (raw_len,) = struct.unpack(b"I", content[:4])
        xml_len = socket.ntohl(raw_len)
        payload_end = xml_len + 4
        xml_content = content[4:payload_end]
        from_appid = smart_bytes(content[payload_end:])
    except Exception:
        return WXBizMsgCrypt_IllegalBuffer, None
    if from_appid == smart_bytes(appid):
        return 0, xml_content
    return WXBizMsgCrypt_ValidateAppid_Error, None
def testNtoH(self):
    """Check that the byte-order converters are their own inverse.

    Only the lower 16 or 32 bits (depending on the function) are compared.
    Each converter must also reject ``1 << 34`` with ``OverflowError``.
    """
    sizes = {socket.htonl: 32, socket.ntohl: 32,
             socket.htons: 16, socket.ntohs: 16}
    for func, size in sizes.items():
        # Plain int literals: the ``L`` suffix is a SyntaxError on Python 3,
        # and Python 2 promotes to long automatically when needed.
        mask = (1 << size) - 1
        for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
            self.assertEqual(i & mask, func(func(i & mask)) & mask)
        # An all-ones value looks the same in either byte order.
        swapped = func(mask)
        self.assertEqual(swapped & mask, mask)
        self.assertRaises(OverflowError, func, 1 << 34)
def testNtoHErrors(self):
    """Check that the byte-order converters accept small positive integers
    and raise ``OverflowError`` or ``ValueError`` for negative ones."""
    try:
        wide_int = long  # Python 2: also exercise the separate long type
    except NameError:
        wide_int = int  # Python 3: int is the only integer type
    good_values = [1, 2, 3, wide_int(1), wide_int(2), wide_int(3)]
    bad_values = [-1, -2, -3, wide_int(-1), wide_int(-2), wide_int(-3)]
    for k in good_values:
        socket.ntohl(k)
        socket.ntohs(k)
        socket.htonl(k)
        socket.htons(k)
    for k in bad_values:
        self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
        self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
        self.assertRaises((OverflowError, ValueError), socket.htonl, k)
        self.assertRaises((OverflowError, ValueError), socket.htons, k)
def testNtoH(self):
    """Check that the byte-order converters are their own inverse.

    Only the lower 16 or 32 bits (depending on the function) are compared.
    Each converter must also reject ``1 << 34`` with ``OverflowError``.
    """
    bit_widths = {socket.htonl: 32, socket.ntohl: 32,
                  socket.htons: 16, socket.ntohs: 16}
    samples = (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210)
    for convert, width in bit_widths.items():
        low_bits = (1 << width) - 1
        for sample in samples:
            self.assertEqual(sample & low_bits,
                             convert(convert(sample & low_bits)) & low_bits)
        # An all-ones value looks the same in either byte order.
        all_ones_swapped = convert(low_bits)
        self.assertEqual(all_ones_swapped & low_bits, low_bits)
        self.assertRaises(OverflowError, convert, 1 << 34)
def testNtoHErrors(self):
    """Check that the byte-order converters accept small positive integers
    and raise ``OverflowError`` for negative ones."""
    accepted = [ 1, 2, 3, 1, 2, 3 ]
    rejected = [ -1, -2, -3, -1, -2, -3 ]
    converters = (socket.ntohl, socket.ntohs, socket.htonl, socket.htons)
    for value in accepted:
        for convert in converters:
            convert(value)
    for value in rejected:
        for convert in converters:
            self.assertRaises(OverflowError, convert, value)
def decrypt(self, text, corpid):
    """Decrypt a Base64-encoded ciphertext and extract the XML payload.

    @param text: Base64-encoded ciphertext
    @param corpid: corp id expected to follow the XML payload
    @return: ``(0, xml_content)`` on success, otherwise ``(error_code, None)``
        where ``error_code`` is one of the ``ierror.WXBizMsgCrypt_*`` values
    """
    try:
        # The first 16 bytes of the key are passed as the third argument
        # (presumably the IV of an AES-CBC cipher -- confirm against self.mode).
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        plain_text = cryptor.decrypt(base64.b64decode(text))
    except Exception as e:
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(e)
        return ierror.WXBizMsgCrypt_DecryptAES_Error, None
    try:
        # Slice instead of index so this works for Python 2 ``str`` and
        # Python 3 ``bytes`` alike (indexing bytes yields an int on Python 3).
        pad = ord(plain_text[-1:])
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        # The next 4 bytes hold the payload length as an unsigned 32-bit
        # integer in network byte order.
        xml_len = struct.unpack("!I", content[:4])[0]
        xml_content = content[4:xml_len + 4]
        from_corpid = content[xml_len + 4:]
    except Exception as e:
        print(e)
        return ierror.WXBizMsgCrypt_IllegalBuffer, None
    expected_corpid = corpid
    if (isinstance(from_corpid, bytes) and not isinstance(corpid, bytes)
            and hasattr(corpid, "encode")):
        # On Python 3 the decrypted id is bytes; a text corpid would never
        # compare equal without encoding it first.
        expected_corpid = corpid.encode("utf-8")
    if from_corpid != expected_corpid:
        return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
    return 0, xml_content
def testNtoH(self):
    """Check that the byte-order converters are their own inverse.

    Only the lower 16 or 32 bits (depending on the function) are compared.
    Each converter must also reject ``1 << 34`` with ``OverflowError``.
    """
    sizes = {socket.htonl: 32, socket.ntohl: 32,
             socket.htons: 16, socket.ntohs: 16}
    for func, size in sizes.items():
        # Plain int literals: the ``L`` suffix is a SyntaxError on Python 3,
        # and Python 2 promotes to long automatically when needed.
        mask = (1 << size) - 1
        for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
            self.assertEqual(i & mask, func(func(i & mask)) & mask)
        # An all-ones value looks the same in either byte order.
        swapped = func(mask)
        self.assertEqual(swapped & mask, mask)
        self.assertRaises(OverflowError, func, 1 << 34)
def testNtoHErrors(self):
    """Check that the byte-order converters accept small positive integers
    and raise ``OverflowError`` for negative ones."""
    try:
        wide_int = long  # Python 2: also exercise the separate long type
    except NameError:
        wide_int = int  # Python 3: int is the only integer type
    good_values = [1, 2, 3, wide_int(1), wide_int(2), wide_int(3)]
    bad_values = [-1, -2, -3, wide_int(-1), wide_int(-2), wide_int(-3)]
    for k in good_values:
        socket.ntohl(k)
        socket.ntohs(k)
        socket.htonl(k)
        socket.htons(k)
    for k in bad_values:
        self.assertRaises(OverflowError, socket.ntohl, k)
        self.assertRaises(OverflowError, socket.ntohs, k)
        self.assertRaises(OverflowError, socket.htonl, k)
        self.assertRaises(OverflowError, socket.htons, k)
def ip_to_int(ip):
    """Return the host-byte-order integer for the dotted-quad IPv4 string ``ip``."""
    packed = socket.inet_aton(ip)
    (raw_word,) = struct.unpack("I", packed)
    return socket.ntohl(raw_word)
def ip_to_int(self, ip):
    """Return the unsigned 32-bit integer value of a dotted-quad IPv4 string.

    Args:
        ip: IPv4 address string, e.g. ``"192.168.1.1"``.

    Returns:
        The address as a non-negative integer, e.g. ``3232235777`` for
        ``"192.168.1.1"``.
    """
    # Unpacking the 4 packed bytes as a network-order unsigned int yields the
    # host value directly, so no ntohl call or pack/unpack round trip is needed.
    return struct.unpack("!I", socket.inet_aton(ip))[0]
def decrypt(self, text, corpid):
    """Decrypt a Base64-encoded ciphertext and extract the XML payload.

    @param text: Base64-encoded ciphertext
    @param corpid: corp id expected to follow the XML payload
    @return: ``(0, xml_content)`` on success, otherwise ``(error_code, None)``
        where ``error_code`` is one of the ``ierror.WXBizMsgCrypt_*`` values
    """
    try:
        # The first 16 bytes of the key are passed as the third argument
        # (presumably the IV of an AES-CBC cipher -- confirm against self.mode).
        cryptor = AES.new(self.key, self.mode, self.key[:16])
        # Base64-decode the ciphertext, then decrypt it.
        plain_text = cryptor.decrypt(base64.b64decode(text))
    except Exception as e:
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print(e)
        return ierror.WXBizMsgCrypt_DecryptAES_Error, None
    try:
        # Slice instead of index so this works for Python 2 ``str`` and
        # Python 3 ``bytes`` alike (indexing bytes yields an int on Python 3).
        pad = ord(plain_text[-1:])
        # Drop the 16 leading bytes and the trailing padding.
        content = plain_text[16:-pad]
        # The next 4 bytes hold the payload length as an unsigned 32-bit
        # integer in network byte order.
        xml_len = struct.unpack("!I", content[:4])[0]
        xml_content = content[4:xml_len + 4]
        from_corpid = content[xml_len + 4:]
    except Exception as e:
        print(e)
        return ierror.WXBizMsgCrypt_IllegalBuffer, None
    expected_corpid = corpid
    if (isinstance(from_corpid, bytes) and not isinstance(corpid, bytes)
            and hasattr(corpid, "encode")):
        # On Python 3 the decrypted id is bytes; a text corpid would never
        # compare equal without encoding it first.
        expected_corpid = corpid.encode("utf-8")
    if from_corpid != expected_corpid:
        return ierror.WXBizMsgCrypt_ValidateCorpid_Error, None
    return 0, xml_content