def reverse(self, val):
    """Convert ``val`` with the ``socket`` helper matching ``self.size``.

    ``socket.ntohs`` is applied when ``self.size`` is 16 and
    ``socket.ntohl`` when it is 32.  For any other size ``val`` is
    returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    # Neither a 16-bit nor a 32-bit field: nothing to swap.
    return val
# Example source code for Python's ntohs()
def reverse(self, val):
    """Return ``val`` passed through ``ntohs``/``ntohl`` according to ``self.size``.

    A size of 16 selects ``socket.ntohs`` and a size of 32 selects
    ``socket.ntohl``; every other size leaves ``val`` untouched.
    """
    if self.size not in (16, 32):
        return val
    to_host = socket.ntohs if self.size == 16 else socket.ntohl
    return to_host(val)
def get_total_length(self):
    """Return ``self.ip_len`` converted with ``ntohs``.

    NOTE(review): ``ntohs`` is referenced unqualified, so it is expected to
    be imported into the enclosing module (e.g. from ``socket``) -- confirm.
    """
    total_length = ntohs(self.ip_len)
    return total_length
def reverse(self, val):
    """Apply the ``socket`` network-to-host conversion selected by ``self.size``.

    Sizes 16 and 32 map to ``socket.ntohs`` and ``socket.ntohl``
    respectively; when ``self.size`` is anything else ``val`` is returned
    as given.
    """
    for bits, to_host in ((16, socket.ntohs), (32, socket.ntohl)):
        if self.size == bits:
            return to_host(val)
    return val
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def checksum(data):
    """Return the 16-bit one's-complement checksum of ``data``.

    ``data`` is copied into an immutable byte string first (the input can
    be a bytearray), padded with one zero byte when its length is odd,
    summed as native-order 16-bit words, folded to 16 bits, complemented
    and finally passed through ``socket.ntohs``.
    """
    # ``bytes`` is the same type as ``six.binary_type`` on both Python 2
    # (``str``) and Python 3, so no third-party helper is needed here.
    data = bytes(data)
    if len(data) % 2:
        data += b'\x00'
    s = sum(array.array('H', data))
    # Fold the carries back in until the sum fits in 16 bits.  A fixed
    # two-step fold is only sufficient while the raw sum stays below 2**32;
    # larger inputs need further rounds to propagate every carry.
    while s >> 16:
        s = (s & 0xffff) + (s >> 16)
    return socket.ntohs(~s & 0xffff)
# avoid circular import
def checksum(data):
    """Return the 16-bit one's-complement checksum of ``data``.

    ``data`` is padded with one zero byte when its length is odd, summed
    as native-order 16-bit words, folded to 16 bits, complemented and
    finally passed through ``_socket.ntohs``.  The caller's buffer is
    never modified.
    """
    if len(data) % 2:
        # Build a padded copy instead of using ``+=``: on a bytearray the
        # in-place form would append the pad byte to the caller's object.
        data = data + b'\x00'
    s = sum(array.array('H', data))
    # Fold the carries back in until the sum fits in 16 bits.  A fixed
    # two-step fold is only sufficient while the raw sum stays below 2**32;
    # larger inputs need further rounds to propagate every carry.
    while s >> 16:
        s = (s & 0xffff) + (s >> 16)
    return _socket.ntohs(~s & 0xffff)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
self.assertRaises((OverflowError, ValueError), socket.htonl, k)
self.assertRaises((OverflowError, ValueError), socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def worker_send(self):
    """Read lines from the console forever and send each one as a message.

    Every message is ``action + peer_id + length + body`` where ``body`` is
    the right-stripped input encoded as UTF-8 and ``length`` is the size of
    ``body`` in bytes as an unsigned 16-bit integer in network byte order.

    NOTE(review): ``action`` and ``peer_id`` are not defined in this block;
    they are assumed to be byte strings from the enclosing scope -- confirm.

    Raises:
        struct.error: if the encoded body is longer than 65535 bytes.
    """
    while True:
        msgbody = input('say sth: ').rstrip()
        payload = msgbody.encode('utf-8')
        # The prefix must count the bytes actually sent, not the characters
        # typed: the two differ for any non-ASCII input.  '!H' yields the
        # same two bytes the ntohs + native pack combination produced, but
        # it is unsigned, so lengths whose swapped value exceeded 32767
        # (e.g. 128 on little-endian hosts) no longer raise struct.error.
        msgbody_len = struct.pack('!H', len(payload))
        send_msg = action + peer_id + msgbody_len + payload
        self.send(send_msg)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def __init__(self):
    """Set default state and open one raw packet socket per interface.

    Two ``AF_PACKET``/``SOCK_RAW`` sockets are created and bound to the
    interfaces named by ``ifaceHost`` ("em1") and ``ifaceNetwork``
    ("eth0").  A bind failure is reported on stdout but is not fatal; both
    sockets are stored in ``self.inputs`` either way.
    """
    self.hostmac = ""
    self.hostip = ""
    self.conf = True
    self.ifaceHost = "em1"
    self.ifaceNetwork = "eth0"
    # NOTE(review): protocol 0x0003 is presumably ETH_P_ALL (capture every
    # protocol) -- confirm against linux/if_ether.h.
    self.sockHost = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    self.sockNetwork = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    try:
        self.sockHost.bind((self.ifaceHost, 0))
        self.sockNetwork.bind((self.ifaceNetwork, 0))
    except socket.error:
        # Only socket-level failures are tolerated here; a bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        #exit("You need 2 physical network interfaces to use FENRIR !")
        print("You need 2 physical network interfaces to use FENRIR !")
    self.inputs = [self.sockHost, self.sockNetwork]
def bindAllIface(self):
    """Create a raw ``AF_PACKET`` socket and store it in ``self.s``.

    NOTE(review): despite the name, no ``bind()`` call is made here; the
    socket is only created with protocol ``socket.ntohs(0x0003)``.
    """
    protocol = socket.ntohs(0x0003)
    raw_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, protocol)
    self.s = raw_socket
def parsePacket(s):
    """Decode the IPv4 header (and UDP ports, if any) of a captured frame.

    The link-layer header is stripped according to ``pcapObj.datalink()``,
    then the IPv4 header fields are extracted into a dict.  For UDP
    (protocol 17) the source and destination ports are decoded as well and
    the 8-byte UDP header is removed from the payload.  The counters in
    ``stats`` are updated as a side effect.

    NOTE(review): ``pcapObj``, ``stats`` and ``pcap`` are not defined in
    this block and must come from the enclosing module.  The ``ord(s[i])``
    calls assume that indexing ``s`` yields a one-character string (a
    Python 2 ``str``) -- confirm.

    Args:
        s: the raw captured frame, starting at the link-layer header.

    Returns:
        dict with the keys ``version``, ``header_len``, ``tos``,
        ``total_len``, ``id``, ``flags``, ``fragment_offset``, ``ttl``,
        ``protocol``, ``checksum``, ``source_address``,
        ``destination_address``, ``options`` and ``data``, plus
        ``source_port`` and ``destination_port`` for UDP packets.
    """
    d = {}
    # Link-layer header length in bytes, keyed by pcap link type.
    # http://www.tcpdump.org/linktypes.html
    llHeaders = {
        0: 4,
        1: 14,
        108: 4,
        228: 0
    }
    linktype = pcapObj.datalink()
    if linktype in llHeaders:
        s = s[llHeaders[linktype]:]
    else:
        # Unknown link type: only counted; decoding continues on the
        # unstripped frame, exactly as before.
        stats['unknown L2 protocol'] += 1
    d['version'] = (ord(s[0]) & 0xf0) >> 4
    d['header_len'] = ord(s[0]) & 0x0f
    d['tos'] = ord(s[1])
    d['total_len'] = socket.ntohs(struct.unpack('H', s[2:4])[0])
    d['id'] = socket.ntohs(struct.unpack('H', s[4:6])[0])
    d['flags'] = (ord(s[6]) & 0xe0) >> 5
    # The fragment offset is the low 13 bits of the 16-bit word *after* it
    # has been converted to host order.  Masking with 0x1f before the swap
    # kept the wrong bits.
    d['fragment_offset'] = socket.ntohs(struct.unpack('H', s[6:8])[0]) & 0x1fff
    d['ttl'] = ord(s[8])
    d['protocol'] = ord(s[9])
    d['checksum'] = socket.ntohs(struct.unpack('H', s[10:12])[0])
    d['source_address'] = pcap.ntoa(struct.unpack('i', s[12:16])[0])
    d['destination_address'] = pcap.ntoa(struct.unpack('i', s[16:20])[0])
    if d['header_len'] > 5:
        # header_len counts 32-bit words, so the options run from the end
        # of the fixed 20-byte header to byte 4 * header_len.  The former
        # end index 4 * (header_len - 5) was the options *length*, which
        # produced an empty or truncated slice.
        d['options'] = s[20:4 * d['header_len']]
    else:
        d['options'] = None
    s = s[4 * d['header_len']:]
    if d['protocol'] == 17:
        d['source_port'] = socket.ntohs(struct.unpack('H', s[0:2])[0])
        d['destination_port'] = socket.ntohs(struct.unpack('H', s[2:4])[0])
        # Skip the 8-byte UDP header so ``data`` holds only the payload.
        s = s[8:]
        stats['UDP packets'] += 1
    d['data'] = s
    stats['IP packets'] += 1
    return d
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)