def __init__(self,
             data   # Data suitable for this class
             ):
    """Collect a network for every address bound to a local interface.

    ``data`` is checked with ``data_is_valid``; a ``ValueError`` carrying
    the validator's message is raised when that check fails.  Afterwards
    ``self.cidrs`` holds one ``ipaddr.IPNetwork`` per IPv4 and IPv6
    address that ``netifaces`` reports, interface by interface.
    """
    ok, reason = data_is_valid(data)
    if not ok:
        raise ValueError("Invalid data: %s" % reason)

    self.cidrs = []
    for name in netifaces.interfaces():
        by_family = netifaces.ifaddresses(name)

        # IPv4 entries are used exactly as reported.
        for entry in by_family.get(netifaces.AF_INET, ()):
            if 'addr' in entry:
                self.cidrs.append(ipaddr.IPNetwork(entry['addr']))

        # Some IPv6 entries carry a trailing "%<interface>" (e.g. "%eth0");
        # keep only the part before the first '%'.
        for entry in by_family.get(netifaces.AF_INET6, ()):
            if 'addr' in entry:
                bare_addr = entry['addr'].split('%')[0]
                self.cidrs.append(ipaddr.IPNetwork(bare_addr))
# Example source code for the Python class IPNetwork()
def evaluate(self,
             hints  # Information used for doing identification
             ):
    """Decide whether the requester named in ``hints`` is identified.

    Returns False when ``hints`` has no 'requester' entry.  Otherwise
    returns True if the requester, parsed with ``ipaddr.IPNetwork``, is
    contained in at least one element of ``self.cidrs``, else False.
    """
    try:
        requester = ipaddr.IPNetwork(hints['requester'])
    except KeyError:
        return False

    # TODO: Find out if there's a more hash-like way to do this
    # instead of a linear search.  This would be great if it
    # weren't GPL: https://pypi.python.org/pypi/pytricia
    return any(requester in network for network in self.cidrs)
# A short test program
def __init__(self,
             data   # Data suitable for this class
             ):
    """Set up exclusions, timeout, failure result and a DNS resolver.

    ``data`` is checked with ``data_is_valid``; a ``ValueError`` carrying
    the validator's message is raised when that check fails.

    Keys read from ``data``:
      'exclude'      iterable of strings, each parsed by ipaddr.IPNetwork
      'timeout'      optional; converted through pscheduler's ISO 8601
                     helpers into seconds (2 when the key is missing)
      'fail-result'  optional; False when the key is missing
    """
    ok, reason = data_is_valid(data)
    if not ok:
        raise ValueError("Invalid data: %s" % reason)

    excluded = []
    for addr in data['exclude']:
        excluded.append(ipaddr.IPNetwork(addr))
    self.exclude = excluded

    try:
        self.timeout = pscheduler.timedelta_as_seconds(
            pscheduler.iso8601_as_timedelta(data['timeout']))
    except KeyError:
        # No timeout supplied; fall back to the default.
        self.timeout = 2

    self.fail_result = data.get('fail-result', False)

    # The same limit is applied to both resolver settings.
    resolver = dns.resolver.Resolver()
    resolver.timeout = self.timeout
    resolver.lifetime = self.timeout
    self.resolver = resolver
def Validate(self, value, unused_key=None):
    """Validates a subnet.

    Args:
      value: The subnet to check; expected to be a string.
      unused_key: Not used by this method.

    Returns:
      value, unchanged, when every check passes.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has a prefix length that is not all digits.
    """
    if value is None:
        raise validation.MissingAttribute('subnet must be specified')

    if not isinstance(value, basestring):
        message = "subnet must be a string, not '%r'" % type(value)
        raise validation.ValidationError(message)

    try:
        ipaddr.IPNetwork(value)
    except ValueError:
        raise validation.ValidationError(
            '%s is not a valid IPv4 or IPv6 subnet' % value)

    # Parsing succeeded, but only a purely numeric prefix length is
    # accepted after the '/'.
    pieces = value.split('/')
    if len(pieces) == 2 and re.match('^[0-9]+$', pieces[1]) is None:
        raise validation.ValidationError(
            'Prefix length of subnet %s must be an integer '
            '(quad-dotted masks are not supported)' % value)

    return value
def Validate(self, value, unused_key=None):
    """Validates a subnet.

    Args:
      value: Candidate subnet; expected to be a string.
      unused_key: Not used by this method.

    Returns:
      The value that was passed in, if it is acceptable.

    Raises:
      validation.MissingAttribute: no value was given (None).
      validation.ValidationError: the value is not a string, cannot be
        parsed by ipaddr.IPNetwork, or its prefix length is not all digits.
    """
    if value is None:
        raise validation.MissingAttribute('subnet must be specified')

    if not isinstance(value, basestring):
        raise validation.ValidationError(
            "subnet must be a string, not '%r'" % type(value))

    try:
        ipaddr.IPNetwork(value)
    except ValueError:
        not_a_subnet = '%s is not a valid IPv4 or IPv6 subnet' % value
        raise validation.ValidationError(not_a_subnet)

    # A value with exactly one '/' must have digits only after it.
    segments = value.split('/')
    has_single_slash = len(segments) == 2
    if has_single_slash and not re.match('^[0-9]+$', segments[1]):
        bad_prefix = ('Prefix length of subnet %s must be an integer '
                      '(quad-dotted masks are not supported)' % value)
        raise validation.ValidationError(bad_prefix)

    return value
def test_duplicate(net_str, net2_str=None, dup_found=True):
    """Add two networks and check whether the second is reported as a duplicate.

    ``net_str`` is added to ``usres_monitor`` first; then ``net2_str`` (or
    ``net_str`` again when ``net2_str`` is empty/None) is added.  A
    ``USRESMonitorException`` whose text contains "it was already in the db"
    counts as "duplicate found".  The outcome is reported via
    ``test_outcome`` and an ``AssertionError`` is raised when it disagrees
    with ``dup_found``.
    """
    second_str = net2_str if net2_str else net_str
    label = "{} and {}".format(net_str, second_str)

    usres_monitor.add_net(ipaddr.IPNetwork(net_str))
    try:
        usres_monitor.add_net(ipaddr.IPNetwork(second_str))
    except USRESMonitorException as exc:
        if "it was already in the db" in str(exc):
            test_outcome("test_duplicate", label,
                         "OK" if dup_found else "FAIL")
            if not dup_found:
                raise AssertionError("Duplicate found")
            return
        # NOTE(review): a USRESMonitorException with any other message
        # falls through and is treated as "no duplicate" -- nesting was
        # reconstructed from unindented source; confirm this is intended.

    test_outcome("test_duplicate", label, "FAIL" if dup_found else "OK")
    if dup_found:
        raise AssertionError("Duplicate not found")
def Validate(self, value, unused_key=None):
    """Validates a subnet.

    Args:
      value: The subnet string to validate.
      unused_key: Not used by this method.

    Returns:
      value itself once all checks have passed.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, ipaddr.IPNetwork
        raises ValueError for it, or the text after '/' is not all digits.
    """
    if value is None:
        raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
        wrong_type = "subnet must be a string, not '%r'" % type(value)
        raise validation.ValidationError(wrong_type)

    try:
        ipaddr.IPNetwork(value)
    except ValueError:
        raise validation.ValidationError(
            '%s is not a valid IPv4 or IPv6 subnet' % value)

    fields = value.split('/')
    if len(fields) == 2:
        # Only an integer prefix length may follow the slash.
        prefix_length = fields[1]
        if not re.match('^[0-9]+$', prefix_length):
            raise validation.ValidationError(
                'Prefix length of subnet %s must be an integer '
                '(quad-dotted masks are not supported)' % value)
    return value
def read_testbed_topo(self):
    """Load the testbed CSV file into ``self.testbed_topo``.

    Each row of ``self.testbed_filename`` becomes a dict of properties
    stored under the row's name, which is taken from a column whose
    header contains 'uniq-name' or 'conf-name'.  Rows whose name cell
    contains '#' end up without a name and are therefore not stored.
    A non-empty cell in a column whose header contains 'ptf_ip' is split
    into 'ptf_ip' and 'ptf_netmask' string properties.
    """
    with open(self.testbed_filename) as handle:
        for row in csv.DictReader(handle):
            props = {}
            entry_name = ''
            for column, cell in row.items():
                if 'uniq-name' in column or 'conf-name' in column:
                    # A '#' in the name cell marks a comment line: the
                    # name stays empty so the row is dropped below.
                    if '#' not in cell:
                        entry_name = cell
                elif 'ptf_ip' in column and cell:
                    ptf_net = ipaddress.IPNetwork(cell)
                    props['ptf_ip'] = str(ptf_net.ip)
                    props['ptf_netmask'] = str(ptf_net.netmask)
                else:
                    props[column] = cell
            if entry_name:
                self.testbed_topo[entry_name] = props
    return
def Validate(self, value, unused_key=None):
    """Validates a subnet.

    Args:
      value: Subnet to validate; must be a string.
      unused_key: Not used by this method.

    Returns:
      The validated value, unmodified.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value has the wrong type, is not
        parseable by ipaddr.IPNetwork, or carries a non-numeric prefix.
    """
    if value is None:
        raise validation.MissingAttribute('subnet must be specified')

    is_string = isinstance(value, basestring)
    if not is_string:
        raise validation.ValidationError(
            "subnet must be a string, not '%r'" % type(value))

    try:
        ipaddr.IPNetwork(value)
    except ValueError:
        raise validation.ValidationError(
            '%s is not a valid IPv4 or IPv6 subnet' % value)

    # Reject anything other than digits as the prefix length.
    tokens = value.split('/')
    if len(tokens) == 2 and re.match('^[0-9]+$', tokens[1]) is None:
        raise validation.ValidationError(
            'Prefix length of subnet %s must be an integer '
            '(quad-dotted masks are not supported)' % value)
    return value
def Validate(self, value, unused_key=None):
    """Validates a subnet.

    Args:
      value: The subnet under validation; a string is required.
      unused_key: Not used by this method.

    Returns:
      value, as given, if no check fails.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, ipaddr.IPNetwork
        cannot parse it, or its prefix length is not made of digits only.
    """
    if value is None:
        raise validation.MissingAttribute('subnet must be specified')

    if not isinstance(value, basestring):
        type_error = "subnet must be a string, not '%r'" % type(value)
        raise validation.ValidationError(type_error)

    try:
        ipaddr.IPNetwork(value)
    except ValueError:
        parse_error = '%s is not a valid IPv4 or IPv6 subnet' % value
        raise validation.ValidationError(parse_error)

    halves = value.split('/')
    numeric_prefix = len(halves) != 2 or re.match('^[0-9]+$', halves[1])
    if not numeric_prefix:
        # Exactly one '/' was present and what follows is not an integer.
        raise validation.ValidationError(
            'Prefix length of subnet %s must be an integer '
            '(quad-dotted masks are not supported)' % value)
    return value
def is_valid_ip(ip_address, project):
    """
    Tell whether ``ip_address`` is allowed for ``project``.

    The project's 'sentry:blacklisted_ips' option is consulted.  Returns
    True when that option is empty or no entry matches; returns False as
    soon as an entry equals ``ip_address`` or is a range (contains '/')
    that includes it.
    """
    blacklist = project.get_option('sentry:blacklisted_ips')
    if not blacklist:
        return True

    candidate = IPNetwork(ip_address)
    for entry in blacklist:
        # Cheap exact comparison first; only entries written as a range
        # (with a '/') are parsed and tested for containment.
        exact_match = ip_address == entry
        if exact_match or ('/' in entry and candidate in IPNetwork(entry)):
            return False

    return True
# #845, add for server name filter, by hzwangzhiwei @20160803
def parse_device_desc_xml(filename):
    """Parse a device description XML file into a configuration dict.

    The XML root is handed to ``parse_device``, which yields the loopback
    prefix, management prefix, hostname, hardware SKU and device type.
    The returned dict has three tables: 'DEVICE_METADATA',
    'LOOPBACK_INTERFACE' and 'MGMT_INTERFACE'.  The management gateway is
    the address immediately after the management network address.
    """
    root = ET.parse(filename).getroot()
    lo_prefix, mgmt_prefix, hostname, hwsku, d_type = parse_device(root)

    # NOTE(review): `ipaddress` must expose IPNetwork/IPAddress here, which
    # the standard-library module does not -- presumably an alias for the
    # `ipaddr` package; confirm against the file's imports.
    mgmt_net = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmt_net.network) + 1)

    return {
        'DEVICE_METADATA': {
            'localhost': {
                'hostname': hostname,
                'hwsku': hwsku,
            },
        },
        'LOOPBACK_INTERFACE': {('lo', lo_prefix): {}},
        'MGMT_INTERFACE': {('eth0', mgmt_prefix): {'gwaddr': gwaddr}},
    }
def __init__(self,
             data   # Data suitable for this class
             ):
    """Build the list of networks from ``data['cidrs']``.

    ``data`` is checked with ``data_is_valid``; a ``ValueError`` carrying
    the validator's message is raised when that check fails.  Each entry
    of ``data['cidrs']`` is parsed with ``ipaddr.IPNetwork`` and the
    results are kept, in order, in ``self.cidrs``.
    """
    ok, reason = data_is_valid(data)
    if not ok:
        raise ValueError("Invalid data: %s" % reason)

    self.cidrs = [ipaddr.IPNetwork(spec) for spec in data['cidrs']]
def evaluate(self,
             hints  # Information used for doing identification
             ):
    """Check the requester in ``hints`` against the configured networks.

    Returns True if ``hints['requester']``, parsed with
    ``ipaddr.IPNetwork``, lies inside any entry of ``self.cidrs``.
    Returns False if there is no 'requester' hint or nothing matches.
    """
    try:
        requester = ipaddr.IPNetwork(hints['requester'])
    except KeyError:
        return False

    # TODO: Find out if there's a more hash-like way to do this
    # instead of a linear search.  This would be great if it
    # weren't GPL: https://pypi.python.org/pypi/pytricia
    containing = (requester in candidate for candidate in self.cidrs)
    return any(containing)
# A short test program
def net_is_valid(self, net):
    """
    Returns True if ipaddr.IPNetwork accepts the given string as an IP
    network (v4 or v6), and False if it raises ValueError.
    """
    try:
        ipaddr.IPNetwork(net)
    except ValueError:
        return False
    else:
        return True
#}}}
#{{{ IP type
def get_network(self, net_type):
    """
    Returns a network (as a "address/prefixlen" string) that contains the
    IPs of the requested type (private / public) of all nodes in the config.

    The network is derived from the first node section; every following
    node's IP must lie inside it.  An empty string is returned when the
    config has no node sections.

    Raises an EXAConfError if a node has no entry for 'net_type', if an
    IP is invalid, or if a node's IP is not part of the network defined
    by the first node section.

    The caller is expected to have checked that the network type is
    actually present before calling this function.
    """
    network = ""
    for section in self.config.sections:
        if not self.is_node(section):
            continue

        node_network = self.config[section].get(net_type)
        if not node_network:
            raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section))

        # the IP is whatever precedes the first '/'
        node_ip = node_network.split("/")[0].strip()
        if not self.ip_is_valid(node_ip):
            raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section))

        if network == "":
            # first node : its net becomes the cluster network, normalized
            # to the network address plus prefix length
            first_net = ipaddr.IPNetwork(node_network)
            network = "%s/%s" % (str(first_net.network), str(first_net.prefixlen))
        elif ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network):
            # other nodes : must be inside the chosen net
            raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network))
    return network
#}}}
#{{{ Get private network
def _ValidateEntry(self, entry):
    """Check entry.subnet and return an error value, or None if it is fine.

    Returns MISSING_SUBNET for an empty subnet, BAD_IPV_SUBNET formatted
    with the subnet when ipaddr.IPNetwork rejects it, and
    BAD_PREFIX_LENGTH formatted with the subnet when the text after a
    single '/' is not all digits.
    """
    if not entry.subnet:
        return MISSING_SUBNET

    try:
        ipaddr.IPNetwork(entry.subnet)
    except ValueError:
        return BAD_IPV_SUBNET % entry.subnet

    pieces = entry.subnet.split('/')
    if len(pieces) == 2 and re.match('^[0-9]+$', pieces[1]) is None:
        return BAD_PREFIX_LENGTH % entry.subnet

    return None
def _ValidateEntry(self, entry):
    """Validate the subnet of one entry.

    Returns None when entry.subnet is acceptable.  Otherwise returns
    MISSING_SUBNET (subnet empty), BAD_IPV_SUBNET % subnet (rejected by
    ipaddr.IPNetwork) or BAD_PREFIX_LENGTH % subnet (non-numeric text
    after a single '/').
    """
    if not entry.subnet:
        return MISSING_SUBNET
    try:
        ipaddr.IPNetwork(entry.subnet)
    except ValueError:
        return BAD_IPV_SUBNET % entry.subnet

    segments = entry.subnet.split('/')
    if len(segments) != 2:
        # No single '/' present, so there is no prefix length to inspect.
        return None
    if not re.match('^[0-9]+$', segments[1]):
        return BAD_PREFIX_LENGTH % entry.subnet
    return None
def is_private_network(cidr):
    """Return the ``is_private`` attribute of ``cidr`` parsed by ipaddr.IPNetwork."""
    network = ipaddr.IPNetwork(cidr)
    return network.is_private
def is_private_network(cidr):
    """Parse ``cidr`` with ipaddr.IPNetwork and return its ``is_private`` value."""
    parsed = ipaddr.IPNetwork(cidr)
    private = parsed.is_private
    return private
def test_min_max(net_str, target_prefix_len, exp_first, exp_last,
                 shoud_fail=False):
    """Check the first and last prefixes that usres_monitor.get_sre yields.

    ``net_str`` is parsed with ipaddr.IPNetwork and passed to
    ``usres_monitor.get_sre`` together with ``target_prefix_len``.  The
    first two returned integers are rendered with
    ``usres_monitor.get_ip_repr`` and compared, as
    "<address>/<target_prefix_len>", with the lower-cased ``exp_first``
    and ``exp_last``.

    With ``shoud_fail`` set, a failing comparison is expected and
    swallowed; a comparison that unexpectedly succeeds trips the final
    assertion.  The outcome is reported through ``test_outcome``.
    """
    failed = False
    try:
        net = ipaddr.IPNetwork(net_str)
        # Disabled check kept from the original:
        # assert net_str.lower() == "{}/{}".format(exp_first, net.prefixlen), \
        #     ("String representations of original net and first expected "
        #      "prefix don't match.")
        low_int, high_int, _ = usres_monitor.get_sre(net, target_prefix_len)
        first = usres_monitor.get_ip_repr(net.version, low_int)
        last = usres_monitor.get_ip_repr(net.version, high_int)

        checks = (
            (first, exp_first,
             "First expected prefix doesn't match: {} vs {}"),
            (last, exp_last,
             "Last expected prefix doesn't match: {} vs {}"),
        )
        for actual, wanted, template in checks:
            res = "{}/{}".format(actual, target_prefix_len)
            exp = "{}/{}".format(wanted.lower(), target_prefix_len)
            assert res == exp, template.format(res, exp)
    except AssertionError:
        failed = True
        if not shoud_fail:
            raise

    assert shoud_fail == failed
    suffix = " (failed as expected)" if shoud_fail else ""
    test_outcome("get_sre",
                 "{} in /{}".format(net_str, target_prefix_len),
                 "OK{}".format(suffix))
def get_net(net):
    """Return ``net`` as an ipaddr network object.

    An ``ipaddr.IPv4Network`` or ``ipaddr.IPv6Network`` instance is
    returned as is; anything else is passed to ``ipaddr.IPNetwork``.
    """
    network_types = (ipaddr.IPv4Network, ipaddr.IPv6Network)
    if not isinstance(net, network_types):
        return ipaddr.IPNetwork(net)
    return net
def load_cluster_networks(self):
    """Set ``self.cluster_net`` and ``self.public_net`` from an alive OSD.

    Both attributes are first reset to None.  If ``get_alive_osd()``
    returns an OSD, its config entries 'cluster_network' and
    'public_network' are parsed with ``IPNetwork`` whenever they are
    neither None nor the empty string.
    """
    self.cluster_net = None
    self.public_net = None

    osd = self.get_alive_osd()
    if osd is None:
        # Nothing to read the settings from; leave both unset.
        return

    cluster_spec = osd.config.get('cluster_network')
    if not (cluster_spec is None or cluster_spec == ""):
        self.cluster_net = IPNetwork(cluster_spec)

    public_spec = osd.config.get('public_network', None)
    if not (public_spec is None or public_spec == ""):
        self.public_net = IPNetwork(public_spec)
def _ValidateEntry(self, entry):
    """Return a problem description for entry.subnet, or None if none found.

    Possible results: MISSING_SUBNET when the subnet is empty,
    BAD_IPV_SUBNET % subnet when ipaddr.IPNetwork raises ValueError, and
    BAD_PREFIX_LENGTH % subnet when a single '/' is followed by anything
    other than digits.
    """
    if not entry.subnet:
        return MISSING_SUBNET

    try:
        ipaddr.IPNetwork(entry.subnet)
    except ValueError:
        return BAD_IPV_SUBNET % entry.subnet

    fields = entry.subnet.split('/')
    has_single_slash = len(fields) == 2
    if has_single_slash and not re.match('^[0-9]+$', fields[1]):
        return BAD_PREFIX_LENGTH % entry.subnet
    return None
def _ValidateEntry(self, entry):
    """Inspect entry.subnet; return an error value or None when it passes.

    MISSING_SUBNET is returned for an empty subnet.  A subnet that
    ipaddr.IPNetwork cannot parse yields BAD_IPV_SUBNET % subnet, and a
    subnet whose part after a single '/' is not purely numeric yields
    BAD_PREFIX_LENGTH % subnet.
    """
    if not entry.subnet:
        return MISSING_SUBNET
    try:
        ipaddr.IPNetwork(entry.subnet)
    except ValueError:
        return BAD_IPV_SUBNET % entry.subnet

    tokens = entry.subnet.split('/')
    if len(tokens) == 2:
        # Only digits are accepted as the prefix length.
        if re.match('^[0-9]+$', tokens[1]) is None:
            return BAD_PREFIX_LENGTH % entry.subnet
    return None
def test_0020_select_subnet_for_ip(self):
    """Every host address of each subnet must select that same subnet.

    All ``dormitory.Subnet`` rows (ordered by gateway) are loaded; for
    each host address produced by ``iterhosts()`` on a subnet's network,
    ``select_subnet_for_ip`` is expected to return that subnet.
    """
    subnets = dormitory.Subnet.q.order_by(dormitory.Subnet.gateway).all()
    for expected in subnets:
        network = ipaddr.IPNetwork(expected.address)
        for host in network.iterhosts():
            chosen = select_subnet_for_ip(host.compressed, subnets)
            self.assertEqual(expected, chosen)
def netmask(self):
    """Return, as a string, the netmask of the network in ``self.address``."""
    return str(ipaddr.IPNetwork(self.address).netmask)
def ip_version(self):
    """Return the ``version`` of the network parsed from ``self.address``."""
    network = ipaddr.IPNetwork(self.address)
    return network.version
def _ip_subnet_valid(self, ip, subnet):
    """Return True if ``ip`` lies inside the network of ``subnet.address``."""
    address = ipaddr.IPAddress(ip)
    network = ipaddr.IPNetwork(subnet.address)
    return address in network
def cni_ipam(host_cidrv4: str, host_gateway: str):
    """
    Build a static "host-local" IPAM configuration for a host.

    The allocation range is derived from class-level settings on
    ConfigSyncSchedules (sub_ips, skip_ips, range_nb_ips).  When sub_ips
    is set, the range base is the network address shifted by
    (last dotted component of the host IP * sub_ips); otherwise it is the
    host IP itself.

    :param host_cidrv4: host address with prefix length, parsed by ipaddr.IPNetwork
    :param host_gateway: value placed under the "gateway" key
    :return: dict
    """
    subnet = ipaddr.IPNetwork(host_cidrv4)
    base = subnet

    # Number after the last '.' of the host address.
    last_component = int(str(subnet.ip).split(".")[-1])
    if ConfigSyncSchedules.sub_ips:
        offset = last_component * ConfigSyncSchedules.sub_ips
        shifted = ipaddr.IPNetwork(subnet.network).ip + offset
        base = ipaddr.IPNetwork(shifted)

    range_start = base.ip + ConfigSyncSchedules.skip_ips
    range_end = range_start + ConfigSyncSchedules.range_nb_ips

    return {
        "type": "host-local",
        "subnet": "%s/%s" % (str(subnet.network), subnet.prefixlen),
        "rangeStart": str(range_start),
        "rangeEnd": str(range_end),
        "gateway": host_gateway,
        "routes": [
            {
                "dst": "%s/32" % EC.perennial_local_host_ip,
                "gw": str(ipaddr.IPNetwork(host_cidrv4).ip),
            },
            {"dst": "0.0.0.0/0"},
        ],
        "dataDir": "/var/lib/cni/networks"
    }