def active_scan(self, target):
req = 'M-SEARCH * HTTP/1.1\r\nHost:239.255.255.250:1900\r\nST:upnp:rootdevice\r\nMan:"ssdp:discover"\r\nMX:3\r\n\r\n'
ip=IP(dst=target)
udp=UDP(sport=random.randint(49152,65536), dport=1900)
pck = ip/udp/req
try:
start = time.time()
rep = sr1(pck, verbose=0,timeout=5)
if rep[Raw]:
results = rep[Raw].load
else:
pass
except Exception as e:
results = None
#print e
return results
python类IP的实例源码
def _send_to_target(self, data):
ether = Ether(dst='ff:ff:ff:ff:ff:ff')
ip = IP(src=self.host, dst='255.255.255.255')
udp = UDP(sport=68, dport=self.port)
payload = Raw(load=data)
packet = str(ether / ip / udp / payload)
self.logger.debug('Sending header+data to host: %s:%d' % (self.host, self.port))
self.socket.send(packet)
self.logger.debug('Header+data sent to host')
def cmd_dhcp_discover(iface, timeout, verbose):
conf.verb = False
if iface:
conf.iface = iface
conf.checkIPaddr = False
hw = get_if_raw_hwaddr(conf.iface)
ether = Ether(dst="ff:ff:ff:ff:ff:ff")
ip = IP(src="0.0.0.0",dst="255.255.255.255")
udp = UDP(sport=68,dport=67)
bootp = BOOTP(chaddr=hw)
dhcp = DHCP(options=[("message-type","discover"),"end"])
dhcp_discover = ether / ip / udp / bootp / dhcp
ans, unans = srp(dhcp_discover, multi=True, timeout=5) # Press CTRL-C after several seconds
for _, pkt in ans:
if verbose:
print(pkt.show())
else:
print(pkt.summary())
def callback(self, packet):
flags = packet.sprintf("%TCP.flags%")
proto = IP
if IPv6 in packet:
proto = IPv6
if flags == "A" and not self.ignore_packet(packet, proto):
src_mac = packet[Ether].src
dst_mac = packet[Ether].dst
src_ip = packet[proto].src
dst_ip = packet[proto].dst
src_port = packet[TCP].sport
dst_port = packet[TCP].dport
seq = packet[TCP].seq
ack = packet[TCP].ack
if self.verbose:
print("RST from %s:%s (%s) --> %s:%s (%s) w/ %s" % (src_ip, src_port, src_mac, dst_ip, dst_port, dst_mac, ack))
if self.noisy:
self.send(self.build_packet(src_mac, dst_mac, src_ip, dst_ip, src_port, dst_port, seq, proto))
self.send(self.build_packet(dst_mac, src_mac, dst_ip, src_ip, dst_port, src_port, ack, proto))
def monlist_scan(self,target):
data = "\x17\x00\x03\x2a" + "\x00" * 4
ip = IP(dst=target)
udp=UDP(sport=random.randint(49152,65536),dport=123)
a = Raw(load=data)
pck = ip/udp/a
n = 0
results = None
#try:
while (n < 3):
rep = sr1(pck,verbose=0,timeout=5)
if hasattr(rep,'answers'):
results = 1
break
elif not hasattr(rep,'answers') and (n < 3):
#print "Pass ",n
n = n + 1
else:
results = None
break
pass
#except KeyboardInterrupt:
# sys.exit(0)
#except Exception as e:
# results = None
#print e
return results
def test_mtu_regular(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str):
fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80)
fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80)
fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str)
exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str))
pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
{'port': 1, 'packet': fwd_pkt2},
{'port': 1, 'packet': fwd_pkt3}])
input_ports = {0, 1}
output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
{'port': 2, 'packet': exp_pkt2},
{'port': 3, 'packet': exp_pkt3}], pack, input_ports)
return output
def test_mtu_failing(exp_src_mac, exp_dst_mac, port_interface_mapping, a, create_str):
fwd_pkt1 = Ether() / IP(dst='10.1.0.1') / TCP(sport=5793, dport=80)
fwd_pkt2 = Ether() / IP(dst='10.1.0.34') / TCP(sport=5793, dport=80)
fwd_pkt3 = Ether() / IP(dst='10.1.0.32') / TCP(sport=5793, dport=80) / Raw(create_str)
exp_pkt1 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt2 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.34', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt3 = (Ether(src=exp_src_mac, dst=exp_dst_mac) /
IP(dst='10.1.0.32', ttl=fwd_pkt3[IP].ttl-1) / TCP(sport=5793, dport=80) / Raw(create_str))
pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
{'port': 1, 'packet': fwd_pkt2},
{'port': 1, 'packet': fwd_pkt3}])
input_ports = {0, 1}
output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
{'port': 2, 'packet': exp_pkt2},
{'port': 3, 'packet': exp_pkt3}], pack, input_ports)
return output
def test_multicast_sa_da(a, port_interface_mapping, exp_src_mac, exp_dst_mac):
fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80)
fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80)
exp_pkt1 = (Ether(src=exp_src_mac) /
IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt2 = (Ether(src=exp_src_mac) /
IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
pack = send_pkts_and_capture(port_interface_mapping, [{'port': 0, 'packet': fwd_pkt1},
{'port': 1, 'packet': fwd_pkt2}])
input_ports = {0, 1}
output = create_port_seq_list([{'port': 2, 'packet': exp_pkt1},
{'port': 3, 'packet': exp_pkt1},
{'port': 4, 'packet': exp_pkt2},
{'port': 5, 'packet': exp_pkt2},
{'port': 6, 'packet': exp_pkt2}], pack, input_ports)
return output
def test_multicast_rpf(a, port_interface_mapping, exp_src_mac, exp_dst_mac):
fwd_pkt1 = Ether() / IP(src='10.1.0.3', dst='224.1.0.1') / TCP(sport=5793, dport=80)
fwd_pkt2 = Ether() / IP(src='10.1.0.5', dst='224.1.0.1') / TCP(sport=5793, dport=80)
exp_pkt1 = (Ether(src=exp_src_mac) /
IP(src='10.1.0.3', dst='224.1.0.1', ttl=fwd_pkt1[IP].ttl-1) / TCP(sport=5793, dport=80))
exp_pkt2 = (Ether(src=exp_src_mac) /
IP(src='10.1.0.5',dst='224.1.0.1', ttl=fwd_pkt2[IP].ttl-1) / TCP(sport=5793, dport=80))
# The ports 1 nad 0 are exchanged to check that the rpf and ingress port are different ,
# thus dropping the packets
pack = send_pkts_and_capture(port_interface_mapping, [{'port': 1, 'packet': fwd_pkt1},
{'port': 0, 'packet': fwd_pkt2}])
input_ports = {0, 1}
output = create_port_seq_list([], pack, input_ports)
return output
def send_dhcp_over_qvb(self, port_id, port_mac):
"""Send DHCP Discovery over qvb device.
"""
qvb_device = utils.get_vif_name(constants.QVB_DEVICE_PREFIX, port_id)
ethernet = scapy.Ether(dst='ff:ff:ff:ff:ff:ff',
src=port_mac, type=0x800)
ip = scapy.IP(src='0.0.0.0', dst='255.255.255.255')
udp = scapy.UDP(sport=68, dport=67)
port_mac_t = tuple(map(lambda x: int(x, 16), port_mac.split(':')))
hw = struct.pack('6B', *port_mac_t)
bootp = scapy.BOOTP(chaddr=hw, flags=1)
dhcp = scapy.DHCP(options=[("message-type", "discover"), "end"])
packet = ethernet / ip / udp / bootp / dhcp
scapy.sendp(packet, iface=qvb_device)
def test_get_dhcp_mt(self):
dhcp = scapy.DHCP(options=[("message-type", "discover"), "end"])
pkt = scapy.Ether() / scapy.IP() / scapy.UDP() / scapy.BOOTP() / dhcp
message = self.scapy_dri.get_dhcp_mt(str(pkt))
self.assertIn(message, constants.DHCP_MESSATE_TYPE)
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
8_5_modify_ip_in_a_packet.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def send_packet(protocol=None, src_ip=None, src_port=None, flags=None, dst_ip=None, dst_port=None, iface=None):
"""Modify and send an IP packet."""
if protocol == 'tcp':
packet = IP(src=src_ip, dst=dst_ip)/TCP(flags=flags, sport=src_port, dport=dst_port)
elif protocol == 'udp':
if flags: raise Exception(" Flags are not supported for udp")
packet = IP(src=src_ip, dst=dst_ip)/UDP(sport=src_port, dport=dst_port)
else:
raise Exception("Unknown protocol %s" % protocol)
send(packet, iface=iface)
def cmd_traceroute(ip, port, iface):
conf.verb = False
if iface:
conf.iface = iface
pkts = IP(dst=ip, ttl=(1, 16)) / TCP(dport=port)
for pkt in pkts:
ans = sr1(pkt, timeout=1, iface=conf.iface)
if not ans:
print('.')
continue
print(ans.summary())
if TCP in ans and ans[TCP].flags == 18:
break
return True
def cmd_protoscan(ip, verbose):
conf.verb = False
#pkts = IP(dst=ip) / TCP(flags=(0, 255), dport=port)
pkts = IP(dst=ip, proto=(0, 255)) # / TCP(flags=(0, 255), dport=port)
out = "{:>8} -> {:<8}"
for pkt in pkts:
#if not flags or all(i in pkt.sprintf(r"%TCP.flags%") for i in flags):
print(pkt.show2())
ans = sr1(pkt, timeout=0.2)
if ans:
#if not rflags or all(i in ans.sprintf(r"%TCP.flags%") for i in rflags):
#print(out.format(pkt.sprintf(r"%TCP.flags%"), ans.sprintf(r"%TCP.flags%")))
print(ans.show())
return True
def cmd_ipscan(ip, iface, sleeptime, timeout, verbose):
if verbose:
logging.basicConfig(level=logging.INFO, format='%(message)s')
conf.verb = False
if iface:
conf.iface = iface
ans,unans=sr(IP(dst=ip, proto=(0,255))/"SCAPY",retry=0,timeout=2, verbose=True)
for s,r in ans:
print(r.summary())
if not ICMP in r:
r.show()
def cmd_snmp_crack(ip, port, stop, verbose):
FILEDIR = os.path.dirname(os.path.abspath(__file__))
DATADIR = os.path.abspath(os.path.join(FILEDIR, '../data'))
COMMFILE = Path(os.path.abspath(os.path.join(DATADIR, 'dict_snmp.txt')))
with COMMFILE.open() as cf:
communities = cf.read().split('\n')
conf.verb = False
pkt = IP(dst=ip)/UDP(sport=port, dport=port)/SNMP(community="public", PDU=SNMPget(varbindlist=[SNMPvarbind(oid=ASN1_OID("1.3.6.1"))]))
for community in communities:
if verbose:
print('.', end='')
sys.stdout.flush()
pkt[SNMP].community=community
ans = sr1(pkt, timeout=0.5, verbose=0)
if ans and UDP in ans:
print('\nCommunity found:', community)
if stop:
break
return True
def cmd_protoscan(ip, verbose):
conf.verb = False
#pkts = IP(dst=ip) / TCP(flags=(0, 255), dport=port)
pkts = IP(dst=ip, proto=(0, 255)) # / TCP(flags=(0, 255), dport=port)
out = "{:>8} -> {:<8}"
for pkt in pkts:
#if not flags or all(i in pkt.sprintf(r"%TCP.flags%") for i in flags):
print(pkt.show2())
ans = sr1(pkt, timeout=0.2)
if ans:
#if not rflags or all(i in ans.sprintf(r"%TCP.flags%") for i in rflags):
#print(out.format(pkt.sprintf(r"%TCP.flags%"), ans.sprintf(r"%TCP.flags%")))
print(ans.show())
return True
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt
def update_next_packet(self):
packet = s.sniff(count=1) # filter = "tcp.len > 0",
packet = packet[0]
if s.IP not in packet:
return
ip_send = packet[s.IP].src
ip_rec = packet[s.IP].dst
if ip_send != self.our_ip and ip_rec != self.our_ip:
return
print packet.summary()
if make_stamp(packet) is not None:
ip_send, ip_rec, protocol = make_stamp(packet)
three_tuple = [ip_send, ip_rec, protocol]
stamp = self.decide_stamp(three_tuple)
if self.sessions.get(stamp) is None:
threading.Thread(target=self.set_session, args=[packet, stamp, self.our_ip]).start()
else:
threading.Thread(target=self.sessions[stamp].update_session, args=[packet]).start()
# in case we done working on connection
# we will order by time and add to global list
if self.sessions[stamp].got_fin is True:
to_add = self.sessions.pop(stamp)
sorted(to_add.income, key=key_func)
sorted(to_add.outcome, key=key_func)
sorted(to_add.combined, key=key_func)
lst.add(to_add)
else:
print "[Error] - Shit happens"
def cap_session(pcap_path):
capture = s.rdpcap(pcap_path) # TODO when go live change to session capture
first = True
curr_session = None
session_info = [0, ] * 3
for pkt in capture:
if not pkt.haslayer(s.TCP) and not pkt.haslayer(s.IP) and pkt.len <= 0:
continue
if first:
first = False
if is_client(pkt):
session_info[0] = pkt[s.IP].src
session_info[1] = pkt[s.IP].dst
session_info[2] = "TCP"
curr_session = Session(pkt, session_info, session_info[0])
else:
session_info[0] = pkt[s.IP].dst
session_info[1] = pkt[s.IP].src
session_info[2] = "TCP"
curr_session = Session(pkt, session_info, session_info[0])
else:
curr_session.update_session(pkt)
return curr_session
def runTest(self):
pkt = scapy2.Ether(src="e4:1d:2d:a5:f3:ac", dst="00:02:03:04:05:00")
pkt /= scapy2.IP(src="10.0.0.1", dst="10.0.0.0")
# get L4 port number
port_number = testutils.test_params_get("port_number")
port = port_number["port_number"]
pkt /= scapy2.TCP(sport = int(port))
pkt /= ("badabadaboom")
# get packets number
count = testutils.test_params_get("count")
pack_number = count["count"]
# send packets
send(self, 0, pkt, int(pack_number))
def test_icmp(self):
def process_response(echo_reply, dest):
ans, unans = echo_reply
if ans:
log.msg("Received echo reply from %s: %s" % (dest, ans))
else:
log.msg("No reply was received from %s. Possible censorship event." % dest)
log.debug("Unanswered packets: %s" % unans)
self.report[dest] = echo_reply
for label, data in self.destinations.items():
reply = sr1(IP(dst=lebal) / ICMP())
process = process_reponse(reply, label)
#(ans, unans) = ping
#self.destinations[self.dst].update({'ans': ans,
# 'unans': unans,
# 'response_packet': ping})
#return ping
#return reply
def test_a_lookup(self):
question = IP(dst=self.resolverAddr) / \
UDP() / \
DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
log.msg("Performing query to %s with %s:%s" %
(self.hostname, self.resolverAddr, self.resolverPort))
yield self.sr1(question)
def test_control_a_lookup(self):
question = IP(dst=self.controlResolverAddr) / \
UDP() / \
DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
log.msg("Performing query to %s with %s:%s" %
(self.hostname, self.controlResolverAddr, self.controlResolverPort))
yield self.sr1(question)
def test_represent_scapy(self):
data = IP() / UDP()
yaml.dump_all([data], Dumper=OSafeDumper)
def test_send_packet_with_answer(self):
from scapy.all import IP, TCP
sender = txscapy.ScapySender()
self.scapy_factory.registerProtocol(sender)
packet_sent = IP(dst='8.8.8.8', src='127.0.0.1') / TCP(dport=53,
sport=5300)
packet_received = IP(dst='127.0.0.1', src='8.8.8.8') / TCP(sport=53,
dport=5300)
d = sender.startSending([packet_sent])
self.scapy_factory.super_socket.send.assert_called_with(packet_sent)
sender.packetReceived(packet_received)
result = yield d
assert result[0][0][0] == packet_sent
assert result[0][0][1] == packet_received
def build_icmp(self):
pkt = IP(src=self.gateway, dst=self.target)/ICMP(type=5, code=1, gw=self.ip_address) /\
IP(src=self.target, dst=self.gateway)/UDP()
return pkt