def active_scan(self, target):
req = 'M-SEARCH * HTTP/1.1\r\nHost:239.255.255.250:1900\r\nST:upnp:rootdevice\r\nMan:"ssdp:discover"\r\nMX:3\r\n\r\n'
ip=IP(dst=target)
udp=UDP(sport=random.randint(49152,65536), dport=1900)
pck = ip/udp/req
try:
start = time.time()
rep = sr1(pck, verbose=0,timeout=5)
if rep[Raw]:
results = rep[Raw].load
else:
pass
except Exception as e:
results = None
#print e
return results
python类Raw()的实例源码
def _send_to_target(self, data):
ether = Ether(dst='ff:ff:ff:ff:ff:ff')
ip = IP(src=self.host, dst='255.255.255.255')
udp = UDP(sport=68, dport=self.port)
payload = Raw(load=data)
packet = str(ether / ip / udp / payload)
self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port))
self.socket.send(packet)
self.logger.debug('Header+data sent to host')
def read_file(self, filename):
if not scapy_installed:
exit("Could not read pcap due to missing scapy")
self.params['ppid']=0
print "Opening pcap file %s" % filename
packets=rdpcap(filename)
for p in packets:
if scapy_sctp and SCTPChunkData in p:
msg=p.data
elif (TCP in p and Raw in p) or UDP in p or (Ethernet in p and Raw in p):
msg = p.load
if p.time >= self.start and p.time <= self.stop:
self.packets.append(msg)
self.uniquesamples.add(msg)
ppid=getattr(p,'proto_id',0)
if self.params['ppid'] != ppid:
self.params['ppid'] = ppid
# This is not used so don't print
# if self.params['ppid'] != 0:
# print "Using PPID %d" % self.params['ppid']
def monlist_scan(self,target):
data = "\x17\x00\x03\x2a" + "\x00" * 4
ip = IP(dst=target)
udp=UDP(sport=random.randint(49152,65536),dport=123)
a = Raw(load=data)
pck = ip/udp/a
n = 0
results = None
#try:
while (n < 3):
rep = sr1(pck,verbose=0,timeout=5)
if hasattr(rep,'answers'):
results = 1
break
elif not hasattr(rep,'answers') and (n < 3):
#print "Pass ",n
n = n + 1
else:
results = None
break
pass
#except KeyboardInterrupt:
# sys.exit(0)
#except Exception as e:
# results = None
#print e
return results
def set_attribute(self, class_id, instance, attr, value):
"""Set the value of attribute class/instance/attr"""
path = CIP_Path.make(class_id=class_id, instance_id=instance)
# User CIP service 4: Set_Attribute_List
cippkt = CIP(service=4, path=path) / scapy_all.Raw(load=struct.pack('<HH', 1, attr) + value)
self.send_rr_cm_cip(cippkt)
if self.sock is None:
return
resppkt = self.recv_enippkt()
cippkt = resppkt[CIP]
if cippkt.status[0].status != 0:
logger.error("CIP set attribute error: %r", cippkt.status[0])
return False
return True
def _send_to_target(self, data):
ether = Ether(dst='ff:ff:ff:ff:ff:ff')
ip = IP(src=self.host, dst='255.255.255.255')
udp = UDP(sport=68, dport=self.port)
payload = Raw(load=data)
packet = str(ether / ip / udp / payload)
self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port))
self.socket.send(packet)
self.logger.debug('Header+data sent to host')
def read_pcap(filename):
"""
@param filename: Filesystem path to the pcap.
Returns:
[{"client": "\x17\x52\x15"}, {"server": "\x17\x15\x13"}]
"""
from scapy.all import IP, Raw, rdpcap
packets = rdpcap(filename)
checking_first_packet = True
client_ip_addr = None
server_ip_addr = None
ssl_packets = []
messages = []
"""
pcap assumptions:
pcap only contains packets exchanged between a Tor client and a Tor
server. (This assumption makes sure that there are only two IP addresses
in the pcap file)
The first packet of the pcap is sent from the client to the server. (This
assumption is used to get the IP address of the client.)
All captured packets are TLS packets: that is TCP session
establishment/teardown packets should be filtered out (no SYN/SYN+ACK)
"""
"""
Minimally validate the pcap and also find out what's the client
and server IP addresses.
"""
for packet in packets:
if checking_first_packet:
client_ip_addr = packet[IP].src
checking_first_packet = False
else:
if packet[IP].src != client_ip_addr:
server_ip_addr = packet[IP].src
try:
if (packet[Raw]):
ssl_packets.append(packet)
except IndexError:
pass
"""Form our list."""
for packet in ssl_packets:
if packet[IP].src == client_ip_addr:
messages.append({"client": str(packet[Raw])})
elif packet[IP].src == server_ip_addr:
messages.append({"server": str(packet[Raw])})
else:
raise("Detected third IP address! pcap is corrupted.")
return messages