def validate(self, data):
    """Decode the Ethernet header and the fixed IPv4 header at the start of ``data``.

    The decoded fields are only bound to local names; nothing is returned
    and nothing is stored on ``self``.

    :param data: raw frame, Ethernet header first. At least 34 bytes
        (14 Ethernet + 20 IPv4) are required.
    :raises struct.error: if ``data`` is too short for either header.
    """
    ETH_HEADER_LEN = 14
    IP_FIXED_HEADER_LEN = 20

    # Ethernet header: destination MAC, source MAC, 16-bit type field.
    ethernet_fields = struct.unpack('!6s6sH', data[:ETH_HEADER_LEN])
    # NOTE(review): the '!' format already produces a host-order integer, so
    # ntohs() swaps the bytes a second time on little-endian hosts - confirm
    # that this is intended.
    ether_type = socket.ntohs(ethernet_fields[2])

    # Fixed 20-byte IPv4 header located right after the Ethernet header.
    ip_fields = struct.unpack(
        '!BBHHHBBH4s4s',
        data[ETH_HEADER_LEN:IP_FIXED_HEADER_LEN + ETH_HEADER_LEN],
    )
    # The low nibble of the first byte is the header length in 32-bit words.
    header_words = ip_fields[0] & 0xF
    ip_header_len = header_words * 4
    ip_protocol = ip_fields[6]
    source_ip = socket.inet_ntoa(ip_fields[8])
    destination_ip = socket.inet_ntoa(ip_fields[9])
    # The transport-layer header is not decoded here.
python类ntohs()的实例源码
def validate(self, data):
    """Decode the Ethernet header and the fixed IPv4 header at the start of ``data``.

    The decoded fields are only bound to local names; nothing is returned
    and nothing is stored on ``self``.

    :param data: raw frame, Ethernet header first. At least 34 bytes
        (14 Ethernet + 20 IPv4) are required.
    :raises struct.error: if ``data`` is too short for either header.
    """
    ETH_HEADER_LEN = 14
    IP_FIXED_HEADER_LEN = 20

    # Ethernet header: destination MAC, source MAC, 16-bit type field.
    ethernet_fields = struct.unpack('!6s6sH', data[:ETH_HEADER_LEN])
    # NOTE(review): the '!' format already produces a host-order integer, so
    # ntohs() swaps the bytes a second time on little-endian hosts - confirm
    # that this is intended.
    ether_type = socket.ntohs(ethernet_fields[2])

    # Fixed 20-byte IPv4 header located right after the Ethernet header.
    ip_fields = struct.unpack(
        '!BBHHHBBH4s4s',
        data[ETH_HEADER_LEN:IP_FIXED_HEADER_LEN + ETH_HEADER_LEN],
    )
    # The low nibble of the first byte is the header length in 32-bit words.
    header_words = ip_fields[0] & 0xF
    ip_header_len = header_words * 4
    ip_protocol = ip_fields[6]
    source_ip = socket.inet_ntoa(ip_fields[8])
    destination_ip = socket.inet_ntoa(ip_fields[9])
    # The transport-layer header is not decoded here.
def user_login(self):
    """Build the login message and pass it to ``self.send``.

    The message is the concatenation of four packed fields, in this order:
    login code (1), peer id, body length (4) and this client's id.
    """
    # ntohs()/ntohl() swap bytes on little-endian hosts, so the natively
    # packed results below are big-endian byte strings on either kind of host.
    login_code = struct.pack('h', socket.ntohs(1))
    # The action code is packed but is not part of the message sent below.
    action = struct.pack('h', socket.ntohs(2))
    peer_id = struct.pack('I', socket.ntohl(632949210))
    # NOTE(review): unlike peer_id, myid is packed without a byte swap -
    # confirm that this asymmetry is what the receiver expects.
    myid = struct.pack('I', 632949211)
    print("user login...")
    body_length = struct.pack('h', socket.ntohs(4))
    self.send(b''.join((login_code, peer_id, body_length, myid)))
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def get_pid_port_tcp(self, port):
    """Return the PID that owns the TCP endpoint bound to local ``port``.

    :param port: local port number in host byte order.
    :return: ``dwOwningPid`` of the first row of
        ``self.get_extended_tcp_table()`` whose local port matches, or
        ``None`` when no row matches.
    """
    for row in self.get_extended_tcp_table():
        # dwLocalPort is converted with ntohs() before it is compared with
        # the host-order ``port`` argument.
        if socket.ntohs(row.dwLocalPort) == port:
            return row.dwOwningPid
    # Reached only after every row has been examined without a match.
    return None
#################################################################################
# The GetExtendedUdpTable function retrieves a table that contains a list of UDP endpoints available to the application.
#
# DWORD GetExtendedUdpTable(
# _Out_ PVOID pUdpTable,
# _Inout_ PDWORD pdwSize,
# _In_ BOOL bOrder,
# _In_ ULONG ulAf,
# _In_ UDP_TABLE_CLASS TableClass,
# _In_ ULONG Reserved
# );
def get_pid_port_udp(self, port):
    """Return the PID that owns the UDP endpoint bound to local ``port``.

    :param port: local port number in host byte order.
    :return: ``dwOwningPid`` of the first row of
        ``self.get_extended_udp_table()`` whose local port matches, or
        ``None`` when no row matches.
    """
    for row in self.get_extended_udp_table():
        # dwLocalPort is converted with ntohs() before it is compared with
        # the host-order ``port`` argument.
        if socket.ntohs(row.dwLocalPort) == port:
            return row.dwOwningPid
    # Reached only after every row has been examined without a match.
    return None
###############################################################################
# Retrieves the name of the executable file for the specified process.
#
# DWORD WINAPI GetProcessImageFileName(
# _In_ HANDLE hProcess,
# _Out_ LPTSTR lpImageFileName,
# _In_ DWORD nSize
# );
def main():
    """Capture raw frames forever and print their Ethernet and IPv4 headers.

    Opens an ``AF_PACKET`` raw socket and loops: each received frame is
    decoded with ``ethernet_frame`` and, when the reported protocol is 8,
    with ``ipv4_packet``; the decoded fields are printed. The socket is
    closed when the loop is left through an exception (for example
    ``KeyboardInterrupt``).
    """
    # NOTE(review): protocol 3 is presumably ETH_P_ALL (capture every
    # protocol) - confirm against the Linux packet(7) documentation.
    connection = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3))
    try:
        while True:
            raw_data, addr = connection.recvfrom(65536)
            dest_mac, src_mac, eth_proto, data = ethernet_frame(raw_data)
            print('\nEthernet Frame:')
            print(TAB_1 + 'Destination: {}, Source: {}, Protocol: {}'.format(dest_mac, src_mac, eth_proto))
            # NOTE(review): 8 is presumably the IPv4 type 0x0800 as reported
            # by ethernet_frame after a byte swap - verify against that helper.
            if eth_proto == 8:
                (version, header_length, ttl, proto, src, target, data) = ipv4_packet(data)
                print(TAB_1 + 'IPv4 Packet:')
                print(TAB_2 + 'Version: {}, Header Length: {}, TTL: {},'.format(version, header_length, ttl))
                print(TAB_2 + 'Protocol: {}, Source: {}, Target: {}'.format(proto, src, target))
            elif eth_proto == 1:
                # NOTE(review): ``ipv4`` is not bound anywhere in this
                # function, so this branch depends on a module-level name;
                # the result is also unused. Confirm what was intended.
                icmp = ICMP(ipv4.data)
    finally:
        # Release the raw socket however the capture loop ends.
        connection.close()
def parse_ethernet_header(self, packet):
    """Split the leading Ethernet header of ``packet`` into its three fields.

    :param packet: frame whose items index as integers (``bytes`` on
        Python 3, or a ``bytearray``); at least 14 bytes long.
    :return: ``(dst_mac, src_mac, eth_protocol)`` where the MAC values are
        whatever ``self.bytes_to_mac`` returns for bytes 0-5 and 6-11, and
        ``eth_protocol`` is the 16-bit type field as a host-order integer.
    :raises IndexError: if ``packet`` has fewer than 14 items.
    """
    dst_mac = self.bytes_to_mac(packet[0:6])
    src_mac = self.bytes_to_mac(packet[6:12])
    # The type field is big-endian on the wire: byte 12 is the high byte.
    # Assembling it explicitly yields the same integer on every host, so no
    # ntohs() call (whose effect depends on host endianness) is needed.
    eth_protocol = (packet[12] << 8) | packet[13]
    return dst_mac, src_mac, eth_protocol
def remote_port(self):
    """Remote port in host byte order, or ``None`` when not established.

    :type: :class:`int`
    """
    return socket.ntohs(self.dwRemotePort) if self.established else None
def local_port(self):
    """Local port converted from network to host byte order.

    :type: :class:`int`
    """
    network_order_port = self.dwLocalPort
    return socket.ntohs(network_order_port)
def remote_port(self):
    """Remote port in host byte order, or ``None`` when not established.

    :type: :class:`int`
    """
    return socket.ntohs(self.dwRemotePort) if self.established else None
def local_port(self):
    """Local port converted from network to host byte order.

    :type: :class:`int`
    """
    network_order_port = self.dwLocalPort
    return socket.ntohs(network_order_port)
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def getPortNumHostByteOrder(st_network):
    """Convert a 16-bit port number from network to host byte order.

    On hosts whose byte order already matches network byte order this is a
    no-op; otherwise the two bytes are swapped (there, 20480 becomes 80).

    :param st_network: port in network byte order; any value that
        ``int()`` accepts.
    :return: the port in host byte order, or ``None`` when the value
        cannot be converted (the failure is logged at error level).
    """
    try:
        return socket.ntohs(int(st_network))
    except (TypeError, ValueError, OverflowError) as e:
        # %r is used because the value that failed conversion may not be a
        # number at all, which a numeric format specifier could not render.
        logging.error("[NetworkByteOrderPortFalse]: %r %s", st_network, repr(e))
        return None
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
1_5_integer_conversion.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def convert_integer():
    """Print the value 1234 after 32-bit and 16-bit byte-order conversions.

    Each printed line shows the original value, its network-to-host
    conversion and its host-to-network conversion.
    """
    data = 1234
    long_line = "Original: %s => Long host byte order: %s, Network byte order: %s" % (
        data, socket.ntohl(data), socket.htonl(data))
    short_line = "Original: %s => Short host byte order: %s, Network byte order: %s" % (
        data, socket.ntohs(data), socket.htons(data))
    # The 32-bit result is reported first, then the 16-bit one.
    print(long_line)
    print(short_line)
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def test_socket_has_some_reexports():
    """Check that ``tsocket`` exposes a sample of names equal to ``stdlib_socket``'s."""
    # Checked in order: two constants, an exception class and a function.
    for name in ("SOL_SOCKET", "TCP_NODELAY", "gaierror", "ntohs"):
        assert getattr(tsocket, name) == getattr(stdlib_socket, name)
################################################################
# name resolution
################################################################
def __init__(self, interface):
    """Create the module-level raw packet socket ``s`` and bind it to ``interface``.

    If binding raises ``socket.error``, an error message is printed and
    ``exit()`` is called.

    :param interface: interface name passed as the first element of the
        bind address.
    """
    global s, redirect_to_mac, arp
    super(Arp_Spoof, self).__init__()
    protocol = 0x0800
    s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(protocol))
    bind_address = (interface, socket.htons(protocol))
    try:
        s.bind(bind_address)
    except socket.error:
        print("\033[1;31mUnable to bind to interface... unknown type\033[00m")
        exit()
def __init__(self, interface):
    """Reset the module-level counters and bind the raw packet socket ``s``.

    ``sent`` and ``rev`` are set to zero, then ``s`` is created and bound to
    ``interface``. If binding raises ``socket.error``, an error message is
    printed and ``exit()`` is called.

    :param interface: interface name passed as the first element of the
        bind address.
    """
    global s, sent, rev
    sent = rev = 0
    super(Arp_Ping, self).__init__()
    protocol = 0x0800
    s = socket.socket(socket.PF_PACKET, socket.SOCK_RAW, socket.ntohs(protocol))
    bind_address = (interface, socket.htons(protocol))
    try:
        s.bind(bind_address)
    except socket.error:
        print("\033[1;31mUnable to bind to interface... unknown type\033[00m")
        exit()
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1, 2, 3 ]
bad_values = [ -1, -2, -3, -1, -2, -3 ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises((OverflowError, ValueError), socket.ntohl, k)
self.assertRaises((OverflowError, ValueError), socket.ntohs, k)
self.assertRaises((OverflowError, ValueError), socket.htonl, k)
self.assertRaises((OverflowError, ValueError), socket.htons, k)
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def testNtoH(self):
# This just checks that htons etc. are their own inverse,
# when looking at the lower 16 or 32 bits.
sizes = {socket.htonl: 32, socket.ntohl: 32,
socket.htons: 16, socket.ntohs: 16}
for func, size in sizes.items():
mask = (1L<<size) - 1
for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
self.assertEqual(i & mask, func(func(i&mask)) & mask)
swapped = func(mask)
self.assertEqual(swapped & mask, mask)
self.assertRaises(OverflowError, func, 1L<<34)
def testNtoHErrors(self):
good_values = [ 1, 2, 3, 1L, 2L, 3L ]
bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
for k in good_values:
socket.ntohl(k)
socket.ntohs(k)
socket.htonl(k)
socket.htons(k)
for k in bad_values:
self.assertRaises(OverflowError, socket.ntohl, k)
self.assertRaises(OverflowError, socket.ntohs, k)
self.assertRaises(OverflowError, socket.htonl, k)
self.assertRaises(OverflowError, socket.htons, k)
def reverse(self, val):
    """Convert ``val`` from network to host byte order according to ``self.size``.

    A size of 16 uses ``socket.ntohs`` and a size of 32 uses
    ``socket.ntohl``; for any other size ``val`` is returned unchanged.
    """
    width = self.size
    if width == 16:
        return socket.ntohs(val)
    if width == 32:
        return socket.ntohl(val)
    return val