def ip_type(self, ip):
    """
    Classify a string as an IPv4 or IPv6 address.

    :param ip: candidate address, handed to ``ipaddr.IPv4Address`` and
        then ``ipaddr.IPv6Address``
    :return: 4 when the IPv4 parser accepts it, 6 when the IPv6 parser
        accepts it, 0 when both raise ``ipaddr.AddressValueError``
    """
    # IPv4 is probed first, exactly as before; the first parser that does
    # not raise decides the answer.
    for version, parse in ((4, ipaddr.IPv4Address), (6, ipaddr.IPv6Address)):
        try:
            parse(ip)
        except ipaddr.AddressValueError:
            continue
        return version
    return 0
#}}}
#{{{ Has private network
# Example source code using the Python class IPv4Address()
def default(self, obj):
    """
    JSON fallback serializer that renders ``ipaddress`` objects as strings.

    :param obj: the object the JSON encoder could not serialize natively
    :return: ``str(obj)`` for IPv4/IPv6 addresses and networks; for anything
        else, whatever ``json.JSONEncoder.default`` does
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        # Not ours to handle: defer to the base encoder.
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def contains(self, ip_address):
    """
    Tell whether an address falls inside one of the stored networks.

    The address is parsed with ``ipaddr.IPAddress``; IPv4 addresses are
    checked against ``self.ipv4_networks`` and IPv6 addresses against
    ``self.ipv6_networks``.

    :param ip_address: address to look up
    :return: True if any network of the matching family contains it
    :raises RuntimeError: if the parsed object is neither an IPv4 nor an
        IPv6 address
    """
    parsed = ipaddr.IPAddress(ip_address)
    if isinstance(parsed, ipaddr.IPv4Address):
        candidates = self.ipv4_networks
    elif isinstance(parsed, ipaddr.IPv6Address):
        candidates = self.ipv6_networks
    else:
        raise RuntimeError("Should never happen")
    # any() stops at the first containing network, like the explicit loop did.
    return any(net.Contains(parsed) for net in candidates)
def in_private_ip_space(address):
    """
    Report whether an IPv4 address is private or loopback.

    :param address: value accepted by ``IPv4Address``
    :return: True if the parsed address has ``is_private`` or
        ``is_loopback`` set, False otherwise
    """
    parsed = IPv4Address(address)
    return parsed.is_private or parsed.is_loopback
def is_private_address(address, only_loopback=False):
    """
    Check whether an IP address is in private/loopback space, or whether a
    hostname is ``localhost`` or ends in ``.local``.

    The value is first parsed as an IPv4 address, then as an IPv6 address.
    If neither parser accepts it, it is treated as a hostname. Hostnames are
    compared case-insensitively and with any trailing root dot removed,
    because DNS names are case-insensitive and ``localhost.`` names the same
    host as ``localhost``.

    :param address: an IP address or a hostname
    :param only_loopback: when True, an IP address only matches if it is a
        loopback address (127.0.0.0/8 or ::1); private ranges are ignored.
        Has no effect on the hostname check.
    :return: True if the IP address or host is in private space
    """
    ip_address = None
    for parser in (IPv4Address, IPv6Address):
        try:
            ip_address = parser(address)
        except AddressValueError:
            continue
        break

    if ip_address is None:
        # Not an IP literal: fall back to the hostname rules. Normalise
        # first so "LOCALHOST", "Printer.LOCAL" or "localhost." cannot
        # slip past the check.
        hostname = address.lower().rstrip(".")
        return hostname == "localhost" or hostname.endswith(".local")

    if only_loopback:
        return ip_address.is_loopback
    return ip_address.is_loopback or ip_address.is_private
def control_plane_arp_handler(self, in_port, vlan, eth_src, arp_pkt):
    """
    Handle an ARP packet destined for the controller.

    * ARP request: build an ARP reply whose source MAC is
      ``self.FAUCET_MAC`` and whose source IP is the IP that was asked for,
      and wrap it in a packet-out on ``in_port``.
    * ARP reply: for every entry of ``vlan.ipv4_routes`` whose gateway
      equals the sender's IP, call ``self.add_resolved_route`` with the
      sender's MAC.

    :param in_port: port the ARP packet arrived on
    :param vlan: VLAN object; ``ipv4_routes`` and ``arp_cache`` are read
    :param eth_src: source MAC of the received frame
    :param arp_pkt: the received ARP packet
    :return: list of messages produced (empty for other opcodes)
    """
    opcode = arp_pkt.opcode

    if opcode == arp.ARP_REQUEST:
        reply_frame = self.build_ethernet_pkt(
            eth_src, in_port, vlan, ether.ETH_TYPE_ARP)
        # Swap the roles: we answer as the requested IP, back to the asker.
        arp_reply = arp.arp(
            opcode=arp.ARP_REPLY, src_mac=self.FAUCET_MAC,
            src_ip=arp_pkt.dst_ip, dst_mac=eth_src, dst_ip=arp_pkt.src_ip)
        reply_frame.add_protocol(arp_reply)
        reply_frame.serialize()
        packet_out = self.valve_packetout(in_port, reply_frame.data)
        # Logged from the reply's point of view: its source is the IP that
        # was requested, its destination is the host that asked.
        self.logger.info('Responded to ARP request for %s from %s',
                         arp_reply.src_ip, arp_reply.dst_ip)
        return [packet_out]

    messages = []
    if opcode == arp.ARP_REPLY:
        gateway_ip = ipaddr.IPv4Address(arp_pkt.src_ip)
        for route_dst, route_gw in vlan.ipv4_routes.iteritems():
            if route_gw == gateway_ip:
                self.logger.info('ARP response %s for %s',
                                 eth_src, gateway_ip)
                resolved = self.add_resolved_route(
                    ether.ETH_TYPE_IP, vlan, vlan.arp_cache,
                    route_gw, route_dst, eth_src)
                messages.extend(resolved)
    return messages
def handle_control_plane(self, in_port, vlan, eth_src, eth_dst, pkt):
    """
    Dispatch a received packet to the ARP / ICMP / ICMPv6 handlers.

    Only frames whose destination MAC is ``self.FAUCET_MAC``, or for which
    ``mac_addr_is_unicast`` is false, are considered. ARP takes precedence
    over IPv4, which takes precedence over IPv6.

    :param in_port: port the packet arrived on
    :param vlan: VLAN the packet belongs to
    :param eth_src: source MAC of the frame
    :param eth_dst: destination MAC of the frame
    :param pkt: parsed packet exposing ``get_protocol``
    :return: list of messages returned by the invoked handler (may be empty)
    """
    flowmods = []
    for_controller = (
        eth_dst == self.FAUCET_MAC or not mac_addr_is_unicast(eth_dst))
    if not for_controller:
        return flowmods

    arp_pkt = pkt.get_protocol(arp.arp)
    ipv4_pkt = pkt.get_protocol(ipv4.ipv4)
    ipv6_pkt = pkt.get_protocol(ipv6.ipv6)

    if arp_pkt is not None:
        src_ip = ipaddr.IPv4Address(arp_pkt.src_ip)
        dst_ip = ipaddr.IPv4Address(arp_pkt.dst_ip)
        # Requests must target one of our IPs; replies must be unicast to
        # our MAC. Both cases end up in the same handler.
        if ((arp_pkt.opcode == arp.ARP_REQUEST and
             self.to_faucet_ip(vlan, src_ip, dst_ip)) or
                (arp_pkt.opcode == arp.ARP_REPLY and
                 eth_dst == self.FAUCET_MAC)):
            flowmods.extend(self.control_plane_arp_handler(
                in_port, vlan, eth_src, arp_pkt))
    elif ipv4_pkt is not None:
        icmp_pkt = pkt.get_protocol(icmp.icmp)
        if icmp_pkt is not None:
            src_ip = ipaddr.IPv4Address(ipv4_pkt.src)
            dst_ip = ipaddr.IPv4Address(ipv4_pkt.dst)
            if self.to_faucet_ip(vlan, src_ip, dst_ip):
                replies = self.control_plane_icmp_handler(
                    in_port, vlan, eth_src, ipv4_pkt, icmp_pkt)
                flowmods.extend(replies)
    elif ipv6_pkt is not None:
        icmpv6_pkt = pkt.get_protocol(icmpv6.icmpv6)
        if icmpv6_pkt is not None:
            src_ip = ipaddr.IPv6Address(ipv6_pkt.src)
            dst_ip = ipaddr.IPv6Address(ipv6_pkt.dst)
            if self.to_faucet_ip(vlan, src_ip, dst_ip):
                replies = self.control_plane_icmpv6_handler(
                    in_port, vlan, eth_src, ipv6_pkt, icmpv6_pkt)
                flowmods.extend(replies)
    return flowmods
def default(self, obj):
    """
    Encode ``ipaddress`` addresses and networks as their string form.

    :param obj: object the JSON encoder does not know how to serialize
    :return: ``str(obj)`` if it is an IPv4/IPv6 address or network,
        otherwise the result of ``json.JSONEncoder.default``
    """
    stringifiable = (
        ipaddress.IPv4Network, ipaddress.IPv6Network,
        ipaddress.IPv4Address, ipaddress.IPv6Address,
    )
    return (
        str(obj)
        if isinstance(obj, stringifiable)
        else json.JSONEncoder.default(self, obj)
    )
def add_random_net(ip_ver, target_prefix_len):
    """
    Generate a random network and add it to ``usres_monitor``.

    The prefix length is drawn at random from ``[8, target_prefix_len - 1]``
    for IPv4 and ``[19, target_prefix_len - 1]`` for any other ``ip_ver``
    (treated as IPv6). A random value is then shifted into the leading bits
    of the address so that the host part is zero. For IPv6 the prefix lives
    entirely in the upper 64 bits of the address.

    :param ip_ver: 4 for IPv4; any other value selects IPv6
    :param target_prefix_len: exclusive upper bound for the random prefix
        length
    :return: ``("dup", net)`` if the monitor reports the net was already in
        the db, ``("ok", net)`` if it was added
    :raises AssertionError: if the generated network has host bits set
    :raises USRESMonitorException: if the monitor rejects the net for any
        reason other than it being a duplicate
    :raises Exception: any other error from ``usres_monitor.add_net`` is
        re-raised after the intermediate values have been printed
    """
    def dump_vars():
        # Debug aid: print every intermediate value used to build the net.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len - 1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len - 1)
        max_range_len = prefix_len - 1  # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    # Left-align the random value inside max_prefix_len bits; the
    # parentheses only make the existing operator precedence explicit.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # Move the 64-bit prefix into the upper half of the 128-bit address.
        net_id = net_id << 64

    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    # Explicit check instead of `assert`, so it still runs under `python -O`.
    if str(net.ip) != str(net.network):
        dump_vars()
        raise AssertionError(
            "generated network {} has host bits set".format(net))

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
        # Any other monitor error means the net was NOT added; reporting
        # "ok" would be wrong, so let the caller see the failure.
        raise
    except Exception:
        # Unexpected failure: dump state for debugging, then propagate
        # instead of silently falling through to the "ok" result.
        dump_vars()
        raise
    return "ok", net