def active_scan(self, target):
    """Send an SSDP M-SEARCH probe to ``target`` and return the raw reply.

    One UDP datagram carrying an ``upnp:rootdevice`` discovery request is
    sent to port 1900 of ``target`` from a random high source port, and the
    first answer is awaited for up to five seconds.

    Returns the ``Raw`` payload of the answer, or ``None`` when nothing
    answered, the answer has no ``Raw`` layer, or sending failed.
    """
    req = 'M-SEARCH * HTTP/1.1\r\nHost:239.255.255.250:1900\r\nST:upnp:rootdevice\r\nMan:"ssdp:discover"\r\nMX:3\r\n\r\n'
    ip = IP(dst=target)
    # randint() includes both bounds, so the upper bound has to be the
    # highest valid port (65535); 65536 does not fit the 16-bit port field.
    udp = UDP(sport=random.randint(49152, 65535), dport=1900)
    pck = ip / udp / req
    results = None
    try:
        rep = sr1(pck, verbose=0, timeout=5)
    except Exception:
        # Best effort: a failure while sending is reported as "no result".
        return None
    if rep is not None and Raw in rep:
        results = rep[Raw].load
    return results
python类UDP的实例源码
def _send_to_target(self, data):
    """Broadcast ``data`` as the payload of an Ethernet/IP/UDP frame.

    The frame is addressed to the Ethernet and IP broadcast addresses, with
    ``self.host`` as IP source, UDP source port 68 and UDP destination port
    ``self.port``.  Its string form is written to ``self.socket``, with a
    debug log entry before and after the write.
    """
    frame = (
        Ether(dst='ff:ff:ff:ff:ff:ff')
        / IP(src=self.host, dst='255.255.255.255')
        / UDP(sport=68, dport=self.port)
        / Raw(load=data)
    )
    wire_data = str(frame)
    self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port))
    self.socket.send(wire_data)
    self.logger.debug('Header+data sent to host')
def read_file(self, filename):
    """Collect packet payloads from the pcap file ``filename``.

    Exits the program when scapy is not available.  For every packet a
    payload is taken from ``p.data`` (SCTP data chunks) or ``p.load``
    (TCP/Ethernet packets with a ``Raw`` layer, or any UDP packet).  Payloads
    of packets whose timestamp lies within ``[self.start, self.stop]`` are
    appended to ``self.packets`` and added to ``self.uniquesamples``;
    ``self.params['ppid']`` tracks the ``proto_id`` of the last such packet
    (0 when the packet has none).
    """
    if not scapy_installed:
        exit("Could not read pcap due to missing scapy")
    self.params['ppid'] = 0
    # Parenthesised single-argument form prints identically on Python 2 and 3.
    print("Opening pcap file %s" % filename)
    for p in rdpcap(filename):
        if scapy_sctp and SCTPChunkData in p:
            msg = p.data
        elif (TCP in p and Raw in p) or UDP in p or (Ethernet in p and Raw in p):
            msg = p.load
        else:
            # Nothing to extract from this packet.  Skip it instead of
            # recording the previous packet's payload a second time (or
            # failing on an unbound name when it is the first packet).
            continue
        if p.time >= self.start and p.time <= self.stop:
            self.packets.append(msg)
            self.uniquesamples.add(msg)
            ppid = getattr(p, 'proto_id', 0)
            if self.params['ppid'] != ppid:
                self.params['ppid'] = ppid
    # The PPID is tracked but not used afterwards, so it is not printed.
def cmd_dhcp_discover(iface, timeout, verbose):
    """Broadcast a DHCP DISCOVER and print every answer that comes back.

    Args:
        iface: Interface to send on; when falsy the current ``conf.iface``
            is kept.
        timeout: Seconds to wait for answers; 5 is used when falsy.
        verbose: When true each answer is dumped in full, otherwise only a
            one-line summary is printed.
    """
    conf.verb = False
    if iface:
        conf.iface = iface
    # NOTE(review): per the scapy usage docs this is switched off for DHCP
    # because the offer does not come from the address the request was sent
    # to (broadcast), so IP-based answer matching would discard it.
    conf.checkIPaddr = False

    hw = get_if_raw_hwaddr(conf.iface)
    if isinstance(hw, tuple):
        # scapy documents the return value as (address family, raw address);
        # BOOTP.chaddr needs only the raw address.
        hw = hw[-1]

    dhcp_discover = (
        Ether(dst="ff:ff:ff:ff:ff:ff")
        / IP(src="0.0.0.0", dst="255.255.255.255")
        / UDP(sport=68, dport=67)
        / BOOTP(chaddr=hw)
        / DHCP(options=[("message-type", "discover"), "end"])
    )

    # multi=True accepts several answers to the one request.  The caller's
    # timeout is honoured; the previous hard-coded 5 s remains the fallback.
    ans, _unans = srp(dhcp_discover, multi=True, timeout=timeout or 5)

    for _, pkt in ans:
        if verbose:
            # show() prints the dump itself and returns None, so it must not
            # be wrapped in print().
            pkt.show()
        else:
            print(pkt.summary())
def monlist_scan(self, target):
    """Probe UDP port 123 of ``target`` and report whether anything answers.

    An 8-byte request (presumably an NTP "monlist" query, judging by the
    function name and port) is sent from a random high source port.  The
    probe is attempted up to three times, waiting five seconds for each
    answer.

    Returns ``1`` as soon as an answer is received, ``None`` otherwise.
    """
    data = "\x17\x00\x03\x2a" + "\x00" * 4
    ip = IP(dst=target)
    # randint() includes both bounds, so the upper bound has to be the
    # highest valid port (65535); 65536 does not fit the 16-bit port field.
    udp = UDP(sport=random.randint(49152, 65535), dport=123)
    pck = ip / udp / Raw(load=data)

    for _ in range(3):
        # sr1() hands back the first answer, or None when the wait timed out.
        rep = sr1(pck, verbose=0, timeout=5)
        if rep is not None:
            return 1
    return None
def postProcessor(self, measurements):
    """
    Compare the test and control answers and record the verdict.

    This is not tested, but the concept is that if the two responses
    match up then spoofing is occurring.

    The UDP layers of the first two entries of
    ``self.report['answered_packets']`` are compared and
    ``self.report['spoofing']`` is set to ``True`` when they are equal,
    ``False`` when they differ, or ``'no_answer'`` when looking them up
    raises ``IndexError``.  ``measurements`` is not used.

    Returns ``self.report``.
    """
    try:
        test_answer = self.report['answered_packets'][0][UDP]
        control_answer = self.report['answered_packets'][1][UDP]
    except IndexError:
        self.report['spoofing'] = 'no_answer'
    else:
        # Matching answers are the spoofing signal described above, so
        # equality maps to True (the verdict used to be inverted).
        self.report['spoofing'] = bool(test_answer == control_answer)
    return self.report
def send_dhcp_over_qvb(self, port_id, port_mac):
    """Send a DHCP discover frame out of the qvb device of ``port_id``.

    The device name comes from ``utils.get_vif_name`` with the qvb prefix.
    ``port_mac`` (colon-separated hex octets) is used both as the Ethernet
    source address and, packed into six raw bytes, as the BOOTP client
    hardware address.
    """
    device = utils.get_vif_name(constants.QVB_DEVICE_PREFIX, port_id)

    l2 = scapy.Ether(dst='ff:ff:ff:ff:ff:ff', src=port_mac, type=0x800)
    l3 = scapy.IP(src='0.0.0.0', dst='255.255.255.255')
    l4 = scapy.UDP(sport=68, dport=67)

    # Textual MAC -> six raw bytes.
    octets = [int(octet, 16) for octet in port_mac.split(':')]
    raw_mac = struct.pack('6B', *octets)

    bootp_layer = scapy.BOOTP(chaddr=raw_mac, flags=1)
    dhcp_layer = scapy.DHCP(options=[("message-type", "discover"), "end"])

    scapy.sendp(l2 / l3 / l4 / bootp_layer / dhcp_layer, iface=device)
def test_get_dhcp_mt(self):
    """The message type read from a DHCP discover frame is a known type."""
    discover = scapy.DHCP(options=[("message-type", "discover"), "end"])
    frame = scapy.Ether() / scapy.IP() / scapy.UDP() / scapy.BOOTP() / discover
    self.assertIn(
        self.scapy_dri.get_dhcp_mt(str(frame)),
        constants.DHCP_MESSATE_TYPE,
    )
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
8_5_modify_ip_in_a_packet.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None):
    """Build an IP/TCP or IP/UDP packet from the arguments and send it.

    ``protocol`` selects the transport layer ('tcp' or 'udp'); ``flags`` is
    only accepted for TCP.  The packet is sent on ``iface``.

    Raises ``Exception`` for any other protocol, or when flags are given
    together with 'udp'.
    """
    if protocol not in ('tcp', 'udp'):
        raise Exception("Unknown protocol %s" % protocol)
    if protocol == 'udp' and flags:
        raise Exception(" Flags are not supported for udp")

    network = IP(src=src_ip, dst=dst_ip)
    if protocol == 'tcp':
        transport = TCP(flags=flags, sport=src_port, dport=dst_port)
    else:
        transport = UDP(sport=src_port, dport=dst_port)
    send(network / transport, iface=iface)
def cmd_dhcp_starvation(iface, timeout, sleeptime, verbose):
    """Keep broadcasting DHCP DISCOVERs with random client addresses.

    Runs until interrupted.  Each round sends one discover whose BOOTP client
    hardware address is derived from a fresh random MAC, prints the answers
    and then sleeps.

    Args:
        iface: Interface to send on; when falsy the current ``conf.iface``
            is kept.
        timeout: Seconds to wait for answers in each round; 1 is used when
            falsy.
        sleeptime: Seconds to sleep between rounds.
        verbose: When true each answer is dumped in full, otherwise a
            "<server> offers <address>" line is printed.
    """
    conf.verb = False
    if iface:
        conf.iface = iface
    # NOTE(review): per the scapy usage docs this is switched off for DHCP
    # because the offer does not come from the address the request was sent
    # to (broadcast), so IP-based answer matching would discard it.
    conf.checkIPaddr = False

    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    ip = IP(src="0.0.0.0", dst="255.255.255.255")
    udp = UDP(sport=68, dport=67)
    dhcp = DHCP(options=[("message-type", "discover"), "end"])

    while True:
        bootp = BOOTP(chaddr=str(RandMAC()))
        dhcp_discover = ether / ip / udp / bootp / dhcp
        # The caller's timeout is honoured; the previous hard-coded 1 s
        # remains the fallback.
        ans, _unans = srp(dhcp_discover, timeout=timeout or 1)

        for _, pkt in ans:
            if verbose:
                # show() prints the dump itself and returns None, so it must
                # not be wrapped in print().
                pkt.show()
            else:
                print(pkt.sprintf(r"%IP.src% offers %BOOTP.yiaddr%"))

        sleep(sleeptime)
def cmd_snmp_crack(ip, port, stop, verbose):
    """Try every community string from the bundled dictionary against a host.

    The dictionary is ``../data/dict_snmp.txt`` relative to this file.  For
    each community an SNMP get for OID 1.3.6.1 is sent and an answer is
    awaited for half a second; a community counts as found when an answer
    containing a UDP layer comes back.

    Args:
        ip: Destination address.
        port: UDP port, used as both source and destination port.
        stop: When true, stop after the first community found.
        verbose: When true, print a dot for every community tried.

    Returns:
        Always ``True``.
    """
    file_dir = os.path.dirname(os.path.abspath(__file__))
    data_dir = os.path.abspath(os.path.join(file_dir, '../data'))
    comm_file = Path(os.path.abspath(os.path.join(data_dir, 'dict_snmp.txt')))

    with comm_file.open() as cf:
        # splitlines() does not produce a trailing empty entry, and blank
        # lines are dropped so no request is spent on an empty community.
        communities = [line for line in cf.read().splitlines() if line.strip()]

    conf.verb = False

    pkt = IP(dst=ip)/UDP(sport=port, dport=port)/SNMP(community="public", PDU=SNMPget(varbindlist=[SNMPvarbind(oid=ASN1_OID("1.3.6.1"))]))

    for community in communities:
        if verbose:
            print('.', end='')
            sys.stdout.flush()

        # The same packet is reused; only the community field changes.
        pkt[SNMP].community = community
        ans = sr1(pkt, timeout=0.5, verbose=0)

        if ans and UDP in ans:
            print('\nCommunity found:', community)
            if stop:
                break

    return True
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
def test_a_lookup(self):
    """Send an A-record query for ``self.hostname`` to ``self.resolverAddr``.

    The query is logged and then handed to ``self.sr1``; the value that call
    returns is yielded.
    """
    network = IP(dst=self.resolverAddr)
    transport = UDP()
    query = DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
    question = network / transport / query

    details = (self.hostname, self.resolverAddr, self.resolverPort)
    log.msg("Performing query to %s with %s:%s" % details)
    yield self.sr1(question)
def test_control_a_lookup(self):
    """Send an A-record query for ``self.hostname`` to the control resolver.

    The query goes to ``self.controlResolverAddr``, is logged and then
    handed to ``self.sr1``; the value that call returns is yielded.
    """
    network = IP(dst=self.controlResolverAddr)
    transport = UDP()
    query = DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
    question = network / transport / query

    details = (self.hostname, self.controlResolverAddr, self.controlResolverPort)
    log.msg("Performing query to %s with %s:%s" % details)
    yield self.sr1(question)
def test_represent_scapy(self):
    """Dumping a scapy IP/UDP packet through ``OSafeDumper`` must not raise."""
    documents = [IP() / UDP()]
    yaml.dump_all(documents, Dumper=OSafeDumper)
def UDPTraceroute(self, host):
    """Send UDP traceroute probes to ``host`` on every configured port.

    ``host`` is recorded in ``self.hosts`` if it is new.  For each port in
    ``self.dst_ports`` a probe covering the TTL range ``self.ttl_min`` to
    ``self.ttl_max`` is passed to ``self.sendPackets``.

    Returns a Deferred that the reactor fires with ``self`` after
    ``self.timeout`` seconds.
    """
    if host not in self.hosts:
        self.hosts.append(host)

    finished = defer.Deferred()
    reactor.callLater(self.timeout, finished.callback, self)

    for port in self.dst_ports:
        network = IP(dst=host, ttl=(self.ttl_min, self.ttl_max), id=RandShort())
        transport = UDP(dport=port, sport=RandShort())
        self.sendPackets(network / transport)

    return finished
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
def _send_to_target(self, data):
    """Broadcast ``data`` as the payload of an Ethernet/IP/UDP frame.

    The frame is addressed to the Ethernet and IP broadcast addresses, with
    ``self.host`` as IP source, UDP source port 68 and UDP destination port
    ``self.port``.  Its string form is written to ``self.socket``, with a
    debug log entry before and after the write.
    """
    frame = (
        Ether(dst='ff:ff:ff:ff:ff:ff')
        / IP(src=self.host, dst='255.255.255.255')
        / UDP(sport=68, dport=self.port)
        / Raw(load=data)
    )
    wire_data = str(frame)
    self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port))
    self.socket.send(wire_data)
    self.logger.debug('Header+data sent to host')
def add_eth_ip_udp_headers(dport):
    """Return a fixed Ethernet/IP/UDP header stack aimed at UDP port ``dport``.

    MAC addresses, destination IP, TTL and UDP source port are constants;
    only the UDP destination port varies.
    """
    return (
        Ether(src='0C:C4:7A:A3:25:34', dst='0C:C4:7A:A3:25:35')
        / IP(dst='10.0.0.2', ttl=64)
        / UDP(sport=65231, dport=dport)
    )
09_05_modify_ip_in_a_packet.py 文件源码
项目:011_python_network_programming_cookbook_demo
作者: jerry-0824
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None):
    """Modify and send an IP packet.

    ``protocol`` must be 'tcp' or 'udp'; ``flags`` applies to TCP only.
    ``Exception`` is raised for any other protocol and for UDP with flags.
    """
    if protocol == 'tcp':
        layer4 = TCP(flags=flags, sport=src_port, dport=dst_port)
    elif protocol == 'udp':
        if flags:
            raise Exception(" Flags are not suppored for udp")
        layer4 = UDP(sport=src_port, dport=dst_port)
    else:
        raise Exception("Unknown protocol %s" % protocol)

    layer3 = IP(src=src_ip, dst=dst_ip)
    send(layer3 / layer4, iface=iface)
def traceroute_dns_servers(hosts, fqdn):
    """
    Traceroute to each of ``hosts`` with UDP probes that carry a DNS query.

    Every probe is a UDP datagram with a random source port whose payload is
    a DNS question for ``fqdn``.  The targets are the ``addr`` attributes of
    ``hosts``.  Returns whatever ``scapy.traceroute`` returns.
    """
    log.info("Running UDP traceroutes to %d servers." % len(hosts))

    targets = [server.addr for server in hosts]
    transport = scapy.UDP(sport=scapy.RandShort())
    query = scapy.DNS(qd=scapy.DNSQR(qname=fqdn))

    return scapy.traceroute(targets, l4=transport / query, verbose=0)
def build_icmp(self):
    """Build the ICMP type 5 / code 1 (redirect) packet for ``self.target``.

    The outer IP header goes from ``self.gateway`` to ``self.target`` and the
    ICMP layer names ``self.ip_address`` as gateway.  An IP/UDP pair from
    ``self.target`` to ``self.gateway`` follows as the enclosed datagram.
    """
    outer_ip = IP(src=self.gateway, dst=self.target)
    redirect = ICMP(type=5, code=1, gw=self.ip_address)
    enclosed_ip = IP(src=self.target, dst=self.gateway)
    return outer_ip / redirect / enclosed_ip / UDP()
def make_stamp(pkt):
    """Return ``(source IP, destination IP, protocol name)`` for ``pkt``.

    The protocol name is "TCP", "UDP" or "ICMP", checked in that order.
    ``None`` is returned when the packet has no IP layer or carries none of
    those three protocols.  Ports are not part of the stamp.
    """
    if s.IP not in pkt:
        return None

    ip_send = pkt[s.IP].src
    ip_rec = pkt[s.IP].dst

    known_layers = ((s.TCP, "TCP"), (s.UDP, "UDP"), (s.ICMP, "ICMP"))
    for layer, protocol in known_layers:
        if layer in pkt:
            return ip_send, ip_rec, protocol

    return None  # neither TCP, UDP nor ICMP
def send(self, packet):
    """Wrap ``packet`` in IP/UDP, fragment it and send every fragment.

    The payload is wrapped in an IP/UDP header going from
    ``self.return_ip:self.return_port`` to
    ``self.target_ip:self.target_port`` and split into fragments of
    ``self.fragment_size``.  Each fragment is written to ``self.sock``
    preceded by its length as a 4-byte big-endian integer.  The socket is
    created through ``self.create()`` when there is none, and ``self.close()``
    is always called before returning.

    Raises:
        KeyboardInterrupt, socket.timeout: re-raised after printing a notice.

    NOTE(review): the nesting below was reconstructed from a copy of the
    code without indentation; confirm in particular that the hexdump of the
    original packet belongs to the verbose branch.
    """
    original_packet = IP(dst=self.target_ip,src=self.return_ip)/UDP(dport=self.target_port,sport=self.return_port)/packet
    if self.verbose > 1:
        print("Original packet:")
        original_packet.show()
        hexdump(str(original_packet))

    fragments = fragment(original_packet, fragsize = self.fragment_size)
    total = len(fragments)
    try:
        for i, frag in enumerate(fragments, 1):
            if self.verbose > 1:
                print("Fragment %d of %d:" % (i, total))
                frag.show()
            frag = str(frag)
            # Length prefix: 4 bytes, big-endian, sent ahead of the fragment.
            length = struct.pack(">I", len(frag))
            if not self.sock:
                print('[+] connecting ...')
                self.sock = self.create()
            print('[+] sending part %d of %d now..' % (i, total))
            hexdump(frag)
            if self.log:
                self.log.packet('sending fragment %d of %d' % (i, total), frag)
            self.sock.send(length)
            self.sock.send(frag)
            if self.log:
                self.log('sent fragment %d of %d' % (i, total))
            if self.raw_send:
                if self.log:
                    self.log('forcing a new connection due to raw_send flag')
                self.close()
    except KeyboardInterrupt:
        print("[-] keyboard interrupt while connecting/sending to redirector")
        # A bare raise keeps the original traceback.
        raise
    except socket.timeout:
        print("[-] timeout while connecting/sending to redirector")
        raise
    finally:
        self.close()