def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24.

    The /24 is split into /28 subnets; every second subnet (starting with
    the first) forms ``evens`` and the remaining ones form ``odds``.
    """
    whole_block = IPSet(['192.0.2.0/24'])
    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    all_subnets = list(IPNetwork('192.0.2.0/24').subnet(28))
    evens = IPSet(all_subnets[::2])
    assert evens == expected_evens

    # Intersecting with the enclosing /24 leaves the set unchanged.
    assert whole_block & evens == expected_evens

    # Symmetric difference against the /24 yields the complementary subnets.
    odds = whole_block ^ evens
    assert odds == expected_odds

    assert evens | odds == whole_block
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole_block
# Example source code using the Python class IPSet()
def test_combined_ipv4_and_ipv6_ipsets():
    """Check set algebra on IPSets holding both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    everything = IPSet([
        '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
        '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
    ])

    # Union gives the same result whichever operand comes first.
    assert left | right == everything
    assert right | left == everything

    # Only the .2 addresses (one per family) are shared.
    assert left & right == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])

    assert left - right == IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    assert right - left == IPSet(['192.0.2.4/32', '::192.0.2.4/128'])
    assert left ^ right == IPSet([
        '192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128',
    ])
def parse_table(self):
    """Build the in-wall IP set from the cached APNIC delegation file.

    Reads ``self.cache_apnic`` line by line, keeps the records whose
    registry/country/type fields are ``apnic|CN|ipv4`` and converts each
    ``prefix`` + ``address count`` pair into a CIDR string.  The result is
    stored on ``self.ipset_inwall`` (an ``IPSet``) and
    ``self.cidrs_inwall`` (the list from ``IPSet.iter_cidrs()``).

    Only the first five ``|``-separated fields are used, so records with
    extra trailing fields (e.g. the "extended" delegation format) are
    accepted instead of raising ``ValueError`` on unpacking.
    """
    logging.info("Start parsing IP table(s)")
    ip_list = []
    with open(self.cache_apnic, 'r') as f:
        # Stream the file instead of loading every line into memory.
        for line in f:
            if not line.startswith('apnic|CN|ipv4'):
                continue
            fields = line.rstrip().split('|')
            if len(fields) < 5:
                # Truncated record: no address count to derive a prefix from.
                logging.warning("Skipping malformed APNIC record: %r", line)
                continue
            country, v4v6, prefix, count_of_addr = fields[1:5]
            if v4v6 != 'ipv4' or country != 'CN':
                continue
            # NOTE(review): binary_log is defined elsewhere; presumably it
            # returns log2 of the address count -- confirm.
            decimal = 32 - binary_log(int(count_of_addr))
            ip_list.append(prefix + '/' + str(decimal))
    self.ipset_inwall = IPSet(ip_list)
    self.cidrs_inwall = list(self.ipset_inwall.iter_cidrs())
    logging.info("Finished parsing in-wall IP table(s). Total: %i CIDR blocks.", len(self.cidrs_inwall), )
def ip_policy_update(context, ip_policy, **ip_policy_dict):
    """Apply ``ip_policy_dict`` to ``ip_policy`` and add it to the session.

    When a non-empty ``exclude`` list is supplied, the policy's ``exclude``
    entries are replaced with one ``IPPolicyCIDR`` per CIDR and the
    ``size`` field is set to the size of the IPSet built from those CIDRs.

    :param context: request context providing ``session``
    :param ip_policy: the policy object being updated
    :param ip_policy_dict: fields to update; ``exclude`` is consumed here
    :return: the updated ``ip_policy``
    """
    excluded_cidrs = ip_policy_dict.pop("exclude", [])
    if excluded_cidrs:
        ip_policy["exclude"] = []
        covered = netaddr.IPSet()
        for cidr in excluded_cidrs:
            # first/last are taken from the IPv6 form of the network.
            as_v6 = netaddr.IPNetwork(cidr).ipv6()
            policy_cidr = models.IPPolicyCIDR(cidr=cidr,
                                              first_ip=as_v6.first,
                                              last_ip=as_v6.last)
            ip_policy["exclude"].append(policy_cidr)
            covered.add(cidr)
        ip_policy_dict["size"] = covered.size

    ip_policy.update(ip_policy_dict)
    context.session.add(ip_policy)
    return ip_policy
def _build_excludes(self):
    """Compute the excluded CIDR set and store it on ``self._exclude_cidrs``.

    Starts from the whole subnet, subtracts every allocation pool when
    ``self._alloc_pools`` is a list, or starts from an empty set when it is
    ``None``; then adds every entry of ``self._policies``.
    """
    self._validate_allocation_pools()

    subnet = netaddr.IPNetwork(self._subnet_cidr)
    ip_version = subnet.version
    first_addr = netaddr.IPAddress(subnet.first, version=ip_version)
    last_addr = netaddr.IPAddress(subnet.last, version=ip_version)
    excluded = netaddr.IPSet(netaddr.IPRange(first_addr, last_addr).cidrs())

    pools = self._alloc_pools
    if isinstance(pools, list):
        for pool in pools:
            pool_range = netaddr.IPRange(netaddr.IPAddress(pool["start"]),
                                         netaddr.IPAddress(pool["end"]))
            excluded -= netaddr.IPSet(pool_range.cidrs())
    elif pools is None:
        # Empty list is completely unallocatable, None is fully
        # allocatable
        excluded = netaddr.IPSet()

    for policy_cidr in self._policies:
        excluded.add(netaddr.IPNetwork(policy_cidr))

    self._exclude_cidrs = excluded
def test_create_locks_lock_holder_exists(self):
    """create_locks leaves a single holder when one was created beforehand.

    A lock holder named ``null_routes.LOCK_NAME`` is created for the
    address before ``create_locks`` runs; afterwards exactly one holder
    with that name must be found for the address's lock.
    """
    net = db_api.network_create(self.context)
    locked_address = db_api.ip_address_create(
        self.context,
        address=netaddr.IPAddress("192.168.10.1"),
        network=net)
    db_api.lock_holder_create(
        self.context, locked_address,
        name=null_routes.LOCK_NAME, type="ip_address")
    self.context.session.flush()

    subnet_addresses = netaddr.IPSet(netaddr.IPNetwork(self.sub_cidr))
    null_routes.create_locks(self.context, [net.id], subnet_addresses)

    holders = db_api.lock_holder_find(
        self.context,
        lock_id=locked_address.lock_id,
        name=null_routes.LOCK_NAME,
        scope=db_api.ALL)
    self.assertEqual(len(holders), 1)
def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24.

    The /24 is split into /28 subnets; every second subnet (starting with
    the first) forms ``evens`` and the remaining ones form ``odds``.
    """
    whole_block = IPSet(['192.0.2.0/24'])
    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    all_subnets = list(IPNetwork('192.0.2.0/24').subnet(28))
    evens = IPSet(all_subnets[::2])
    assert evens == expected_evens

    # Intersecting with the enclosing /24 leaves the set unchanged.
    assert whole_block & evens == expected_evens

    # Symmetric difference against the /24 yields the complementary subnets.
    odds = whole_block ^ evens
    assert odds == expected_odds

    assert evens | odds == whole_block
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole_block
def test_combined_ipv4_and_ipv6_ipsets():
    """Check set algebra on IPSets holding both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    everything = IPSet([
        '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
        '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
    ])

    # Union gives the same result whichever operand comes first.
    assert left | right == everything
    assert right | left == everything

    # Only the .2 addresses (one per family) are shared.
    assert left & right == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])

    assert left - right == IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    assert right - left == IPSet(['192.0.2.4/32', '::192.0.2.4/128'])
    assert left ^ right == IPSet([
        '192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128',
    ])
def get_ip_address(iscsi_network):
    """
    Return an IP address assigned to the running host that matches the given
    subnet address. This IP becomes the portal IP for the target portal group

    Membership is tested directly against the IPSet rather than against a
    list of every address in the subnet, so large subnets are not expanded
    in memory. It also avoids reusing the name ``ip`` as a comprehension
    variable, which on Python 2 overwrote the ``''`` default and made a
    miss return the last address of the subnet.

    :param iscsi_network: cidr network address
    :return: IP address, or '' if the host does not have an interface on the
             required subnet
    """
    subnet = netaddr.IPSet([iscsi_network])

    # NOTE(review): ipv4_address() is defined elsewhere; assumed to yield
    # the host's IPv4 addresses as strings -- confirm.
    for local_ip in ipv4_address():
        if netaddr.IPAddress(local_ip) in subnet:
            return local_ip

    return ''
# generate_user_config.py -- file source
# Project: os-services
# Author: open-power-ref-design-toolkit
# Project source
# File source
# Views: 18
# Favorites: 0
# Likes: 0
# Comments: 0
def find_external_network(self, ext_floating_ip, networks):
    """Find external network based on the network address that matches
    external-floating-ipaddr.

    If not found, return the management network interface.

    :param ext_floating_ip: floating IP address string, or 'N/A'
    :param networks: dict mapping network name to its details dict; only
                     entries that have an 'addr' key are checked
    Returns: network name, network details -- or None when
             ext_floating_ip is 'N/A'
    """
    if ext_floating_ip == 'N/A':
        return None

    # items() works on both Python 2 and 3; iteritems() is Python 2 only.
    for net, net_details in networks.items():
        if 'addr' not in net_details:
            continue
        # Check if matching
        if (netaddr.IPAddress(ext_floating_ip) in
                netaddr.IPSet([net_details['addr']])):
            return net, net_details

    return 'openstack-mgmt', networks.get('openstack-mgmt', None)
def calculate_free_ips(global_variable):
    """Recompute ``global_variable.free_ips`` from ``global_variable.used_ips``.

    Every used IP is assigned to its enclosing /24.  For each such /24 the
    free list is the whole network minus the used addresses and minus the
    addresses whose last octet is 0 through 10.

    ``global_variable.free_ip_lock`` is set to True for the duration of the
    calculation and is always reset to False afterwards, even if the
    calculation raises.

    :param global_variable: object exposing ``used_ips`` (an iterable of
        iterables of dotted IPv4 strings), ``app.logger``, ``free_ips``
        and ``free_ip_lock``
    :return: True
    """
    import itertools
    from netaddr import IPSet, IPAddress

    global_variable.free_ip_lock = True
    try:
        global_variable.app.logger.debug('Calculating free ips...')
        networks = {}
        for ip in itertools.chain.from_iterable(global_variable.used_ips):
            prefix = '.'.join(ip.split('.')[0:3])
            net = '{network}.0/24'.format(network=prefix)
            if net not in networks:
                free = IPSet([net])
                # Strip .0 - .10 once, when the network is first seen,
                # instead of repeating it for every used IP.
                for last_oct in range(0, 11):
                    free.remove(
                        IPAddress(
                            '{network}.{last_oct}'.format(network=prefix,
                                                          last_oct=last_oct)
                        )
                    )
                networks[net] = free
            # IPSet.remove() is a no-op for addresses that are not members.
            networks[net].remove(IPAddress(ip))
        global_variable.free_ips = {key: list(value) for key, value in networks.items()}
    finally:
        # Never leave the flag set if the calculation fails part-way.
        global_variable.free_ip_lock = False
    return True
def getRadarAs(asNumber):
    """Print the merged IPv4 prefixes that Qrator Radar lists for an AS.

    Fetches page 1 of the prefixes API, reads the advertised ``total`` and
    keeps requesting subsequent pages until that many distinct IPv4
    prefixes have been collected.  The prefixes are then collapsed into the
    minimal list of CIDR blocks and printed one per line.

    :param asNumber: AS number interpolated into the API URL
    """
    prefixPattern = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")
    networkRawSet = set()

    def collectPrefixes(response):
        # Pull every text node that looks like an IPv4 CIDR out of the page.
        soup = bs4.BeautifulSoup(response.get('page'), "html.parser")
        for a in soup.find_all(text=prefixPattern):
            networkRawSet.add("%s" % a)

    radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=1" % asNumber).json()
    totalPrefixes = int(radarResponse.get('total'))
    collectPrefixes(radarResponse)

    # Page 1 is already processed, so continue from page 2.
    nextPage = 2
    while len(networkRawSet) < totalPrefixes:
        countBefore = len(networkRawSet)
        radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s" % (asNumber, nextPage)).json()
        collectPrefixes(radarResponse)
        if len(networkRawSet) == countBefore:
            # A page that adds nothing means the advertised total cannot be
            # reached (e.g. it counts entries the IPv4 pattern does not
            # match); stop instead of requesting pages forever.
            break
        nextPage += 1

    # An IPSet already stores its members as merged CIDR blocks; asking for
    # them directly avoids walking the set one address at a time.
    networkSet = netaddr.IPSet([netaddr.IPNetwork(item) for item in networkRawSet])
    mergedNetworks = networkSet.iter_cidrs()
    if not mergedNetworks:
        print("Nothing found. Wrong AS number?")
    else:
        print("\n".join(["%s" % network for network in mergedNetworks]))
def get_available_ips(self):
    """
    Return all available IPs within this prefix as an IPSet.

    Child IPs are always subtracted; for prefixes that are not pools the
    first and last addresses of the prefix are subtracted as well.
    """
    whole_prefix = netaddr.IPSet(self.prefix)
    taken = netaddr.IPSet([child.address.ip for child in self.get_child_ips()])
    free = whole_prefix - taken

    if self.is_pool:
        return free

    # Remove unusable IPs from non-pool prefixes
    edge_addresses = netaddr.IPSet([
        netaddr.IPAddress(self.prefix.first),
        netaddr.IPAddress(self.prefix.last),
    ])
    return free - edge_addresses
# test_ip_sets.py -- file source
# Project: aCloudGuru-Event-Driven-Security
# Author: mikegchambers
# Project source
# File source
# Views: 19
# Favorites: 0
# Likes: 0
# Comments: 0
def test_ipset_unions_intersections_differences():
    """Exercise |, & and ^ on alternating /28 slices of 192.0.2.0/24.

    The /24 is split into /28 subnets; every second subnet (starting with
    the first) forms ``evens`` and the remaining ones form ``odds``.
    """
    whole_block = IPSet(['192.0.2.0/24'])
    expected_evens = IPSet([
        '192.0.2.0/28', '192.0.2.32/28', '192.0.2.64/28',
        '192.0.2.96/28', '192.0.2.128/28', '192.0.2.160/28',
        '192.0.2.192/28', '192.0.2.224/28',
    ])
    expected_odds = IPSet([
        '192.0.2.16/28', '192.0.2.48/28', '192.0.2.80/28',
        '192.0.2.112/28', '192.0.2.144/28', '192.0.2.176/28',
        '192.0.2.208/28', '192.0.2.240/28',
    ])

    all_subnets = list(IPNetwork('192.0.2.0/24').subnet(28))
    evens = IPSet(all_subnets[::2])
    assert evens == expected_evens

    # Intersecting with the enclosing /24 leaves the set unchanged.
    assert whole_block & evens == expected_evens

    # Symmetric difference against the /24 yields the complementary subnets.
    odds = whole_block ^ evens
    assert odds == expected_odds

    assert evens | odds == whole_block
    assert evens & odds == IPSet([])
    assert evens ^ odds == whole_block
# test_ip_sets.py -- file source
# Project: aCloudGuru-Event-Driven-Security
# Author: mikegchambers
# Project source
# File source
# Views: 17
# Favorites: 0
# Likes: 0
# Comments: 0
def test_combined_ipv4_and_ipv6_ipsets():
    """Check set algebra on IPSets holding both IPv4 and IPv6 members."""
    left = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    right = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])

    everything = IPSet([
        '192.0.2.0/32', '192.0.2.2/32', '192.0.2.4/32',
        '::192.0.2.0/128', '::192.0.2.2/128', '::192.0.2.4/128',
    ])

    # Union gives the same result whichever operand comes first.
    assert left | right == everything
    assert right | left == everything

    # Only the .2 addresses (one per family) are shared.
    assert left & right == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])

    assert left - right == IPSet(['192.0.2.0/32', '::192.0.2.0/128'])
    assert right - left == IPSet(['192.0.2.4/32', '::192.0.2.4/128'])
    assert left ^ right == IPSet([
        '192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128',
    ])
def tags_for_hostname(hostname, mapping):
    """Derive tags for an ``ip-A-B-C-D`` style hostname.

    Hostnames that do not start with ``ip-`` yield an empty dict.
    Otherwise the tags are ``mapping['CIDR_SECOND_OCTET'][B]`` updated with
    the value of every ``mapping['CIDR_REST']`` entry whose CIDR
    (``<CIDR_FIRST_OCTET>.<B>.<key>``) contains the host's address.

    The ``ip-`` prefix is removed by slicing (``str.lstrip`` strips a
    character set, not a prefix) and anything after the first ``.`` is
    ignored, so fully-qualified names such as ``ip-10-0-1-2.example`` are
    handled too.

    :param hostname: host name to inspect
    :param mapping: dict with ``CIDR_FIRST_OCTET``, ``CIDR_SECOND_OCTET``
        and ``CIDR_REST`` keys
    :return: dict of tags
    """
    logging.debug("Hostname is {}".format(hostname))
    prefix = 'ip-'
    if not hostname.startswith(prefix):
        return {}

    # Drop the prefix and any domain suffix, leaving "A-B-C-D".
    short_name = hostname[len(prefix):].partition('.')[0]
    octets = short_name.split('-')

    tags = {}
    # Update with env and deployment info
    tags.update(mapping['CIDR_SECOND_OCTET'][octets[1]])

    ip_addr = netaddr.IPAddress(".".join(octets))
    for key, value in mapping['CIDR_REST'].items():
        cidr = ".".join([
            mapping['CIDR_FIRST_OCTET'],
            octets[1],
            key])
        if ip_addr in netaddr.IPSet([cidr]):
            tags.update(value)
    return tags
def _generate_ranges(self, cidr, gateway_ip):
    """Create list of ranges from the given cidr.

    Ignore the gateway_ip, if defined

    :return: list of ``{"start": ..., "end": ...}`` dicts, one per
             contiguous range left in the set, with addresses as strings
    """
    usable = netaddr.IPSet(netaddr.IPNetwork(cidr))
    if gateway_ip:
        usable.remove(gateway_ip)

    ranges = []
    for ip_range in usable.iter_ipranges():
        ranges.append({"start": str(ip_range[0]),
                       "end": str(ip_range[-1])})
    return ranges
def test_ipset_basic_api():
    """An IPSet built from mixed member types equals one built from a range.

    Popping a member from one of the two equal sets must make them differ.
    """
    address_range = IPRange('192.0.2.1', '192.0.2.15')
    # Addresses, networks and plain strings covering 192.0.2.1 - 192.0.2.15.
    mixed_members = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]

    from_range = IPSet(address_range.cidrs())
    from_members = IPSet(mixed_members)

    compacted = IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])
    assert from_members == compacted
    assert from_range == from_members

    # pop() removes a member, so the sets stop being equal afterwards.
    assert from_members.pop() in from_range
    assert from_range != from_members
def test_ipset_empty():
    """IPSet() and IPSet([]) compare equal and have length zero."""
    assert IPSet() == IPSet([])

    nothing = IPSet([])
    assert IPSet([]) == nothing
    assert len(nothing) == 0
def test_ipset_constructor():
    """IPSet accepts strings, addresses, networks, ranges and other IPSets."""
    # (constructed set, CIDR strings it must be equal to)
    cases = [
        (IPSet(['192.0.2.0']), ['192.0.2.0/32']),
        (IPSet([IPAddress('192.0.2.0')]), ['192.0.2.0/32']),
        (IPSet([IPNetwork('192.0.2.0')]), ['192.0.2.0/32']),
        (IPSet(IPNetwork('1234::/32')), ['1234::/32']),
        (IPSet([IPNetwork('192.0.2.0/24')]), ['192.0.2.0/24']),
        (IPSet(IPSet(['192.0.2.0/32'])), ['192.0.2.0/32']),
        (IPSet(IPRange("10.0.0.0", "10.0.1.31")), ['10.0.0.0/24', '10.0.1.0/27']),
        (IPSet(IPRange('0.0.0.0', '255.255.255.255')), ['0.0.0.0/0']),
    ]
    for built, expected_cidrs in cases:
        assert built == IPSet(expected_cidrs)
def test_ipset_iteration():
    """Iterating a mixed IPSet yields the IPv4 addresses, then the IPv6 ones."""
    expected_v4 = [
        IPAddress('192.0.2.0'),
        IPAddress('192.0.2.1'),
        IPAddress('192.0.2.2'),
        IPAddress('192.0.2.3'),
        IPAddress('192.0.2.4'),
        IPAddress('192.0.2.5'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        IPAddress('192.0.2.8'),
        IPAddress('192.0.2.9'),
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPAddress('192.0.2.12'),
        IPAddress('192.0.2.13'),
        IPAddress('192.0.2.14'),
        IPAddress('192.0.2.15'),
    ]
    expected_v6 = [
        IPAddress('::192.0.2.0'),
        IPAddress('::192.0.2.1'),
        IPAddress('::192.0.2.2'),
        IPAddress('::192.0.2.3'),
        IPAddress('::192.0.2.4'),
        IPAddress('::192.0.2.5'),
        IPAddress('::192.0.2.6'),
        IPAddress('::192.0.2.7'),
        IPAddress('::192.0.2.8'),
        IPAddress('::192.0.2.9'),
        IPAddress('::192.0.2.10'),
        IPAddress('::192.0.2.11'),
        IPAddress('::192.0.2.12'),
        IPAddress('::192.0.2.13'),
        IPAddress('::192.0.2.14'),
        IPAddress('::192.0.2.15'),
    ]

    members = list(IPSet(['192.0.2.0/28', '::192.0.2.0/124']))
    assert members == expected_v4 + expected_v6
def test_ipset_member_insertion_and_deletion():
    """add() and remove() accept single addresses as well as IPRanges."""
    ipset = IPSet()

    # Single address in, single address out.
    ipset.add('192.0.2.0')
    assert ipset == IPSet(['192.0.2.0/32'])
    ipset.remove('192.0.2.0')
    assert ipset == IPSet([])

    # A whole range in; removing an overlapping range trims the upper half.
    ipset.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert ipset == IPSet(['10.0.0.0/24'])
    ipset.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert ipset == IPSet(['10.0.0.0/25'])
def test_ipset_membership_largest():
    """The 0.0.0.0/0 set contains every IPv4 address and network, but no IPv6."""
    ipset = IPSet(['0.0.0.0/0'])

    assert IPAddress("10.0.0.1") in ipset
    # Both boundaries of the IPv4 space.  The upper bound is written as a
    # full dotted quad: the three-octet form "255.255.255" only parses
    # under legacy inet_aton rules and is not the last address.
    assert IPAddress("0.0.0.0") in ipset
    assert IPAddress("255.255.255.255") in ipset
    assert IPNetwork("10.0.0.0/24") in ipset
    assert IPAddress("::1") not in ipset
def test_set_membership_smallest():
    """A single /32 set contains only that address and that exact network."""
    ipset = IPSet(["10.0.0.42/32"])

    inside = (IPAddress("10.0.0.42"), IPNetwork("10.0.0.42/32"))
    for member in inside:
        assert member in ipset

    # Neighbouring addresses and the enclosing /31 are not members.
    outside = (
        IPAddress("10.0.0.41"),
        IPAddress("10.0.0.43"),
        IPNetwork("10.0.0.42/31"),
    )
    for non_member in outside:
        assert non_member not in ipset
def test_ipset_unions():
    """Unions merge adjacent members into the smallest covering CIDRs."""
    assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32'])

    # .0 and .1 together form a /31.
    first_two = IPSet(['192.0.2.0']) | IPSet(['192.0.2.1'])
    assert first_two == IPSet(['192.0.2.0/31'])

    assert first_two | IPSet(['192.0.2.3']) == IPSet(['192.0.2.0/31', '192.0.2.3/32'])
    assert first_two | IPSet(['192.0.2.3/30']) == IPSet(['192.0.2.0/30'])
    assert first_two | IPSet(['192.0.2.3/31']) == IPSet(['192.0.2.0/30'])

    three_blocks = IPSet(['192.0.2.0/24']) | IPSet(['192.0.3.0/24']) | IPSet(['192.0.4.0/24'])
    assert three_blocks == IPSet(['192.0.2.0/23', '192.0.4.0/24'])
def test_ipset_updates():
    """update() accepts another IPSet or a list of CIDR strings."""
    lower_half = IPSet(['192.0.2.0/25'])
    upper_half = IPSet(['192.0.2.128/25'])

    # Two adjacent /25 halves merge into one /24.
    lower_half.update(upper_half)
    assert lower_half == IPSet(['192.0.2.0/24'])

    # Adding the three neighbouring /24 blocks completes the /22.
    lower_half.update(['192.0.0.0/24', '192.0.1.0/24', '192.0.3.0/24'])
    assert lower_half == IPSet(['192.0.0.0/22'])
def test_ipset_clear():
    """clear() empties a set that was previously grown with update()."""
    growing = IPSet(['10.0.0.0/16'])
    growing.update(IPRange('10.1.0.0', '10.1.255.255'))
    assert growing == IPSet(['10.0.0.0/15'])

    growing.clear()
    assert growing == IPSet([])
def test_ipset_with_iprange():
    """Check iprange(), iscontiguous() and iter_ipranges() on several sets."""
    # A full /24 is contiguous and maps to a single range.
    block = IPSet(['10.0.0.0/25', '10.0.0.128/25'])
    assert block.iprange() == IPRange('10.0.0.0', '10.0.0.255')
    assert block.iscontiguous()

    # Punching one address out splits the set in two.
    block.remove('10.0.0.16')
    with_hole = IPSet([
        '10.0.0.0/28', '10.0.0.17/32', '10.0.0.18/31',
        '10.0.0.20/30', '10.0.0.24/29', '10.0.0.32/27',
        '10.0.0.64/26', '10.0.0.128/25',
    ])
    assert block == with_hole
    assert not block.iscontiguous()
    with pytest.raises(ValueError):
        block.iprange()

    pieces = list(block.iter_ipranges())
    assert pieces == [
        IPRange('10.0.0.0', '10.0.0.15'),
        IPRange('10.0.0.17', '10.0.0.255'),
    ]

    # The whole IPv4 space.
    everything = IPSet(['0.0.0.0/0'])
    assert everything.iscontiguous()
    assert everything.iprange() == IPRange('0.0.0.0', '255.255.255.255')

    # An empty set is contiguous and has no range.
    nothing = IPSet()
    assert nothing.iscontiguous()
    assert nothing.iprange() is None

    # A range that is not a single CIDR block is still contiguous.
    odd_sized = IPSet(IPRange('10.0.0.0', '10.0.0.8'))
    assert odd_sized.iscontiguous()
def test_ipset_pickling():
    """An IPSet with IPv4 and IPv6 members survives a pickle round trip."""
    original = IPSet(['10.0.0.0/16', 'fe80::/64'])
    restored = pickle.loads(pickle.dumps(original))
    assert original == restored
def test_ipset_comparison():
    """A /2 set compares greater than, and unequal to, the /3 set built here."""
    wider = IPSet(['fc00::/2'])
    narrower = IPSet(['fc00::/3'])

    assert wider > narrower
    assert not wider < narrower
    assert wider != narrower