def _ValidateEntry(self, entry):
if not entry.subnet:
return MISSING_SUBNET
try:
ipaddr.IPNetwork(entry.subnet)
except ValueError:
return BAD_IPV_SUBNET % entry.subnet
parts = entry.subnet.split('/')
if len(parts) == 2 and not re.match('^[0-9]+$', parts[1]):
return BAD_PREFIX_LENGTH % entry.subnet
python类IPNetwork()的实例源码
def __init__(self, ip):
if ip_library == "ipaddr":
self.obj = ipaddr.IPNetwork(ip)
self.ip = str(self.obj.ip)
else:
self.obj = ipaddress.ip_network(ip)
self.ip = str(self.obj.network_address)
self.prefixlen = self.obj.prefixlen
self.max_prefixlen = self.obj.max_prefixlen
self.version = self.obj.version
def is_valid_url(url):
"""
Tests a URL to ensure it doesn't appear to be a blacklisted IP range.
"""
parsed = urlparse(url)
if not parsed.hostname:
return False
server_hostname = get_server_hostname()
if parsed.hostname == server_hostname:
return True
try:
ip_address = socket.gethostbyname(parsed.hostname)
except socket.gaierror:
return False
if ip_address == server_hostname:
return True
ip_network = IPNetwork(ip_address)
for addr in DISALLOWED_IPS:
if ip_network in addr:
return False
return True
def __init__(self, vid, conf=None):
if conf is None:
conf = {}
self.vid = vid
self.tagged = []
self.untagged = []
self.name = conf.setdefault('name', str(vid))
self.description = conf.setdefault('description', self.name)
self.controller_ips = conf.setdefault('controller_ips', [])
if self.controller_ips:
self.controller_ips = [
ipaddr.IPNetwork(ip) for ip in self.controller_ips]
self.unicast_flood = conf.setdefault('unicast_flood', True)
self.routes = conf.setdefault('routes', {})
self.ipv4_routes = {}
self.ipv6_routes = {}
if self.routes:
self.routes = [route['route'] for route in self.routes]
for route in self.routes:
ip_gw = ipaddr.IPAddress(route['ip_gw'])
ip_dst = ipaddr.IPNetwork(route['ip_dst'])
assert(ip_gw.version == ip_dst.version)
if ip_gw.version == 4:
self.ipv4_routes[ip_dst] = ip_gw
else:
self.ipv6_routes[ip_dst] = ip_gw
self.arp_cache = {}
self.nd_cache = {}
self.max_hosts = conf.setdefault('max_hosts', None)
self.host_cache = {}
def get(self):
esstore = current_app.extensions['es_access_store']
mpstore = current_app.extensions['mp_access_store']
args = self.parser.parse_args()
event = args['event'][:120]
auth_token=request.headers.get('Auth-Token')
ip_resolver = current_app.config['IP_RESOLVER']
ip = RateLimiter.get_remote_addr()
ip_net = IPNetwork(ip)
resolved_org = ip_resolver['default']
for net, org in ip_resolver.items():
if isinstance(net, (IPv4Network, IPv6Network)):
if net.overlaps(ip_net):
resolved_org = org
break
data = dict(ip=ip,
org=resolved_org,
host=request.host,
timestamp=datetime.now(),
event=event)
if auth_token:
payload = TokenAuthentication.get_payload_from_token(auth_token)
data['app_name'] = payload['app_name']
# esstore.store_event(data)
mpstore.store_event(data)
data['timestamp']= str(data['timestamp'])
return CTTVResponse.OK(SimpleResult(None, data=data))
def parse_graph(self):
"""
Parse the xml graph file
"""
deviceinfo = {}
deviceroot = self.root.find(self.pngtag).find('Devices')
devices = deviceroot.findall('Device')
if devices is not None:
for dev in devices:
hostname = dev.attrib['Hostname']
if hostname is not None:
deviceinfo[hostname] = {}
hwsku = dev.attrib['HwSku']
devtype = dev.attrib['Type']
deviceinfo[hostname]['HwSku'] = hwsku
deviceinfo[hostname]['Type'] = devtype
self.links[hostname] = {}
devicel2info = {}
devicel3s = self.root.find(self.dpgtag).findall('DevicesL3Info')
devicel2s = self.root.find(self.dpgtag).findall('DevicesL2Info')
if devicel2s is not None:
for l2info in devicel2s:
hostname = l2info.attrib['Hostname']
if hostname is not None:
devicel2info[hostname] = {}
vlans = l2info.findall('InterfaceVlan')
for vlan in vlans:
portname = vlan.attrib['portname']
portmode = vlan.attrib['mode']
portvlanid = vlan.attrib['vlanids']
portvlanlist = self.port_vlanlist(portvlanid)
devicel2info[hostname][portname] = {'mode': portmode, 'vlanids': portvlanid, 'vlanlist': portvlanlist}
if devicel3s is not None:
for l3info in devicel3s:
hostname = l3info.attrib['Hostname']
if hostname is not None:
management_ip = l3info.find('ManagementIPInterface').attrib['Prefix']
deviceinfo[hostname]['ManagementIp'] = management_ip
mgmtip = ipaddress.IPNetwork(management_ip)
deviceinfo[hostname]['mgmtip'] = str(mgmtip.ip)
management_gw = str(mgmtip.network+1)
deviceinfo[hostname]['ManagementGw'] = management_gw
allinks = self.root.find(self.pngtag).find('DeviceInterfaceLinks').findall('DeviceInterfaceLink')
if allinks is not None:
for link in allinks:
start_dev = link.attrib['StartDevice']
end_dev = link.attrib['EndDevice']
if start_dev:
self.links[start_dev][link.attrib['StartPort']] = {'peerdevice':link.attrib['EndDevice'], 'peerport': link.attrib['EndPort']}
if end_dev:
self.links[end_dev][link.attrib['EndPort']] = {'peerdevice': link.attrib['StartDevice'], 'peerport': link.attrib['StartPort']}
self.devices = deviceinfo
self.vlanport = devicel2info
def add_controller_ips(self, controller_ips, vlan):
ofmsgs = []
for controller_ip in controller_ips:
controller_ip_host = ipaddr.IPNetwork(
'/'.join((str(controller_ip.ip),
str(controller_ip.max_prefixlen))))
if controller_ip_host.version == 4:
ofmsgs.append(self.valve_flowcontroller(
self.dp.eth_src_table,
self.valve_in_match(
eth_type=ether.ETH_TYPE_ARP,
nw_dst=controller_ip_host,
vlan=vlan),
priority=self.dp.highest_priority))
ofmsgs.append(self.valve_flowcontroller(
self.dp.eth_src_table,
self.valve_in_match(
eth_type=ether.ETH_TYPE_IP,
eth_dst=self.FAUCET_MAC,
vlan=vlan,
nw_proto=inet.IPPROTO_ICMP,
nw_src=controller_ip,
nw_dst=controller_ip_host),
priority=self.dp.highest_priority))
else:
ofmsgs.append(self.valve_flowcontroller(
self.dp.eth_src_table,
self.valve_in_match(
eth_type=ether.ETH_TYPE_IPV6,
vlan=vlan,
nw_proto=inet.IPPROTO_ICMPV6,
ipv6_nd_target=controller_ip_host,
icmpv6_type=icmpv6.ND_NEIGHBOR_SOLICIT),
priority=self.dp.highest_priority))
ofmsgs.append(self.valve_flowcontroller(
self.dp.eth_src_table,
self.valve_in_match(
eth_type=ether.ETH_TYPE_IPV6,
eth_dst=self.FAUCET_MAC,
vlan=vlan,
nw_proto=inet.IPPROTO_ICMPV6,
icmpv6_type=icmpv6.ND_NEIGHBOR_ADVERT),
priority=self.dp.highest_priority))
ofmsgs.append(self.valve_flowcontroller(
self.dp.eth_src_table,
self.valve_in_match(
eth_type=ether.ETH_TYPE_IPV6,
vlan=vlan,
nw_proto=inet.IPPROTO_ICMPV6,
nw_dst=controller_ip_host,
icmpv6_type=icmpv6.ICMPV6_ECHO_REQUEST),
priority=self.dp.highest_priority))
return ofmsgs