def resource_setup(cls):
    """Create the network, subnet and port fixtures and store them on ``cls``.

    Besides the created resources, the first allocation pool of the subnet
    is recorded as ``cls.ip_range`` and the port's first fixed IP and MAC
    address are recorded as ``cls.port_ip_address`` and
    ``cls.port_mac_address``.
    """
    super(PortsRbacTest, cls).resource_setup()

    # Network plus a subnet carved out of the configured project CIDR.
    cls.network = cls.create_network()
    cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
    cls.subnet = cls.create_subnet(cls.network, cidr=cls.cidr, mask_bits=24)

    # Address range spanned by the subnet's first allocation pool.
    first_pool = cls.subnet['allocation_pools'][0]
    cls.ip_range = netaddr.IPRange(first_pool['start'], first_pool['end'])

    # Port on the network, together with its addressing details.
    cls.port = cls.create_port(cls.network)
    cls.port_ip_address = cls.port['fixed_ips'][0]['ip_address']
    cls.port_mac_address = cls.port['mac_address']
python类IPRange()的实例源码
def resource_setup(cls):
    """Create the external network, subnet and router stored on ``cls``.

    The first allocation pool of the subnet is also recorded as
    ``cls.ip_range``.
    """
    super(RouterRbacTest, cls).resource_setup()

    # The network is created as external so that
    # ``external_gateway_info`` policies can be validated.
    cls.network = cls.create_network(**{'router:external': True})
    cls.subnet = cls.create_subnet(cls.network)

    # Address range spanned by the subnet's first allocation pool.
    first_pool = cls.subnet['allocation_pools'][0]
    cls.ip_range = netaddr.IPRange(first_pool['start'], first_pool['end'])

    cls.router = cls.create_router()
def get_range(self, targets):
    """Expand a comma-separated target specification into IP addresses.

    Each comma-separated entry may be:

    * a CIDR network (contains ``/``), expanded to every address in it;
    * a range (contains ``-``), written either as ``start-end`` with two
      full addresses or as the shorthand ``start-last_octet``
      (e.g. ``10.0.0.1-20``);
    * a single address.

    Whitespace around an entry, and around the two halves of a range, is
    ignored.

    :param targets: comma-separated string of targets, or ``None``.
    :returns: ``None`` when ``targets`` is ``None``; otherwise a list with
        the expanded addresses, in the order the entries were given.
    :raises SystemExit: when an entry is rejected with ``AddrFormatError``.
    """
    if targets is None:
        return None

    target_list = []
    try:
        for raw_target in targets.split(','):
            target = raw_target.strip()
            if '/' in target:
                target_list.extend(IPNetwork(target))
            elif '-' in target:
                # Split once; anything after a second '-' is ignored, as
                # only the first two pieces were ever consulted.
                pieces = target.split('-')
                start_text = pieces[0].strip()
                end_text = pieces[1].strip()
                start_addr = IPAddress(start_text)
                try:
                    ip_range = IPRange(start_addr, IPAddress(end_text))
                except AddrFormatError:
                    # Shorthand form: the right-hand side replaces the last
                    # word of the start address (dotted notation, so this
                    # presumably only makes sense for IPv4 -- TODO confirm).
                    words = list(start_addr.words)
                    words[-1] = end_text
                    end_addr = IPAddress('.'.join(map(str, words)))
                    ip_range = IPRange(start_addr, end_addr)
                target_list.extend(ip_range)
            else:
                target_list.append(IPAddress(target))
    except AddrFormatError:
        sys.exit("Specified an invalid IP address/range/network as target")
    return target_list
def _pool_is_growing(original_pool, new_pool):
    """Tell whether ``new_pool`` covers addresses outside ``original_pool``.

    Both arguments must expose ``_alloc_pools``, an iterable of mappings
    with ``'start'`` and ``'end'`` keys.

    :returns: ``True`` when the original pool's address set is not a
        superset of the new pool's address set, ``False`` otherwise.
    """
    def _to_ip_set(pool):
        # Collect every start/end range of the pool into one IPSet.
        addresses = netaddr.IPSet()
        for extent in pool._alloc_pools:
            addresses.add(netaddr.IPRange(extent['start'], extent['end']))
        return addresses

    original_addresses = _to_ip_set(original_pool)
    new_addresses = _to_ip_set(new_pool)

    # The pool is growing if the original set is not a superset of the
    # new set.
    if original_addresses.issuperset(new_addresses):
        return False
    return True
def test_update_allocation_pools(self):
    """Apply a sequence of allocation-pool updates to one subnet.

    After each update the subnet must report exactly the requested pools,
    those pools must differ from the previous ones, exactly one IP policy
    must exist, and no address inside the requested pools may be covered
    by that policy's ``exclude`` entries.
    """
    cidr = "192.168.1.0/24"
    ip_network = netaddr.IPNetwork(cidr)
    network = {
        "network": {
            "name": "public",
            "tenant_id": "fake",
            "network_plugin": "BASE",
        },
    }
    subnet = {
        "subnet": {
            "id": 1,
            "ip_version": 4,
            "next_auto_assign_ip": 2,
            "cidr": cidr,
            "first_ip": ip_network.first,
            "last_ip": ip_network.last,
            "ip_policy": None,
            "tenant_id": "fake",
        },
    }
    with self._stubs(network, subnet) as (_net, _sub):
        fetched = subnet_api.get_subnet(self.context, 1)
        start_pools = fetched['allocation_pools']

        # Pools to apply one after another; the last step restores the
        # pools the subnet started with.
        pool_sequence = [
            [{'start': '192.168.1.10', 'end': '192.168.1.50'}],
            [{'start': '192.168.1.5', 'end': '192.168.1.25'}],
            [{'start': '192.168.1.50', 'end': '192.168.1.51'}],
            [{'start': '192.168.1.50', 'end': '192.168.1.51'},
             {'start': '192.168.1.100', 'end': '192.168.1.250'}],
            [{'start': '192.168.1.50', 'end': '192.168.1.51'}],
            start_pools,
        ]

        previous = start_pools
        for requested in pool_sequence:
            request_body = {"subnet": {"allocation_pools": requested}}
            updated = subnet_api.update_subnet(self.context, 1,
                                               request_body)
            self.assertNotEqual(previous, updated['allocation_pools'])
            self.assertEqual(requested, updated['allocation_pools'])

            policies = policy_api.get_ip_policies(self.context)
            self.assertEqual(1, len(policies))

            # Everything the single policy excludes.
            excluded = netaddr.IPSet()
            for entry in policies[0]['exclude']:
                excluded.add(netaddr.IPNetwork(entry))

            # No address of the requested pools may be excluded.
            for extent in requested:
                span = netaddr.IPRange(extent['start'], extent['end'])
                for address in span:
                    self.assertFalse(address in excluded)

            previous = requested
def test_allow_allocation_pool_growth(self):
    """Updating to a larger pool succeeds when growth is allowed.

    With ``allow_allocation_pool_growth`` overridden to ``True``, the
    update must be applied as requested, the single IP policy must not
    exclude any address of the new pool, and the new pool must contain
    addresses that the starting pool did not.
    """
    CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')

    cidr = "192.168.1.0/24"
    ip_network = netaddr.IPNetwork(cidr)
    network = {
        "network": {
            "name": "public",
            "tenant_id": "fake",
            "network_plugin": "BASE",
        },
    }
    initial_pool = [{'start': '192.168.1.15', 'end': '192.168.1.30'}]
    subnet = {
        "subnet": {
            "id": 1,
            "ip_version": 4,
            "next_auto_assign_ip": 2,
            "cidr": cidr,
            "first_ip": ip_network.first,
            "last_ip": ip_network.last,
            "ip_policy": None,
            "allocation_pools": initial_pool,
            "tenant_id": "fake",
        },
    }
    with self._stubs(network, subnet) as (_net, _sub):
        fetched = subnet_api.get_subnet(self.context, 1)
        start_pools = fetched['allocation_pools']

        new_pool = [{'start': '192.168.1.10', 'end': '192.168.1.50'}]
        request_body = {"subnet": {"allocation_pools": new_pool}}
        updated = subnet_api.update_subnet(self.context, 1, request_body)
        self.assertNotEqual(start_pools, updated['allocation_pools'])
        self.assertEqual(new_pool, updated['allocation_pools'])

        policies = policy_api.get_ip_policies(self.context)
        self.assertEqual(1, len(policies))

        # Everything the single policy excludes.
        excluded = netaddr.IPSet()
        for entry in policies[0]['exclude']:
            excluded.add(netaddr.IPNetwork(entry))

        # No address of the new pool may be excluded.
        for extent in new_pool:
            span = netaddr.IPRange(extent['start'], extent['end'])
            for address in span:
                self.assertFalse(address in excluded)

        start_ip_set = netaddr.IPSet()
        for extent in start_pools:
            start_ip_set.add(
                netaddr.IPRange(extent['start'], extent['end']))

        new_ip_set = netaddr.IPSet()
        for extent in updated['allocation_pools']:
            new_ip_set.add(
                netaddr.IPRange(extent['start'], extent['end']))

        # The union only differs from the starting set if the pool grew.
        self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
def test_do_not_allow_allocation_pool_growth(self):
    """Updating to a larger pool is rejected when growth is disallowed.

    With ``allow_allocation_pool_growth`` overridden to ``False``, the
    test first confirms that the requested pool really contains addresses
    outside the starting pool, then expects ``update_subnet`` to raise
    ``n_exc.BadRequest``.
    """
    CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')

    cidr = "192.168.1.0/24"
    ip_network = netaddr.IPNetwork(cidr)
    network = {
        "network": {
            "name": "public",
            "tenant_id": "fake",
            "network_plugin": "BASE",
        },
    }
    initial_pool = [{'start': '192.168.1.15', 'end': '192.168.1.30'}]
    subnet = {
        "subnet": {
            "id": 1,
            "ip_version": 4,
            "next_auto_assign_ip": 2,
            "cidr": cidr,
            "first_ip": ip_network.first,
            "last_ip": ip_network.last,
            "ip_policy": None,
            "allocation_pools": initial_pool,
            "tenant_id": "fake",
        },
    }
    with self._stubs(network, subnet) as (_net, _sub):
        fetched = subnet_api.get_subnet(self.context, 1)
        start_pools = fetched['allocation_pools']
        new_pool = [{'start': '192.168.1.10', 'end': '192.168.1.50'}]

        start_ip_set = netaddr.IPSet()
        for extent in start_pools:
            start_ip_set.add(
                netaddr.IPRange(extent['start'], extent['end']))

        new_ip_set = netaddr.IPSet()
        for extent in new_pool:
            new_ip_set.add(
                netaddr.IPRange(extent['start'], extent['end']))

        # Sanity check: the requested pool would grow the starting pool.
        self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

        request_body = {"subnet": {"allocation_pools": new_pool}}
        with self.assertRaises(n_exc.BadRequest):
            subnet_api.update_subnet(self.context, 1, request_body)
def test_ipset_basic_api():
    """An IPSet built from mixed inputs equals one built from range CIDRs.

    Also checks that popping an element from one of two equal sets makes
    them unequal while the popped element is still in the other set.
    """
    # Mixed addresses, networks and plain strings covering
    # 192.0.2.1 through 192.0.2.15.
    mixed_members = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]
    from_range = IPSet(IPRange('192.0.2.1', '192.0.2.15').cidrs())
    from_members = IPSet(mixed_members)

    expected = IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])
    assert from_members == expected
    assert from_range == from_members

    # Removing one element from one set breaks the equality.
    popped = from_members.pop()
    assert popped in from_range
    assert from_range != from_members
def test_ipset_constructor():
    """IPSet accepts strings, addresses, networks, sets and ranges."""
    # A single host given as a string, an IPAddress or an IPNetwork.
    assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32'])
    assert IPSet([IPAddress('192.0.2.0')]) == IPSet(['192.0.2.0/32'])
    assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(['192.0.2.0/32'])

    # Networks, passed bare or wrapped in a list.
    assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32'])
    assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24'])

    # Another IPSet.
    assert IPSet(IPSet(['192.0.2.0/32'])) == IPSet(['192.0.2.0/32'])

    # Ranges.
    two_block_range = IPRange("10.0.0.0", "10.0.1.31")
    assert IPSet(two_block_range) == IPSet(['10.0.0.0/24', '10.0.1.0/27'])
    whole_ipv4 = IPRange('0.0.0.0', '255.255.255.255')
    assert IPSet(whole_ipv4) == IPSet(['0.0.0.0/0'])
def test_ipset_member_insertion_and_deletion():
    """Adding and removing single addresses and ranges updates the set."""
    members = IPSet()

    # A single address goes in and comes back out.
    members.add('192.0.2.0')
    assert members == IPSet(['192.0.2.0/32'])
    members.remove('192.0.2.0')
    assert members == IPSet([])

    # A whole range goes in ...
    members.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert members == IPSet(['10.0.0.0/24'])

    # ... and removing a range that only partly overlaps it leaves the
    # lower half behind.
    members.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert members == IPSet(['10.0.0.0/25'])
def test_ipset_membership():
    """Only the addresses inside 192.0.2.0/28 are members of the set."""
    probe_range = IPRange('192.0.1.255', '192.0.2.16')
    expected_cidrs = [
        IPNetwork('192.0.1.255/32'),
        IPNetwork('192.0.2.0/28'),
        IPNetwork('192.0.2.16/32'),
    ]
    assert probe_range.cidrs() == expected_cidrs

    block = IPSet(['192.0.2.0/28'])

    # One address below the /28, its sixteen addresses, one address above.
    expected_membership = [('192.0.1.255', False)]
    expected_membership.extend(
        ('192.0.2.%d' % host, True) for host in range(16))
    expected_membership.append(('192.0.2.16', False))

    observed_membership = []
    for address in probe_range:
        observed_membership.append((str(address), address in block))

    assert observed_membership == expected_membership
def test_ipset_clear():
    """``clear()`` empties a set that was previously updated."""
    addresses = IPSet(['10.0.0.0/16'])

    # Adding the neighbouring /16 yields a single /15.
    addresses.update(IPRange('10.1.0.0', '10.1.255.255'))
    assert addresses == IPSet(['10.0.0.0/15'])

    addresses.clear()
    assert addresses == IPSet([])
def test_converting_ipsets_to_ipranges():
    """``iter_ipranges()`` yields the ranges that make up a set."""
    # An empty set has no ranges.
    assert list(IPSet().iter_ipranges()) == []

    # A single address becomes a one-address range.
    single = IPSet([IPAddress('10.0.0.1')])
    assert list(single.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Two adjacent addresses are reported as one range.
    adjacent = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(adjacent.iter_ipranges()) == [
        IPRange('10.0.0.1', '10.0.0.2'),
    ]
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """``len()`` fails with IndexError once the size exceeds ``_sys_maxint``."""
    first = IPAddress("::0")

    # ::0 through _sys_maxint inclusive holds _sys_maxint + 1 addresses.
    too_large = IPSet(IPRange(first, IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(too_large)

    # One address fewer is exactly _sys_maxint addresses.
    largest_ok = IPSet(IPRange(first, IPAddress(_sys_maxint - 1, 6)))
    assert len(largest_ok) == _sys_maxint
def test_ipset_ipv4_and_ipv4_separation():
    """The integer 1 as IPv4 and as IPv6 stays in two separate ranges."""
    mixed = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    expected_ranges = [
        IPRange('0.0.0.1', '0.0.0.1'),
        IPRange('::1', '::1'),
    ]
    assert list(mixed.iter_ipranges()) == expected_ranges
def test_iprange_boolean_evaluation():
    """Both the full IPv4 range and a one-address range are truthy."""
    whole_ipv4 = IPRange('0.0.0.0', '255.255.255.255')
    assert bool(whole_ipv4)

    single_address = IPRange('0.0.0.0', '0.0.0.0')
    assert bool(single_address)
def test_iprange_indexing():
    """Length, integer bounds and positional indexing of an IPRange."""
    hosts = IPRange('192.0.2.1', '192.0.2.254')

    assert len(hosts) == 254

    # Integer values of the first and last address.
    assert hosts.first == 3221225985
    assert hosts.last == 3221226238

    # Positive and negative indexes resolve to addresses.
    assert hosts[0] == IPAddress('192.0.2.1')
    assert hosts[-1] == IPAddress('192.0.2.254')

    # An index past the end is rejected.
    with pytest.raises(IndexError):
        hosts[512]
def test_iprange_ipv6_unsupported_slicing():
    """Slicing an IPv6 range raises TypeError."""
    mapped_range = IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254')
    with pytest.raises(TypeError):
        mapped_range[0:10:2]
def test_iprange_membership():
    """A range is ``in`` another range only when fully inside it."""
    # IPv4 sub-range inside an IPv4 range.
    inner_v4 = IPRange('192.0.2.5', '192.0.2.10')
    outer_v4 = IPRange('192.0.2.1', '192.0.2.254')
    assert inner_v4 in outer_v4

    # IPv6 sub-range inside an IPv6 range.
    inner_v6 = IPRange('fe80::1', 'fe80::fffe')
    outer_v6 = IPRange('fe80::', 'fe80::ffff:ffff:ffff:ffff')
    assert inner_v6 in outer_v6

    # An IPv4 range is not a member of an IPv6 range.
    v6_compat_range = IPRange('::', '::255.255.255.255')
    assert IPRange('192.0.2.5', '192.0.2.10') not in v6_compat_range
def test_more_iprange_sorting():
    """``sorted()`` puts IPRange objects in the expected order."""
    unsorted_ranges = (
        IPRange('192.0.2.40', '192.0.2.50'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.1', '192.0.2.254'),
    )
    expected_order = [
        IPRange('192.0.2.1', '192.0.2.254'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.40', '192.0.2.50'),
    ]
    assert sorted(unsorted_ranges) == expected_order

    # A range sharing no bound with the others, nested in the last one,
    # sorts after it.
    extended_ranges = list(unsorted_ranges) + [
        IPRange('192.0.2.45', '192.0.2.49'),
    ]
    expected_extended_order = [
        IPRange('192.0.2.1', '192.0.2.254'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.40', '192.0.2.50'),
        IPRange('192.0.2.45', '192.0.2.49'),
    ]
    assert sorted(extended_ranges) == expected_extended_order