def netmask(self, interface_name, netmask):
"""
Checking interface first, if interface name found in Get().interfaces()
validating Ipv4. After that applied ip address to interace
interface_name = Applied Interface
netmask = New netmask ip address
"""
interface_check = Get().interfaces
valid_ipv4 = validators.ipv4(netmask)
if not interface_name in interface_check:
raise WrongInterfaceName("Wrong Interface Name %s" % interface_name)
elif not valid_ipv4 is True:
raise NotValidIPv4Address("Not Valid IPv4 Address %s" % netmask)
else:
prefix_len = self.get_net_size(netmask.split('.'))
ifname = interface_name.encode(encoding='UTF-8')
netmask = ctypes.c_uint32(~((2 ** (32 - prefix_len)) - 1)).value
nmbytes = socket.htonl(netmask)
ifreq = struct.pack('16sH2sI8s', ifname, AF_INET, b'\x00'*2, nmbytes, b'\x00'*8)
fcntl.ioctl(self.sock, SIOCSIFNETMASK, ifreq)
python类htonl()的实例源码
def encrypt(self,text,appid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + appid
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key,self.mode,self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception,e:
#print e
return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def encrypt(self, text, appid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
text = self.get_random_str() + struct.pack("I", socket.htonl(len(text))) + to_binary(text) + appid
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return base64.b64encode(ciphertext)
except Exception as e:
raise EncryptAESError(e)
def encrypt(self,text,appid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + appid
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key,self.mode,self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception,e:
#print e
return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def encrypt(self, text, appid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
pack_str = struct.pack(b"I", socket.htonl(len(text)))
text = smart_bytes(self.get_random_str()) + pack_str + smart_bytes(text) + smart_bytes(appid)
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception:
return WXBizMsgCrypt_EncryptAES_Error, None
def encrypt(self,text,corpid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + corpid
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key,self.mode,self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception,e:
print e
return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def encrypt(self,text,corpid):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
text = self.get_random_str() + struct.pack("I",socket.htonl(len(text))) + text + corpid
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key,self.mode,self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
return ierror.WXBizMsgCrypt_OK, base64.b64encode(ciphertext)
except Exception,e:
print e
return ierror.WXBizMsgCrypt_EncryptAES_Error,None
def send(channel, *args):
buf = marshall(args)
value = htonl(len(buf))
size = struct.pack("L", value)
channel.send(size)
channel.send(buf)
def toString(self, noItemData=True):
self.header.fileLength = (
self.header._FORMAT.size
+ self.header.itemCount * HuaweiFirmwareItem._FORMAT.size
- 0x4c # FIXME: Can not find where does this bias come from.
)
strs = [
self.header.toString()[20:], # Partial header used for calculate CRC32 value.
]
if self.header.extraHeaderLength:
strs.append(self.extraHeader)
self.header.fileLength += len(self.extraHeader)
data = []
for item in self.items:
strs.append(item.toString())
data.append(item.data)
self.header.fileLength += item.size
# Convert to big endian.
self.header.fileLength = socket.htonl(self.header.fileLength)
# Update header CRC32 value.
self.header.headerCrc = seqCrc32(strs)
if not noItemData:
strs.extend(data)
# All data are present, now update file CRC32 value.
strs[0] = self.header.toString()[12:]
self.header.fileCrc = seqCrc32(strs)
# Using the latest header with correct CRC32 value and file length.
strs[0] = self.header.toString()
return ''.join(strs)
def route_to_dbus(route, family):
addr, netmask, gateway, metric = route
return [
fixups.addr_to_dbus(addr, family),
fixups.mask_to_dbus(netmask),
fixups.addr_to_dbus(gateway, family),
socket.htonl(metric)
]
# Constants below are generated with makeconstants.py. Do not edit manually.
communication.py 文件源码
项目:Software-Architecture-with-Python
作者: PacktPublishing
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def send(channel, *args):
""" Send a message to a channel """
buf = pickle.dumps(args)
value = socket.htonl(len(buf))
size = struct.pack("L",value)
channel.send(size)
channel.send(buf)
2_3_chat_server_with_select.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def send(channel, *args):
buffer = pickle.dumps(args)
value = socket.htonl(len(buffer))
size = struct.pack("L",value)
channel.send(size)
channel.send(buffer)
1_5_integer_conversion.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def convert_integer():
data = 1234
# 32-bit
print ("Original: %s => Long host byte order: %s, Network byte order: %s" %(data, socket.ntohl(data), socket.htonl(data)))
# 16-bit
print ("Original: %s => Short host byte order: %s, Network byte order: %s" %(data, socket.ntohs(data), socket.htons(data)))
def encrypt(self, xml):
"""???????
@param text: ???????
@return: ????????
"""
# 16?????????????
xml = xml.encode('utf-8')
if PY2:
text = self.get_random_str() +\
struct.pack("I", socket.htonl(len(xml))) + xml + self.appid
else:
text = self.get_random_str().encode('utf-8') +\
struct.pack("I", socket.htonl(len(xml))) +\
xml + self.appid.encode('utf-8')
# ???????????????????
pkcs7 = PKCS7Encoder()
text = pkcs7.encode(text)
# ??
cryptor = AES.new(self.key, self.mode, self.key[:16])
try:
ciphertext = cryptor.encrypt(text)
# ??BASE64????????????
result = base64.b64encode(ciphertext).decode("utf8")
return Crypt_OK, result
except Exception:
return Crypt_EncryptAES_Error, None
def handle_write(self):
while len(self.buffer) > 0:
payload, params = self.buffer.pop(0)
sndrcvinfo = sctp.sndrcvinfo()
sndrcvinfo.ppid = socket.htonl(params.get('ppid',0))
fromaddr,flags,msg,notif="",128,payload[:1460],sndrcvinfo
sent = self.sctp_send(msg,ppid=sndrcvinfo.ppid,flags=notif.flags)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
self.assertRaises((OverflowError, ValueError), socket.htonl, k)
self.assertRaises((OverflowError, ValueError), socket.htons, k)
def send(channel,*args):
buffers=pickle.dumps(args)
value=socket.htonl(len(buffers))
size=struct.pack("L",value)
channel.send(size)
channel.send(buffers)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def num2ip(self, num):
"""???10???????IP
Args:
num: ????
Return:
???? ??? IP
?: 10.10.10.11
Raise: None
"""
return socket.inet_ntoa(struct.pack('I', socket.htonl(num)))
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
self.assertRaises((OverflowError, ValueError), socket.htonl, k)
self.assertRaises((OverflowError, ValueError), socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)