def send_arp_reply(request):
    """Answer a broadcast ARP request and count the reply.

    A packet is ignored unless it has an ARP layer, its ARP ``op`` is 1,
    its Ethernet destination is ``ff:ff:ff:ff:ff:ff`` and its ARP target
    hardware address is ``00:00:00:00:00:00``.

    For a matching request a notice is printed, a response is built with
    ``_arp.make_response`` (this host's MAC as sender, the requested IP as
    sender IP, the requester as target) and sent on a raw ``AF_PACKET``
    socket bound to ``_current_network_interface``.  The module-level
    counter ``_current_number_of_packets`` is then incremented and, once it
    reaches ``_number_of_packets``, the process exits with status 0.

    The socket is opened only when a reply is actually sent and is always
    closed afterwards, so no raw socket is left open per captured packet.

    :param request: captured packet supporting ``haslayer`` and layer
        indexing (``request[ARP]``, ``request[Ether]``).
    """
    # Only the counter is rebound here; the other module globals are read.
    global _current_number_of_packets

    if not request.haslayer(ARP):
        return

    arp_layer = request[ARP]
    if arp_layer.op != 1:
        return

    is_broadcast_probe = (request[Ether].dst == "ff:ff:ff:ff:ff:ff" and
                          arp_layer.hwdst == "00:00:00:00:00:00")
    if not is_broadcast_probe:
        return

    print(Base.c_info + "ARP request from MAC: " + arp_layer.hwsrc + " IP: " + arp_layer.pdst)

    reply = _arp.make_response(ethernet_src_mac=_current_mac_address,
                               ethernet_dst_mac=arp_layer.hwsrc,
                               sender_mac=_current_mac_address, sender_ip=arp_layer.pdst,
                               target_mac=arp_layer.hwsrc, target_ip=arp_layer.psrc)

    raw_socket = socket(AF_PACKET, SOCK_RAW)
    try:
        raw_socket.bind((_current_network_interface, 0))
        raw_socket.send(reply)
    finally:
        # Close on success and on failure alike.
        raw_socket.close()

    _current_number_of_packets += 1
    if _current_number_of_packets >= _number_of_packets:
        exit(0)
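# Example source code for Python's AF_PACKET
# Source file: network_conflict_creator.py
# Project: raw-packet
# Author: Vladimir-Ivanov-Git
# Project source code
# File source code
# Views: 33
# Favorites: 0
# Likes: 0
# Comments: 0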
def send_dhcpv6_solicit():
    """Build one DHCPv6 Solicit packet and send it on a raw socket.

    The packet is produced by ``dhcpv6r.make_solicit_packet`` using the
    module-level ``macsrc`` and ``ipv6src_link`` values, a random transaction
    id and a client DUID derived from ``macsrc``.  It is sent on an
    ``AF_PACKET`` raw socket bound to ``current_network_interface``.

    On success a notice is printed.  If creating, binding or sending on the
    socket fails, an error line is printed and the process exits with
    status 1.  The socket is closed in both cases.
    """
    client_duid = dhcpv6r.get_client_duid(macsrc)
    # Presumably DHCPv6 option codes 23 (DNS servers) and 24 (domain search
    # list) -- confirm against dhcpv6r.make_solicit_packet.
    requested_options = [23, 24]
    solicit_packet = dhcpv6r.make_solicit_packet(ethernet_src_mac=macsrc,
                                                 ipv6_src=ipv6src_link,
                                                 # 16777215 == 2**24 - 1
                                                 transaction_id=randint(1, 16777215),
                                                 client_identifier=client_duid,
                                                 option_request_list=requested_options)
    raw_socket = None
    try:
        raw_socket = socket(AF_PACKET, SOCK_RAW)
        raw_socket.bind((current_network_interface, 0))
        raw_socket.send(solicit_packet)
        print(Base.c_info + "Send Solicit request to: [ff02::1:2]:547")
    except Exception:
        # "except Exception" (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate instead of being reported as a send failure.
        print(Base.c_error + "Do not send Solicit request.")
        exit(1)
    finally:
        if raw_socket is not None:
            raw_socket.close()
def __init__(self, interface_name, on_ip_incoming, on_ip_outgoing):
    """Store the callbacks and open the raw listening socket.

    :param interface_name: name of the network interface to bind to.
    :param on_ip_incoming: stored as ``self.on_ip_incoming``.
    :param on_ip_outgoing: stored as ``self.on_ip_outgoing``.
    """
    self.interface_name = interface_name
    self.on_ip_incoming = on_ip_incoming
    self.on_ip_outgoing = on_ip_outgoing

    # The raw in (listen) socket is a layer-2 raw socket that listens for
    # packets going through one specific interface; it is created for the
    # ETH_P_IP protocol.
    ip_protocol = socket.htons(ETH_P_IP)
    self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, ip_protocol)

    # Ask for a 2**30-byte receive buffer (SOL_SOCKET / SO_RCVBUF).
    receive_buffer_size = 2 ** 30
    self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF,
                         receive_buffer_size)

    bind_address = (self.interface_name, ETH_P_IP)
    self.sock.bind(bind_address)
def server_socket_init(self):
# Bind self.server_socket according to the platform and the socket's
# address family, report the result through output(), and call listen()
# when the local end type is connection based.
# NOTE(review): indentation was lost in this copy of the file; the comments
# below follow the branch structure implied by the if/elif/else keywords.
#
# Nothing to bind when the local end is stdin.
if "stdin" in self.local_end_type:
return
try:
# Windows / Cygwin: always bind to (lhost, lport).
if "windows" in system().lower() or "cygwin" in system().lower():
self.server_socket.bind((self.lhost,self.lport))
elif self.server_socket.family == socket.AF_PACKET:
# case normal L2 socket
if self.server_socket.proto == 0:
return
# case promiscuous socket
# NOTE(review): 0x300 is presumably htons(ETH_P_ALL) on a little-endian
# host -- confirm.
if self.server_socket.proto == 0x300:
return
elif self.server_socket.family == socket.AF_UNIX:
# (self.lhost) is just self.lhost in parentheses, not a one-element tuple.
self.server_socket.bind((self.lhost))
# NOTE(review): this branch also tests for AF_PACKET, which the earlier
# elif already handles, so it looks unreachable -- confirm.
elif "darwin" not in system().lower() and self.server_socket.family == socket.AF_PACKET:
self.server_socket.bind((self.lhost,0))
else:
self.server_socket.bind((self.lhost,self.lport))
output("[*.*] Listening on %s:%s" % (self.lhost,str(self.lport)),CYAN)
except Exception as e:
# Report the error and terminate; note that the exit status is 0.
output(str(e),YELLOW)
output("[x.x] Unable to bind to %s:%s" % (self.lhost,str(self.lport)) ,RED)
sys.exit(0)
output("[$.$] local:%s|remote:%s" % (self.local_end_type,self.remote_end_type), GREEN)
# Only connection-based end types call listen(); a failure is reported
# through output() and otherwise ignored.
if self.local_end_type in ConnectionBased:
try:
self.server_socket.listen(self.max_conns)
except Exception as e:
output(e)
def ack_sender():
    """Send one pre-built packet repeatedly, pausing 10 ms between sends.

    The packet is built once by ``make_dhcp_ack_packet`` from the
    module-level ``transaction_id_global`` and ``requested_ip_address`` and
    sent on a raw ``AF_PACKET`` socket bound to
    ``current_network_interface``.  The loop never returns normally; if it
    ends with an exception the socket is closed before the exception
    propagates.
    """
    raw_socket = socket(AF_PACKET, SOCK_RAW)
    try:
        raw_socket.bind((current_network_interface, 0))
        ack_packet = make_dhcp_ack_packet(transaction_id_global, requested_ip_address)
        while True:
            raw_socket.send(ack_packet)
            sleep(0.01)
    finally:
        # Reached only when the loop is interrupted by an exception.
        raw_socket.close()
def discover_sender():
    """Send freshly built discover packets forever, one every 10 ms.

    Each iteration builds a new packet with ``dhcp.make_discover_packet``
    using ``your_mac_address`` as source MAC and a new value from
    ``eth.get_random_mac()`` as client MAC, then sends it on a raw
    ``AF_PACKET`` socket bound to ``current_network_interface``.  The loop
    never returns normally; if it ends with an exception the socket is
    closed before the exception propagates.
    """
    raw_socket = socket(AF_PACKET, SOCK_RAW)
    try:
        raw_socket.bind((current_network_interface, 0))
        while True:
            discover_packet = dhcp.make_discover_packet(source_mac=your_mac_address,
                                                        client_mac=eth.get_random_mac())
            raw_socket.send(discover_packet)
            sleep(0.01)
    finally:
        # Reached only when the loop is interrupted by an exception.
        raw_socket.close()
def _arp_loop(self):
    """Open a raw ARP packet socket and hand it to ``_arp_loop_socket``.

    The socket is bound to ``self.interface.device_name`` and is closed
    when ``_arp_loop_socket`` returns or raises.  ``greenlet.GreenletExit``
    is swallowed so that killing the thread ends the loop quietly.
    """
    try:
        arp_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                                   socket.htons(ether.ETH_TYPE_ARP))
        with contextlib.closing(arp_socket):
            # NOTE(review): the protocol in the bind address is passed
            # through htons() as well -- confirm the byte order bind() expects.
            bind_address = (
                self.interface.device_name,
                socket.htons(ether.ETH_TYPE_ARP),
                socket.PACKET_BROADCAST,
                arp.ARP_HW_TYPE_ETHERNET,
                mac_lib.BROADCAST,
            )
            arp_socket.bind(bind_address)
            self._arp_loop_socket(arp_socket)
    except greenlet.GreenletExit:
        # Raised by thread.kill; deliberately suppressed.
        pass
def _arp_loop(self):
    """Run ``_arp_loop_socket`` on a raw ARP socket bound to the interface.

    A raw ``AF_PACKET`` socket for ``ether.ETH_TYPE_ARP`` is created, bound
    to ``self.interface.device_name`` and always closed afterwards.  A
    ``greenlet.GreenletExit`` raised while running is ignored.
    """
    try:
        raw_arp_sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                                     socket.htons(ether.ETH_TYPE_ARP))
        with contextlib.closing(raw_arp_sock):
            # NOTE(review): the protocol in the bind address is passed
            # through htons() as well -- confirm the byte order bind() expects.
            link_layer_address = (
                self.interface.device_name,
                socket.htons(ether.ETH_TYPE_ARP),
                socket.PACKET_BROADCAST,
                arp.ARP_HW_TYPE_ETHERNET,
                mac_lib.BROADCAST,
            )
            raw_arp_sock.bind(link_layer_address)
            self._arp_loop_socket(raw_arp_sock)
    except greenlet.GreenletExit:
        # Raised by thread.kill; deliberately suppressed.
        pass
def test_net_if_addrs(self):
    """Check the per-NIC address records from psutil.net_if_addrs().

    Every record must have correctly typed fields and a known address
    family.  AF_INET / AF_INET6 addresses of NICs that are up must be
    bindable, non-IPv6 address strings are passed to check_net_address(),
    and broadcast / ptp must not both be set.  Finally psutil.AF_LINK is
    compared with the platform's link-layer family constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics
    stats_by_nic = psutil.net_if_stats()

    # NIC names are deliberately not compared with
    # psutil.net_io_counters(pernic=True): that is not reliable on all
    # platforms (net_if_addrs() reports more interfaces).
    known_families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
    str_or_none = (str, type(None))

    for nic_name, nic_addrs in nics.items():
        self.assertIsInstance(nic_name, (str, unicode))
        # A NIC must not report the same record twice.
        self.assertEqual(len(set(nic_addrs)), len(nic_addrs))

        for entry in nic_addrs:
            family = entry.family
            self.assertIsInstance(family, int)
            self.assertIsInstance(entry.address, str)
            self.assertIsInstance(entry.netmask, str_or_none)
            self.assertIsInstance(entry.broadcast, str_or_none)
            self.assertIn(family, known_families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(family, enum.IntEnum)

            # Do not test binding to addresses of interfaces that are down.
            if stats_by_nic[nic_name].isup:
                if family == socket.AF_INET:
                    with contextlib.closing(socket.socket(family)) as sock:
                        sock.bind((entry.address, 0))
                elif family == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        entry.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as sock:
                        sock.bind(sa)

            for ip in (entry.address, entry.netmask, entry.broadcast,
                       entry.ptp):
                # TODO: skip AF_INET6 for now because I get:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is None or family == AF_INET6:
                    continue
                check_net_address(ip, family)

            # broadcast and ptp addresses are mutually exclusive
            if entry.broadcast:
                self.assertIsNone(entry.ptp)
            elif entry.ptp:
                self.assertIsNone(entry.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Validate what psutil.net_if_addrs() reports for each interface.

    Field types and address families are asserted for every record,
    AF_INET / AF_INET6 addresses of interfaces that are up are bound to a
    throw-away socket, non-IPv6 address strings go through
    check_net_address(), and broadcast / ptp are asserted to be mutually
    exclusive.  The value of psutil.AF_LINK is checked per platform at the
    end.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics
    if_stats = psutil.net_if_stats()

    # Comparing the NIC names with psutil.net_io_counters(pernic=True) is
    # intentionally left out: not reliable on all platforms
    # (net_if_addrs() reports more interfaces).
    allowed_families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
    optional_str = (str, type(None))

    for ifname, records in nics.items():
        self.assertIsInstance(ifname, (str, unicode))
        # Records of one interface must be unique.
        self.assertEqual(len(set(records)), len(records))

        for rec in records:
            fam = rec.family
            self.assertIsInstance(fam, int)
            self.assertIsInstance(rec.address, str)
            self.assertIsInstance(rec.netmask, optional_str)
            self.assertIsInstance(rec.broadcast, optional_str)
            self.assertIn(fam, allowed_families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(fam, enum.IntEnum)

            # Do not test binding to addresses of interfaces that are down.
            if if_stats[ifname].isup:
                if fam == socket.AF_INET:
                    with contextlib.closing(socket.socket(fam)) as probe:
                        probe.bind((rec.address, 0))
                elif fam == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        rec.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as probe:
                        probe.bind(sa)

            for ip in (rec.address, rec.netmask, rec.broadcast, rec.ptp):
                # TODO: skip AF_INET6 for now because I get:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is None or fam == AF_INET6:
                    continue
                check_net_address(ip, fam)

            # broadcast and ptp addresses are mutually exclusive
            if rec.broadcast:
                self.assertIsNone(rec.ptp)
            elif rec.ptp:
                self.assertIsNone(rec.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)
def test_net_if_addrs(self):
    """Exercise psutil.net_if_addrs() and sanity-check each address record.

    For every interface the records must be unique; each record must have
    correctly typed fields and a known family.  AF_INET / AF_INET6
    addresses of interfaces that are up are bound to a temporary socket,
    non-IPv6 address strings are handed to check_net_address(), and
    broadcast / ptp must not both be set.  psutil.AF_LINK is then compared
    with the platform's link-layer constant.
    """
    nics = psutil.net_if_addrs()
    assert nics, nics
    nic_status = psutil.net_if_stats()

    # The NIC names are not compared with
    # psutil.net_io_counters(pernic=True) on purpose: not reliable on all
    # platforms (net_if_addrs() reports more interfaces).
    expected_families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
    maybe_str = (str, type(None))

    for nic, addr_list in nics.items():
        # Each interface must list every record only once.
        self.assertEqual(len(set(addr_list)), len(addr_list))

        for item in addr_list:
            item_family = item.family
            self.assertIsInstance(item_family, int)
            self.assertIsInstance(item.address, str)
            self.assertIsInstance(item.netmask, maybe_str)
            self.assertIsInstance(item.broadcast, maybe_str)
            self.assertIn(item_family, expected_families)
            if sys.version_info >= (3, 4):
                self.assertIsInstance(item_family, enum.IntEnum)

            # Do not test binding to addresses of interfaces that are down.
            if nic_status[nic].isup:
                if item_family == socket.AF_INET:
                    with contextlib.closing(
                            socket.socket(item_family)) as test_sock:
                        test_sock.bind((item.address, 0))
                elif item_family == socket.AF_INET6:
                    af, socktype, proto, canonname, sa = socket.getaddrinfo(
                        item.address, 0, socket.AF_INET6,
                        socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
                    with contextlib.closing(
                            socket.socket(af, socktype, proto)) as test_sock:
                        test_sock.bind(sa)

            for ip in (item.address, item.netmask, item.broadcast,
                       item.ptp):
                # TODO: skip AF_INET6 for now because I get:
                # AddressValueError: Only hex digits permitted in
                # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
                if ip is None or item_family == AF_INET6:
                    continue
                check_net_address(ip, item_family)

            # broadcast and ptp addresses are mutually exclusive
            if item.broadcast:
                self.assertIsNone(item.ptp)
            elif item.ptp:
                self.assertIsNone(item.broadcast)

    if BSD or OSX or SUNOS:
        if hasattr(socket, "AF_LINK"):
            self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
    elif LINUX:
        self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
    elif WINDOWS:
        self.assertEqual(psutil.AF_LINK, -1)