def test_ipset_operations_with_combined_ipv4_and_ipv6():
    """Check truthiness, ``&``, ``-`` and ``^`` on IPSets, including mixed IPv4/IPv6 sets."""
    mixed_a = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    mixed_b = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])
    sparse = IPSet(['0.0.0.1', '10.0.0.64/30', '255.255.255.1'])
    evens = IPSet(['10.0.0.64', '10.0.0.66'])
    evens_plus = IPSet(['10.0.0.64', '10.0.0.66', '111.111.111.111'])
    odds = IPSet(['10.0.0.65', '10.0.0.67'])
    v6_block = IPSet(['2405:8100::/32'])

    # Expected results that several assertions below share.
    only_evens = IPSet(['10.0.0.64/32', '10.0.0.66/32'])
    only_odds = IPSet(['10.0.0.65/32', '10.0.0.67/32'])
    sparse_minus_evens = IPSet(
        ['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    sparse_minus_odds = IPSet(
        ['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])

    # truthiness: a populated set is truthy, an empty one is falsy
    assert bool(v6_block)
    assert not bool(IPSet())

    # set intersection
    assert (mixed_b & mixed_a) == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    assert (sparse & evens) == only_evens
    assert (evens & sparse) == only_evens
    assert (sparse & odds) == only_odds
    assert (odds & sparse) == only_odds

    # set difference
    assert (sparse - evens) == sparse_minus_evens
    assert (evens - sparse) == IPSet([])
    assert (sparse - evens_plus) == sparse_minus_evens
    assert (sparse - odds) == sparse_minus_odds
    assert (odds - sparse) == IPSet([])

    # set symmetric difference
    assert (mixed_b ^ mixed_a) == IPSet(
        ['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
    assert (IPSet([]) ^ IPSet([])) == IPSet([])
    assert (IPSet(['0.0.0.1/32']) ^ IPSet([])) == IPSet(['0.0.0.1/32'])
    assert (IPSet(['0.0.0.1/32']) ^ IPSet(['0.0.0.1/32'])) == IPSet([])
    assert (sparse ^ evens) == sparse_minus_evens
    assert (evens ^ sparse) == sparse_minus_evens
    assert (sparse ^ evens_plus) == IPSet(
        ['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32',
         '111.111.111.111/32', '255.255.255.1/32'])
    assert (sparse ^ odds) == sparse_minus_odds
    assert (odds ^ sparse) == sparse_minus_odds
python类IPSet()的实例源码
def test_converting_ipsets_to_ipranges():
    """``iter_ipranges`` yields nothing for an empty set and merges adjacent addresses."""
    empty_ranges = list(IPSet().iter_ipranges())
    assert empty_ranges == []

    single = IPSet([IPAddress('10.0.0.1')])
    assert list(single.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Two consecutive addresses are reported as one contiguous range.
    adjacent = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(adjacent.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """``len()`` raises IndexError once the set holds more than ``_sys_maxint`` addresses."""
    # ::0 .. _sys_maxint inclusive is _sys_maxint + 1 addresses: one too many.
    oversized = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(oversized)

    # One address fewer is exactly _sys_maxint addresses and must still work.
    largest_ok = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)))
    assert len(largest_ok) == _sys_maxint
def test_ipset_ipv4_and_ipv4_separation():
    """The integer 1 as IPv4 and as IPv6 stays two separate ranges in one set."""
    both_versions = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    expected = [
        IPRange('0.0.0.1', '0.0.0.1'),
        IPRange('::1', '::1'),
    ]
    assert list(both_versions.iter_ipranges()) == expected
def test_ipset_exceptions():
    """IPSet rejects hashing and non-iterable ``update`` arguments with TypeError."""
    subject = IPSet(['10.0.0.1'])

    # IPSet objects are not hashable.
    with pytest.raises(TypeError):
        hash(subject)

    # Bad update argument type.
    with pytest.raises(TypeError):
        subject.update(42)
def test_ipset_converts_to_cidr_networks_v4():
    """IPv4 networks given with host bits set come back from ``iter_cidrs`` as true CIDRs."""
    networks = IPSet(IPNetwork('10.1.2.3/8'))
    networks.add(IPNetwork('192.168.1.2/16'))

    expected = [IPNetwork('10.0.0.0/8'), IPNetwork('192.168.0.0/16')]
    assert list(networks.iter_cidrs()) == expected
def test_ipset_converts_to_cidr_networks_v6():
    """IPv6 networks given with host bits set come back from ``iter_cidrs`` as true CIDRs."""
    networks = IPSet(IPNetwork('fe80::4242/64'))
    networks.add(IPNetwork('fe90::4343/64'))

    expected = [IPNetwork('fe80::/64'), IPNetwork('fe90::/64')]
    assert list(networks.iter_cidrs()) == expected
def ips_to_spf_strings(ips):
    """Convert a collection of SPF tokens into merged ``ip4:``/``ip6:`` mechanisms.

    Tokens that ``IPNetwork`` can parse are merged into the smallest set of
    CIDR blocks via ``IPSet`` and rendered as ``ip4:<cidr>`` or ``ip6:<cidr>``
    (a trailing ``/32`` is dropped from IPv4 entries). Tokens that fail to
    parse are passed through untouched.

    Args:
        ips: iterable of strings; each is either an IP address/CIDR or some
            other token.

    Returns:
        A new list: the merged IP mechanisms first, followed by the
        non-IP tokens in their original order.

    The input is not modified. (The previous implementation removed the
    non-IP tokens from the caller's list in place, which was a hidden side
    effect and also failed for inputs without ``.remove``, such as tuples.)
    """
    networks = []
    other_tokens = []
    for token in ips:
        try:
            IPNetwork(token)
        except AddrFormatError:
            other_tokens.append(token)
        else:
            networks.append(token)

    merged = [str(cidr) for cidr in IPSet(networks).iter_cidrs()]
    # A colon can only appear in an IPv6 CIDR string.
    mechanisms = [
        'ip6:' + cidr if ':' in cidr else 'ip4:' + cidr.replace('/32', '')
        for cidr in merged
    ]
    return mechanisms + other_tokens
configuration_data_randomizer.py 文件源码
项目:restconf-examples
作者: CiscoDevNet
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def random_address(base):
    """Return a random address inside ``base`` as an ``address/prefixlen`` string.

    The first and last addresses of the prefix (the network and broadcast
    addresses) are never returned.

    The address is chosen by drawing an integer between ``first + 1`` and
    ``last - 1`` instead of expanding the prefix into a list of every address,
    so the cost no longer grows with the size of the prefix (the list-based
    approach is unusable for large IPv4 prefixes and for IPv6).

    Args:
        base: anything ``netaddr.IPNetwork`` accepts, e.g. ``"10.0.0.0/24"``.

    Returns:
        A string such as ``"10.0.0.37/24"``.

    Raises:
        ValueError: if the prefix has no address strictly between its first
            and last address (e.g. an IPv4 /31 or /32).
    """
    prefix = netaddr.IPNetwork(base)
    lowest = prefix.first + 1
    highest = prefix.last - 1
    if lowest > highest:
        raise ValueError(
            "prefix %s has no usable host addresses" % prefix)
    chosen = netaddr.IPAddress(random.randint(lowest, highest), prefix.version)
    return str(chosen) + '/' + str(prefix.prefixlen)
def ipset(nets):
    """Split ``nets`` into an IPv4 ``IPSet`` and an IPv6 ``IPSet``.

    Each entry is parsed with ``netaddr.IPNetwork``; its string form is then
    split into address and prefix length to decide which set receives it.
    Entries matching neither test are silently skipped.

    Returns:
        A ``(v4nets, v6nets)`` tuple of ``netaddr.IPSet`` objects.
    """
    v4nets, v6nets = netaddr.IPSet(), netaddr.IPSet()
    for entry in nets:
        network = netaddr.IPNetwork(entry)
        pieces = str(network).split("/")
        address, prefix_len = pieces[0], pieces[1]
        if netaddr.valid_ipv4(address) and int(prefix_len) <= 32:
            target = v4nets
        elif netaddr.valid_ipv6(address) and int(prefix_len) <= 128:
            target = v6nets
        else:
            continue
        target.add(network)
    return v4nets, v6nets
def populate_reserved(self):
    """Load the file at ``self.reserved_ip`` into ``self.ipset_reserved``.

    Lines that start with ``#`` are skipped; remaining lines are stripped of
    surrounding whitespace and blank ones are dropped. The surviving entries
    are handed to ``IPSet``.
    """
    with open(self.reserved_ip, 'r') as handle:
        entries = [
            raw.strip()
            for raw in handle
            # The '#' test is applied to the raw line, before stripping.
            if not raw.startswith('#') and raw.strip()
        ]
    self.ipset_reserved = IPSet(entries)
def derive_outwall(self):
    """Compute the "out-wall" address space and its CIDR list.

    Starts from the whole IPv4 space (``0.0.0.0/0``), subtracts
    ``self.ipset_inwall`` and then ``self.ipset_reserved``, and stores the
    result in ``self.ipset_outwall``. The equivalent CIDR blocks are stored
    in ``self.cidrs_outwall`` and their count is logged.

    See: http://www.tcpipguide.com/free/t_IPReservedPrivateandLoopbackAddresses-3.htm
    """
    everything = IPSet(['0.0.0.0/0'])
    outside_wall = everything - self.ipset_inwall
    self.ipset_outwall = outside_wall - self.ipset_reserved
    self.cidrs_outwall = list(self.ipset_outwall.iter_cidrs())
    block_count = len(self.cidrs_outwall)
    logging.info("Finished deriving out-wall IP table(s). Total: %i CIDR blocks.", block_count)
def _validate_subnet_cidr(context, network_id, new_subnet_cidr):
    """Reject a subnet CIDR that overlaps an existing subnet on the network.

    Does nothing when ``allow_overlapping_ips`` is enabled in the
    configuration. Otherwise the requested CIDR is compared against every
    non-shared subnet found for ``network_id``.

    Raises:
        n_exc.BadRequest: if building an IPSet from ``new_subnet_cidr``
            raises ``TypeError``.
        n_exc.InvalidInput: if the CIDR intersects an existing subnet's CIDR.
    """
    if neutron_cfg.cfg.CONF.allow_overlapping_ips:
        return

    try:
        requested = netaddr.IPSet([new_subnet_cidr])
    except TypeError:
        LOG.exception("Invalid or missing cidr: %s" % new_subnet_cidr)
        raise n_exc.BadRequest(resource="subnet",
                               msg="Invalid or missing cidr")

    # Using admin context here, in case we actually share networks later
    existing_subnets = db_api.subnet_find(context=context.elevated(),
                                          network_id=network_id,
                                          shared=[False])
    for existing in existing_subnets:
        if not (netaddr.IPSet([existing.cidr]) & requested):
            continue

        # The user-facing message deliberately omits details of the
        # overlapping subnet; those go to the log only.
        message_params = {'cidr': new_subnet_cidr,
                          'network_id': network_id}
        err_msg = (_("Requested subnet with cidr: %(cidr)s for "
                     "network: %(network_id)s overlaps with another "
                     "subnet") % message_params)
        log_params = {'new_cidr': new_subnet_cidr,
                      'subnet_id': existing.id,
                      'cidr': existing.cidr}
        LOG.error(_("Validation for CIDR: %(new_cidr)s failed - "
                    "overlaps with subnet %(subnet_id)s "
                    "(CIDR: %(cidr)s)"),
                  log_params)
        raise n_exc.InvalidInput(error_message=err_msg)
def _pool_is_growing(original_pool, new_pool):
    """Return True when ``new_pool`` covers addresses ``original_pool`` does not.

    Both arguments must expose ``_alloc_pools``, an iterable of mappings with
    ``'start'`` and ``'end'`` keys.
    """
    def _to_ipset(pool):
        # Collapse every start/end extent of the pool into a single IPSet.
        addresses = netaddr.IPSet()
        for extent in pool._alloc_pools:
            addresses.add(netaddr.IPRange(extent['start'], extent['end']))
        return addresses

    current = _to_ipset(original_pool)
    proposed = _to_ipset(new_pool)
    # The pool is growing if the original set is not a superset of the new set.
    return not current.issuperset(proposed)
def ensure_default_policy(cidrs, subnets):
    """Append each subnet's network and broadcast address to ``cidrs`` if missing.

    For every subnet, the network and broadcast addresses are rendered as
    host-length CIDRs (``/32`` for IPv4, ``/128`` otherwise). A CIDR is
    appended to ``cidrs`` unless it is already covered by the IPSet built
    from the initial ``cidrs`` or is already present in the list.

    ``cidrs`` is modified in place; nothing is returned.
    """
    # Snapshot of what the policy covered on entry; not updated as we append.
    policy_cidrs = netaddr.IPSet(cidrs)
    for subnet in subnets:
        net = netaddr.IPNetwork(subnet["cidr"])
        edge_addresses = (net.network, net.broadcast)
        host_prefix = '32' if net.version == 4 else '128'
        candidates = ["%s/%s" % (address, host_prefix)
                      for address in edge_addresses]
        for candidate in candidates:
            if netaddr.IPNetwork(candidate) in policy_cidrs or candidate in cidrs:
                continue
            cidrs.append(candidate)
def allocation_pools(self):
    """Return the allocation pools for this object.

    A truthy ``_allocation_pool_cache`` value is decoded from JSON and
    returned directly. Otherwise the pools are derived from ``self["cidr"]``
    minus the CIDRs of ``self["ip_policy"]`` (if one is set) via
    ``_pools_from_cidr``.
    """
    cached = self.get("_allocation_pool_cache")
    if cached:
        return json.loads(cached)

    excluded = (self["ip_policy"].get_cidrs_ip_set()
                if self["ip_policy"]
                else netaddr.IPSet([]))
    whole_subnet = netaddr.IPSet([netaddr.IPNetwork(self["cidr"])])
    return _pools_from_cidr(whole_subnet - excluded)
def get_cidrs_ip_set(self):
    """Return an IPSet built from the ``cidr`` of every entry under ``"exclude"``.

    A missing ``"exclude"`` key is treated as an empty list.
    """
    excluded_entries = self.get("exclude", [])
    return netaddr.IPSet([entry.cidr for entry in excluded_entries])
def upgrade():
    """Populate ``quark_ip_policy.size`` from the CIDRs attached to each policy.

    Reads every ``(ip_policy_id, cidr)`` row from ``quark_ip_policy_cidrs``,
    merges the CIDRs of each policy into one IPSet, and writes that set's
    ``size`` to the matching ``quark_ip_policy`` row.
    """
    policy_table = table('quark_ip_policy',
                         column('id', sa.String(length=36)),
                         column('size', INET()))
    cidr_table = table('quark_ip_policy_cidrs',
                       column('ip_policy_id', sa.String(length=36)),
                       column('cidr', sa.String(length=64)))
    bind = op.get_bind()

    # 1. Retrieve all ip_policy_cidr rows.
    query = select([cidr_table.c.ip_policy_id, cidr_table.c.cidr])
    rows = bind.execute(query).fetchall()

    # 2. Determine IPSet for each IP Policy.
    sets_by_policy = {}
    for policy_id, cidr in rows:
        sets_by_policy.setdefault(policy_id, netaddr.IPSet()).add(cidr)

    # 3. Populate size for each IP Policy.
    for policy_id, cidr_set in sets_by_policy.items():
        statement = policy_table.update().values(
            size=cidr_set.size).where(
                policy_table.c.id == policy_id)
        bind.execute(statement)
def test_allow_allocation_pool_growth(self):
    """With growth allowed, updating to a wider allocation pool succeeds.

    Also checks that no address of the new pool is excluded by the single
    resulting IP policy, and that the new pool adds addresses beyond the
    starting pool.
    """
    def _pools_to_ipset(pools):
        # Merge a list of start/end dicts into one IPSet.
        merged = netaddr.IPSet()
        for extent in pools:
            merged.add(netaddr.IPRange(extent['start'], extent['end']))
        return merged

    CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')
    cidr = "192.168.1.0/24"
    ip_network = netaddr.IPNetwork(cidr)
    network = {"network": dict(name="public", tenant_id="fake",
                               network_plugin="BASE")}
    initial_pool = [dict(start='192.168.1.15', end='192.168.1.30')]
    subnet = {"subnet": dict(id=1, ip_version=4, next_auto_assign_ip=2,
                             cidr=cidr, first_ip=ip_network.first,
                             last_ip=ip_network.last, ip_policy=None,
                             allocation_pools=initial_pool,
                             tenant_id="fake")}

    with self._stubs(network, subnet) as (_net, _sub1):
        fetched = subnet_api.get_subnet(self.context, 1)
        start_pools = fetched['allocation_pools']
        new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

        subnet_update = {"subnet": dict(allocation_pools=new_pool)}
        updated = subnet_api.update_subnet(self.context, 1, subnet_update)
        self.assertNotEqual(start_pools, updated['allocation_pools'])
        self.assertEqual(new_pool, updated['allocation_pools'])

        policies = policy_api.get_ip_policies(self.context)
        self.assertEqual(1, len(policies))
        policy = policies[0]

        # No address of the new pool may be excluded by the policy.
        excluded = netaddr.IPSet()
        for excluded_cidr in policy['exclude']:
            excluded.add(netaddr.IPNetwork(excluded_cidr))
        for extent in new_pool:
            for ip in netaddr.IPRange(extent['start'], extent['end']):
                self.assertFalse(ip in excluded)

        # The union differs from the starting set, i.e. the pool grew.
        start_ip_set = _pools_to_ipset(start_pools)
        new_ip_set = _pools_to_ipset(updated['allocation_pools'])
        self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
def test_do_not_allow_allocation_pool_growth(self):
    """With growth disallowed, updating to a wider allocation pool raises BadRequest."""
    def _pools_to_ipset(pools):
        # Merge a list of start/end dicts into one IPSet.
        merged = netaddr.IPSet()
        for extent in pools:
            merged.add(netaddr.IPRange(extent['start'], extent['end']))
        return merged

    CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')
    cidr = "192.168.1.0/24"
    ip_network = netaddr.IPNetwork(cidr)
    network = {"network": dict(name="public", tenant_id="fake",
                               network_plugin="BASE")}
    initial_pool = [dict(start='192.168.1.15', end='192.168.1.30')]
    subnet = {"subnet": dict(id=1, ip_version=4, next_auto_assign_ip=2,
                             cidr=cidr, first_ip=ip_network.first,
                             last_ip=ip_network.last, ip_policy=None,
                             allocation_pools=initial_pool,
                             tenant_id="fake")}

    with self._stubs(network, subnet) as (_net, _sub1):
        fetched = subnet_api.get_subnet(self.context, 1)
        start_pools = fetched['allocation_pools']
        new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

        # Sanity check: the requested pool really is larger than the start.
        start_ip_set = _pools_to_ipset(start_pools)
        new_ip_set = _pools_to_ipset(new_pool)
        self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

        subnet_update = {"subnet": dict(allocation_pools=new_pool)}
        with self.assertRaises(n_exc.BadRequest):
            subnet_api.update_subnet(self.context, 1, subnet_update)