def discover(data: ConnectionData) -> None:
    """Broadcast an ARP request and print one line per host that answers.

    ``data`` is unpacked into ``(ip_net, iface)``: the ARP target passed as
    ``pdst`` and the interface the request is sent on.  Each printed line
    holds the responder's Ethernet source address and ARP source IP,
    followed by its reverse-DNS name when that lookup succeeds.

    Exits the process with status 1 if sending raises ``PermissionError``;
    any other exception propagates to the caller unchanged.
    """
    assert isinstance(data, ConnectionData)
    ip_net, iface = data

    # Only building and sending the packet is guarded: that is the step
    # the "run as root" hint is about.
    try:
        request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip_net)
        ans, _unans = srp(request, iface=iface, timeout=2, verbose=False)
    except PermissionError:
        print('Cannot execute necessary code, did you run as root?')
        sys.exit(1)

    for _sent, reply in ans:
        line = reply.sprintf("%Ether.src% %ARP.psrc%")
        try:
            hostname = socket.gethostbyaddr(reply.psrc)
        except (socket.herror, socket.gaierror):
            # Reverse lookup is best-effort; hosts without a name are
            # still reported by MAC and IP.
            pass
        else:
            line += ' ' + hostname[0]
        print(line)
python类srp()的实例源码
discovery.py 文件源码
项目:SupercomputerInABriefcase
作者: SupercomputerInABriefcase
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def cmd_dhcp_discover(iface, timeout, verbose):
    """Broadcast a DHCP DISCOVER and print every answer received.

    Args:
        iface: Interface to send on; when falsy the current ``conf.iface``
            is used.
        timeout: Seconds to wait for answers.  ``None`` falls back to the
            previous fixed wait of 5 seconds.
        verbose: When true, dump each answering packet in full; otherwise
            print its one-line summary.

    Side effects: sets ``conf.verb``, ``conf.checkIPaddr`` and (when
    ``iface`` is given) ``conf.iface`` globally.
    """
    conf.verb = False

    if iface:
        conf.iface = iface

    # Offers come from the server's own address rather than the broadcast
    # address the request was sent to, so IP matching is turned off
    # (as in Scapy's rogue-DHCP-server example).
    conf.checkIPaddr = False

    # Scapy's documented usage is ``fam, hw = get_if_raw_hwaddr(...)``;
    # keep only the hardware address when a tuple is returned.
    hw = get_if_raw_hwaddr(conf.iface)
    if isinstance(hw, tuple):
        hw = hw[-1]

    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    ip = IP(src="0.0.0.0", dst="255.255.255.255")
    udp = UDP(sport=68, dport=67)
    bootp = BOOTP(chaddr=hw)
    dhcp = DHCP(options=[("message-type", "discover"), "end"])
    dhcp_discover = ether / ip / udp / bootp / dhcp

    wait = 5 if timeout is None else timeout
    # multi=True keeps collecting answers so several servers can reply.
    ans, _unans = srp(dhcp_discover, multi=True, timeout=wait)

    for _, pkt in ans:
        if verbose:
            # show() prints by itself and returns None; wrapping it in
            # print() emitted a stray "None" after every packet.
            pkt.show()
        else:
            print(pkt.summary())
def mac_getter(self, IP):
    """Return the Ethernet source address of the first ARP answer for ``IP``.

    The request is broadcast on ``self.interface`` with a 2 second timeout.
    Returns ``None`` when nothing answers.
    """
    # Broadcast a who-has for the requested address.
    arp_probe = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=IP)
    replies, _no_reply = srp(arp_probe, timeout=2, iface=self.interface, inter=0.2)

    # Only the first answer matters.
    for _sent, reply in replies:
        return reply.sprintf(r"%Ether.src%")
    return None
def cmd_dhcp_starvation(iface, timeout, sleeptime, verbose):
    """Send DHCP DISCOVERs from random MAC addresses in an endless loop.

    Args:
        iface: Interface to send on; when falsy the current ``conf.iface``
            is used.
        timeout: Seconds to wait for answers to each request.  ``None``
            falls back to the previous fixed wait of 1 second.
        sleeptime: Seconds to sleep between two requests.
        verbose: When true, dump each answering packet in full; otherwise
            print ``"<server> offers <address>"``.

    The loop never ends on its own; interrupt it (e.g. CTRL-C) to stop.
    Side effects: sets ``conf.verb``, ``conf.checkIPaddr`` and (when
    ``iface`` is given) ``conf.iface`` globally.
    """
    conf.verb = False

    if iface:
        conf.iface = iface

    conf.checkIPaddr = False

    # These layers are identical for every request; only BOOTP changes.
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    ip = IP(src="0.0.0.0", dst="255.255.255.255")
    udp = UDP(sport=68, dport=67)
    dhcp = DHCP(options=[("message-type", "discover"), "end"])

    wait = 1 if timeout is None else timeout

    while True:
        # A fresh random client hardware address for each request.
        bootp = BOOTP(chaddr=str(RandMAC()))
        dhcp_discover = ether / ip / udp / bootp / dhcp
        ans, _unans = srp(dhcp_discover, timeout=wait)

        for _, pkt in ans:
            if verbose:
                # show() prints by itself and returns None; wrapping it
                # in print() emitted a stray "None" after every packet.
                pkt.show()
            else:
                print(pkt.sprintf(r"%IP.src% offers %BOOTP.yiaddr%"))

        sleep(sleeptime)
def get_mac(iface, ip):
    """Resolve the MAC address of ``ip`` via ARP on ``iface``.

    If ``ip`` does not answer, the gateway found for ``iface`` is probed
    instead.  Returns the broadcast address ``"ff:ff:ff:ff:ff:ff"`` when
    neither answers or when sending fails.

    ``KeyboardInterrupt`` and ``SystemExit`` are no longer swallowed, so a
    CTRL-C during the 10 second waits actually interrupts the caller.
    """
    broadcast = "ff:ff:ff:ff:ff:ff"

    # Find the gateway address associated with the requested interface.
    # NOTE(review): presumably ``gateways()`` is netifaces.gateways(), whose
    # values are either dicts keyed by address family or lists of tuples;
    # confirm against the file's imports.
    gw_ip = ""
    gws = gateways()
    for gw in gws:
        try:
            entry = gws[gw][AF_INET]
            if str(entry[1]) == iface:
                gw_ip = str(entry[0])
        except IndexError:
            if str(gws[gw][0][1]) == iface:
                gw_ip = str(gws[gw][0][0])
        except KeyError:
            # No AF_INET entry under this key (e.g. no default IPv4
            # gateway); nothing to learn from it.
            continue

    # Probe the host first, then the gateway as a fallback.
    for target in (ip, gw_ip):
        try:
            alive, _dead = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=target), iface=iface, timeout=10, verbose=0)
            return str(alive[0][1].hwsrc)
        except IndexError:
            # No answer from this target; try the next one.
            continue
        except Exception:
            return broadcast
    return broadcast
def _request_arp(self, ip):
    """Resolve ``ip`` to a MAC address with up to three ARP broadcasts.

    Each attempt waits 1 second for answers.  When an attempt yields several
    answers the last one wins.  Returns the MAC address string, or ``None``
    if no attempt was answered.

    Scapy's verbosity is silenced for the duration of the call and restored
    afterwards, even if sending raises.
    """
    # README: requested arp not write system arp cache
    #
    # disable scapy module verbose
    verb_conf = conf.verb
    conf.verb = 0
    mac_addr = ""
    try:
        # Run request arp up to three times.
        for _ in range(3):
            ans, _uans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), timeout=1)
            for _snd, rcv in ans:
                result = rcv.sprintf(r"%ARP.psrc% %Ether.src%").split()
                mac_addr = result[1]
            if mac_addr:
                break
    finally:
        # rollback scapy module verbose
        conf.verb = verb_conf
    return mac_addr or None
def scan():
    """Interactively ARP-scan the /24 network of a user-chosen interface.

    Lists the available interfaces, prompts for one, appends ``/24`` to its
    first IPv4 address and broadcasts ARP requests to that range.  Every
    answering host is printed as ``MAC - IP``, followed by the scan duration.

    Returns early (after printing an error) when the interface is invalid
    or sending raises ``PermissionError``.  CTRL-C at any point ends the
    scan with an informational message.
    """
    try:
        print(colors.blue+"interfaces:"+colors.end)
        for iface in netifaces.interfaces():
            print(colors.yellow+iface+colors.end)

        print("")
        interface = input(colors.purple+"interface: "+colors.end)

        try:
            # First IPv4 address configured on the chosen interface.
            ip = netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr']
        except (ValueError, KeyError, IndexError):
            printError("invalid interface")
            return

        ips = ip+"/24"
        printInfo("scanning please wait...\n", start="\n")
        print(colors.blue+"MAC - IP"+colors.end)

        start_time = datetime.now()

        conf.verb = 0
        try:
            ans, _unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ips), timeout=2, iface=interface, inter=0.1)
        except PermissionError:
            printError('root permissions required')
            return

        # The raw-string prefix used to sit inside the literal, which
        # printed a stray "r" in front of every MAC address.
        row_format = colors.yellow+r"%Ether.src% - %ARP.psrc%"+colors.end
        for _snd, rcv in ans:
            print(rcv.sprintf(row_format))

        stop_time = datetime.now()
        total_time = stop_time - start_time
        printSuccess("scan completed", start="\n")
        printSuccess("scan duration: "+str(total_time))
    except KeyboardInterrupt:
        printInfo("network scanner terminated", start="\n")
def is_device_connected(mac_addr):
    """Return True if ``mac_addr`` answers an ARP broadcast to ``SUBNET``.

    The comparison is an exact match against the source address of each
    answering frame; answers are awaited for 2 seconds.
    """
    probe = scapy.Ether(dst="ff:ff:ff:ff:ff:ff") / scapy.ARP(pdst=SUBNET)
    replies = scapy.srp(probe, timeout=2)[0]
    return any(reply.src == mac_addr for _sent, reply in replies)
def get_mac_address(ip_address):
    """Return the Ethernet source address of the first ARP answer.

    Broadcasts an ARP request for ``ip_address`` (2 second timeout, 2
    retries).  Returns ``None`` when nothing answers.
    """
    response, _unanswered = srp(Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(pdst=ip_address),
                                timeout=2, retry=2)
    # Only the first answer is of interest.
    for _sent, reply in response:
        return reply[Ether].src
    # The logging / GATEWAY_MAC statements that used to follow this return
    # could never execute and have been dropped.
    return None
def get_mac_address_v6(ip_address):
    """Return the Ethernet source address of the first router that answers.

    Sends an ICMPv6 Router Solicitation to the all-routers multicast group
    (``FF02::2``) with a 2 second timeout and 2 retries.  Returns ``None``
    when nothing answers.

    Note: ``ip_address`` is accepted for signature compatibility but is not
    used; the solicitation always goes to the multicast group.
    """
    # 133 is the ICMPv6 *type* of a Router Solicitation, not its code.
    # RFC 4861 requires code 0, so the earlier ``code=133`` produced a
    # message that conforming routers discard.
    solicitation = Ether(dst='33:33:00:00:00:02')/IPv6(dst="FF02::2")/ICMPv6ND_RS()
    response, _unanswered = srp(solicitation, timeout=2, retry=2)
    # Only the first answer is of interest.
    for _sent, reply in response:
        return reply[Ether].src
    # The logging / GATEWAY_MAC statements that used to follow this return
    # could never execute and have been dropped.
    return None
def arp_network_range(iprange):
    """ARP-scan ``iprange`` and report the hosts that answer.

    Broadcasts ARP requests to ``iprange`` (5 second timeout), logs and
    prints the IP and Ethernet address of every answering host, and writes
    both lists to ``toucan_hosts.txt`` in the current directory
    (overwriting any existing file).
    """
    logging.info('Sending ARPs to network range')
    ans, _unans = srp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=iprange), timeout=5)

    ip_collection = []
    eth_collection = []
    for _snd, rcv in ans:
        host_ip_address = rcv.sprintf(r"%ARP.psrc%")
        host_eth_address = rcv.sprintf(r"%Ether.src%")
        logging.info('%s', host_ip_address)
        logging.info('%s', host_eth_address)
        ip_collection.append(host_ip_address)
        eth_collection.append(host_eth_address)

    # Single-argument print() calls behave the same on Python 2 and 3;
    # the former print statements were a syntax error on Python 3.
    print("Host List IP Addresses:")
    for host_ip in ip_collection:
        print(host_ip)

    print("Host List Ethernet Addresses:")
    for host_eth in eth_collection:
        print(host_eth)

    # Output format is kept as-is: the two list reprs back to back.
    with open("toucan_hosts.txt", "w") as output:
        output.write(str(ip_collection))
        output.write(str(eth_collection))
def cmd_arping(ip, iface, verbose):
    """Broadcast an ARP request for ``ip`` and display the answers.

    Args:
        ip: Target passed as the ARP ``pdst``.
        iface: Interface to send on; when falsy the current ``conf.iface``
            is used.
        verbose: When true, configure logging at INFO level with a
            message-only format.

    Side effects: sets ``conf.verb`` and (when ``iface`` is given)
    ``conf.iface`` globally.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False
    if iface:
        conf.iface = iface

    arp_request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip)
    answered, _unanswered = srp(arp_request, timeout=2)
    answered.show()