def sniff_aps(int_mon):
    """Print the scan header, start the channel hopper and sniff on ``int_mon``.

    Blocks inside ``sniff()`` until the capture ends, then calls
    ``sniff_aps_signal_handler()``.

    :param int_mon: name of the interface handed to ``sniff()`` as ``iface``.
    """
    global channel_hop

    # Print the program header
    print("-=-=-=-=-=-= AIROSCAPY =-=-=-=-=-=-")
    print("CH ENC BSSID SSID")

    # Start the channel hopper in its own process
    channel_hop = True
    hopper = Process(target=channel_hopper)
    hopper.start()

    # SIGINT is not hooked here; the handler below runs once sniff() returns.
    # Every packet is consumed by the callback and the returned list is never
    # used, so store=0 keeps scapy from holding the whole capture in memory.
    sniff(iface=int_mon, prn=sniff_aps_callback, store=0)
    sniff_aps_signal_handler()
python类sniff()的实例源码
def listen():
    """Log that the ICMP listener is starting, then sniff ICMP echo requests.

    Each captured packet is passed to ``analyze``.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # Filter for echo requests only to prevent capturing generated replies.
    # The result of sniff() is discarded, so store=0 stops scapy from keeping
    # every packet in memory while the listener runs.
    scapy.sniff(filter="icmp and icmp[0]=8", prn=analyze, store=0)
def update_next_packet(self):
    """Sniff a single packet and dispatch it to the session it belongs to.

    Packets that carry no IP layer, or whose source and destination are both
    different from ``self.our_ip``, are ignored.  For the rest, the stamp
    returned by ``self.decide_stamp`` selects an entry in ``self.sessions``:
    an unknown stamp starts ``self.set_session`` in a new thread, a known one
    starts that session's ``update_session`` in a new thread.  A known session
    whose ``got_fin`` flag is set is removed from ``self.sessions``, its
    ``income``/``outcome``/``combined`` collections are sorted with
    ``key_func`` and it is added to ``lst``.
    """
    packet = s.sniff(count=1)[0]  # filter = "tcp.len > 0",
    if s.IP not in packet:
        return

    ip_send = packet[s.IP].src
    ip_rec = packet[s.IP].dst
    if ip_send != self.our_ip and ip_rec != self.our_ip:
        return
    print(packet.summary())

    # Call make_stamp() once and reuse the result instead of calling it twice.
    stamp_fields = make_stamp(packet)
    if stamp_fields is None:
        print("[Error] - Shit happens")
        return
    ip_send, ip_rec, protocol = stamp_fields
    stamp = self.decide_stamp([ip_send, ip_rec, protocol])

    session = self.sessions.get(stamp)
    if session is None:
        threading.Thread(target=self.set_session,
                         args=[packet, stamp, self.our_ip]).start()
        return

    threading.Thread(target=session.update_session, args=[packet]).start()

    # In case we are done working on the connection, order its packets by
    # time and add it to the global list.
    # NOTE(review): the FIN check is applied only to sessions that already
    # existed when this packet arrived -- confirm this matches the intent.
    if session.got_fin is True:
        finished = self.sessions.pop(stamp)
        # sorted() returns a new list; the results must be stored, otherwise
        # the collections stay in their original order.
        finished.income = sorted(finished.income, key=key_func)
        finished.outcome = sorted(finished.outcome, key=key_func)
        finished.combined = sorted(finished.combined, key=key_func)
        lst.add(finished)
def run(self):
    """Sniff inbound packets on ``self.iface`` and forward them until finished.

    Opens an L2 listening socket (stored on ``self.sock``), then repeatedly
    sniffs up to one packet with a one-second timeout and passes each packet
    to ``self.forward_packet``.  The loop ends when ``self.finished`` becomes
    true.  Any exception raised while sniffing or forwarding is logged and
    the loop continues.
    """
    # TODO this loop could be reconciled with the ofp Connection to become a
    # single select loop.
    self.sock = s = conf.L2listen(
        type=ETH_P_ALL,
        iface=self.iface,
        filter='inbound'
    )
    while not self.finished:
        try:
            sniffed = sniff(1, iface=self.iface, timeout=1, opened_socket=s)
            print('Sniffer received %d packet(s)' % len(sniffed))
            for pkt in sniffed:
                self.forward_packet(pkt)
        except Exception as e:
            # Keep the thread alive: log the failure and try again.
            logging.error("scapy.sniff error: %s", e)
def main():
    """Create a ``Sniffer`` and pass every sniffed packet to its ``pkt_handle``."""
    sn = Sniffer()
    # The capture result was bound to a local that was never read; with
    # store=0 scapy no longer keeps every packet in memory for the whole run.
    sniff(prn=sn.pkt_handle, store=0)
def StartSF():
    """Silence scapy and sniff UDP port 6633 traffic addressed to ``ingress``.

    Each captured packet is handed to ``NSHForward``.  Returns whatever
    ``sniff()`` returns.
    """
    conf.verb = 0
    # Only packets whose IP destination is the ingress interface's address.
    capture_filter = 'ip dst ' + get_ip_address(ingress) + ' and (udp port 6633)'
    return sniff(
        iface=ingress,
        filter=capture_filter,
        prn=lambda packet: NSHForward(packet),
    )
def capture(mon_interface):
    """Sniff probe requests on ``mon_interface`` and pass them to ``on_receiving``.

    Sets ``scapy.conf.iface`` to the interface before sniffing.  If anything
    raises, the error is written to stderr and the process exits with a
    non-zero status.

    :type mon_interface: string, name of the interface operating in monitor mode
    """
    try:
        scapy.conf.iface = mon_interface
        scapy.sniff(iface=mon_interface, prn=on_receiving, store=0,
                    filter="subtype probereq")
    except Exception as info:
        sys.stderr.write("\nError: " + str(info) + "\n")
        # This is the failure path, so report failure to the caller/shell.
        sys.exit(1)
def StartSF():
    """Start sniffing on ``ingress`` with scapy verbosity turned off.

    The capture is limited to UDP port 6633 packets whose IP destination is
    the address returned by ``get_ip_address(ingress)``; every packet is
    forwarded to ``NSHForward``.  The value returned by ``sniff()`` is
    returned unchanged.
    """
    conf.verb = 0
    bpf = 'ip dst ' + get_ip_address(ingress) + ' and (udp port 6633)'
    captured = sniff(iface=ingress, filter=bpf, prn=lambda x: NSHForward(x))
    return captured
def __init__(self, config):
    """Set up the credential sniffer state from its configuration.

    Reads ``sniffing_interface``, ``bssid``, ``ssid`` and ``log_dir`` from
    ``self.config`` (presumably populated by the parent constructor from
    ``config`` -- confirm).  ``fixed_sniffing_channel`` and ``timeout`` are
    optional and fall back to 7 and 30 when missing or not convertible to
    ``int``.

    :param config: configuration passed on to the parent constructor.
    """
    super(CredentialSniffer, self).__init__(config, "credentialsniffer")
    self.running_interface = self.config["sniffing_interface"]
    self.running_bssid = self.config["bssid"]
    self.running_ssid = self.config["ssid"]
    self.log_dir = self.config["log_dir"]
    self.wifi_clients = {}
    self.wpa_handshakes = {}
    self.broadcasted_bssids = {}  # bssid: beacon_packet
    self.sniffer_thread = None
    self.should_stop = False
    self.log_lock = Lock()

    # Only a missing key or a value that cannot be converted falls back to
    # the default; a bare except would also swallow KeyboardInterrupt and
    # SystemExit.
    try:
        self.fixed_channel = int(self.config["fixed_sniffing_channel"])
    except (KeyError, TypeError, ValueError):
        self.fixed_channel = 7

    try:
        self.timeout = int(self.config["timeout"])
    except (KeyError, TypeError, ValueError):
        self.timeout = 30

    # When sniffing for credentials on interface running in Master mode
    # scapy will only be able to sniff for layer 3 packets (Networking)
    # so it never receives a Beacon packet (layer2) to verify the access point ssid
    # best to pass it as parameter since we are running the access point we know the ssid
    self.is_ap = False
# This will be called by the AirSniffer
def start_credential_sniffing(self):
    """Sniff packets and pass each one to ``self.extract_credential_info``.

    Packets are not stored, and the capture stops when ``self._stop`` returns
    true for a packet.  Any exception raised by the capture is printed rather
    than propagated.
    """
    # TODO map packets to interface with threads
    try:
        sniff(store=0,
              prn=self.extract_credential_info,
              stop_filter=self._stop)
    except Exception as e:
        # Single-argument print() calls behave the same on Python 2 and 3.
        print("Error Occurred while sniffing.")
        print(str(e))
def scapy_mon_arp():
    """Sniff ARP traffic and hand each packet to ``scapy_mon_arp_callback``.

    No interface is specified and packets are not stored (``store=0``).
    Returns whatever ``sniff()`` returns.
    """
    return sniff(filter="arp", store=0, prn=scapy_mon_arp_callback)
def sniff_wifi_ssid_mac():
    """Sniff without a packet limit, reporting via ``sniff_wifi_ssid_mac_callback``.

    Packets are not stored and nothing is returned; all output comes from
    the callback.
    """
    sniff(
        prn=sniff_wifi_ssid_mac_callback,
        count=0,  # 0 means no packet limit
        store=0,
    )
def sniff_wifi_ssid_mac_vendor():
    """Sniff without a packet limit, reporting via ``sniff_wifi_ssid_mac_callback``.

    Packets are not stored and nothing is returned.
    """
    # NOTE(review): this passes the same callback as sniff_wifi_ssid_mac();
    # a vendor-aware callback may have been intended -- confirm.
    sniff(
        prn=sniff_wifi_ssid_mac_callback,
        count=0,  # 0 means no packet limit
        store=0,
    )
def detect_deauth(fm):
    """Detect deauth attacks by monitoring the deauth frame (fm.subtype==12).

    Sniffs indefinitely and passes every packet to ``detect_deauth_callback``.

    :param fm: not used by this function; kept so existing callers still work.
    """
    # The capture result is discarded, so store=0 keeps scapy from holding
    # every sniffed packet in memory while the monitor runs.
    sniff(prn=detect_deauth_callback, store=0)
def Sniffer(self, interface):
    """Sniff on ``interface`` for three seconds, reporting to ``self.pkt_callback``.

    Resets ``self.sniffed_cnt`` to 0 before the capture starts.

    :param interface: name of the interface to sniff on.
    """
    self.sniffed_cnt = 0
    # Use the interface the caller asked for instead of a hard-coded "eth2".
    scapy2.sniff(iface=interface, prn=self.pkt_callback, store=0, timeout=3)
def listen():
    """Log that the ICMP listener is starting, then sniff ICMP echo requests.

    Each captured packet is passed to ``analyze``.
    """
    app_exfiltrate.log_message('info', "[icmp] Listening for ICMP packets..")
    # Filter for echo requests only (ICMP type 8) to prevent capturing
    # generated replies. The result of sniff() is discarded, so store=0 stops
    # scapy from keeping every packet in memory.
    scapy.sniff(filter="icmp and icmp[0]=8", prn=analyze, store=0)
def runTest(self):
    """Send a TCP SYN on eth2 and fail if the sniffer thread counted no packets.

    ``self.Sniffer`` runs in a background thread while the packet is sent;
    the test then checks ``self.sniffed_cnt``.
    """
    pkt = scapy2.Ether()
    pkt /= scapy2.IP(src="21.0.0.2", dst="22.0.0.2")
    pkt /= scapy2.TCP(dport=80, flags="S", seq=42)
    pkt /= ("badabadaboom")

    # NOTE(review): the packet is sent right after the thread is started, so
    # the sniffer may not be listening yet -- confirm this is not flaky.
    t = Thread(target=self.Sniffer, args=("eth2",))
    t.start()
    scapy2.sendp(pkt, iface='eth2')

    # Wait for the sniffer thread to finish, for at most the four seconds the
    # test used to sleep unconditionally.
    t.join(4)

    # fail if no reply
    self.assertTrue(self.sniffed_cnt != 0, "no packet was sniffed on eth2")