def test_ipnetwork_cidr_merge():
ip_list = (
list(IPNetwork('fe80::/120')) +
[
IPNetwork('192.0.2.0/24'),
IPNetwork('192.0.4.0/25'),
IPNetwork('192.0.4.128/25'),
]
+ list(map(str, IPNetwork('192.0.3.0/24')))
)
assert len(ip_list) == 515
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.0/23'),
IPNetwork('192.0.4.0/24'),
IPNetwork('fe80::/120'),
]
python类cidr_merge()的实例源码
def test_ipnetwork_cidr_merge():
ip_list = (
list(IPNetwork('fe80::/120')) +
[
IPNetwork('192.0.2.0/24'),
IPNetwork('192.0.4.0/25'),
IPNetwork('192.0.4.128/25'),
]
+ list(map(str, IPNetwork('192.0.3.0/24')))
)
assert len(ip_list) == 515
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.0/23'),
IPNetwork('192.0.4.0/24'),
IPNetwork('fe80::/120'),
]
def getRadarAs(asNumber):
radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=1" % asNumber).json()
totalPrefixes = int(radarResponse.get('total'))
initalPageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser")
networkRawSet = set()
for a in initalPageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")):
networkRawSet.add("%s" % a)
startPage = 1
while len(networkRawSet) < totalPrefixes:
radarResponse = requests.get("https://radar.qrator.net/api/prefixes/%s?tab_id=current&page=%s" % (asNumber, startPage)).json()
pageSoup = bs4.BeautifulSoup(radarResponse.get('page'), "html.parser")
for a in pageSoup.find_all(text=re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d+?$")):
networkRawSet.add("%s" % a)
startPage += 1
# Now minimize this shit
networkSet = netaddr.IPSet([netaddr.IPNetwork(item) for item in networkRawSet])
mergedNetworks = netaddr.cidr_merge(networkSet)
if not mergedNetworks:
print("Nothing found. Wrong AS number?")
else:
print("\n".join(["%s" % network for network in mergedNetworks]))
test_network_ops.py 文件源码
项目:aCloudGuru-Event-Driven-Security
作者: mikegchambers
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_ipnetwork_cidr_merge():
ip_list = (
list(IPNetwork('fe80::/120')) +
[
IPNetwork('192.0.2.0/24'),
IPNetwork('192.0.4.0/25'),
IPNetwork('192.0.4.128/25'),
]
+ list(map(str, IPNetwork('192.0.3.0/24')))
)
assert len(ip_list) == 515
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.0/23'),
IPNetwork('192.0.4.0/24'),
IPNetwork('fe80::/120'),
]
def test_whole_network_cidr_merge_v6():
assert cidr_merge(['::/0', 'fe80::1']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::192.0.2.0/124', 'ff00::101']) == [IPNetwork('::/0')]
assert cidr_merge(['0.0.0.0/0', '0.0.0.0', '::/0', '::']) == [IPNetwork('0.0.0.0/0'), IPNetwork('::/0')]
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
def func(self, args):
"""Add networks together, aggregating as much as possible.
Uses self._get_networks() to get the networks. Subclasses may
want to redefine that method.
"""
networks = self._get_networks(args)
merged = netaddr.cidr_merge(networks)
for i in merged:
print(i)
def func(self, args):
"""Evaluate an expression of adding and subtracting networks."""
expr = args.expression
accum = [_network_address(expr.pop(0))]
while len(expr) >= 2:
operator = expr.pop(0)
# right-hand side of the expression
rhs = _network_address(expr.pop(0))
if operator in ("+", "add", "merge"):
# add (merge) in a new network
accum = netaddr.cidr_merge(accum + [rhs])
elif operator in ("-", "sub", "remove"):
# subtract (remove) a network
# for each network in accum, remove the RHS from it, then
# chain everything into a single flat generator sequence
# (RHS may be partially contained in more than one accum
# network)
minus_rhs = itertools.chain.from_iterable(
netaddr.cidr_exclude(network, rhs)
for network in accum
)
accum = netaddr.cidr_merge(minus_rhs)
else:
raise CommandParseError("invalid operator '%s'" % operator)
if expr:
self.warn("ignoring extra argument '%s'" % ' '.join(expr))
for i in accum:
print(i)
def unfold(objarr):
for obj in objarr:
unfold_rec(objarr[obj],objarr)
if not args.noaggr and objarr is netgrp: objarr[obj] = netaddr.cidr_merge(objarr[obj])
# Unfold all included objects
def test_whole_network_cidr_merge_v6():
assert cidr_merge(['::/0', 'fe80::1']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::192.0.2.0/124', 'ff00::101']) == [IPNetwork('::/0')]
assert cidr_merge(['0.0.0.0/0', '0.0.0.0', '::/0', '::']) == [IPNetwork('0.0.0.0/0'), IPNetwork('::/0')]
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
test_cidr_v6.py 文件源码
项目:aCloudGuru-Event-Driven-Security
作者: mikegchambers
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_whole_network_cidr_merge_v6():
assert cidr_merge(['::/0', 'fe80::1']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::']) == [IPNetwork('::/0')]
assert cidr_merge(['::/0', '::192.0.2.0/124', 'ff00::101']) == [IPNetwork('::/0')]
assert cidr_merge(['0.0.0.0/0', '0.0.0.0', '::/0', '::']) == [IPNetwork('0.0.0.0/0'), IPNetwork('::/0')]
test_ip_ranges.py 文件源码
项目:aCloudGuru-Event-Driven-Security
作者: mikegchambers
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
def get_scanner_targets(self):
"""
Get a list of strings representing the IP addresses and network ranges that
this scanner is configured to scan.
:return: A list of strings representing the IP addresses and network ranges that
this scanner is configured to scan.
"""
all_ips = self.get_all_ips()
merged = cidr_merge(all_ips)
return [str(x) for x in merged]
def group_nets(nets):
# nets = { src: [dst1, dst2, ...], ...}
revnets = {} # nets reversed: { dst: [src1, src2, ...], ... }
# next iteration
debug("group_nets -- Begin ====================",4)
debug("group_nets -- Before first phase of grouping (nets)",4)
debug(nets,4)
for src in nets:
debug("group_nets -- The source",5)
debug(src,5)
debug("group_nets -- 1F The destination",5)
debug(nets[src],5)
nets[src] = netaddr.cidr_merge(nets[src])
debug("group_nets -- 1F After CIDR-merge",5)
debug(nets[src],5)
if len(nets) == 1:
debug("group_nets -- Only one pair",4)
return {(src,):nets[src]}
for dst in nets[src]:
debug("group_nets -- For the destination",5)
debug(dst,5)
if dst not in revnets:
revnets[dst] = []
if src not in revnets[dst]:
revnets[dst].append(src)
debug("group_nets -- Added the following source",5)
debug(src,5)
debug("group_nets -- Current revnets[dst]",5)
debug(revnets[dst],5)
# grouping
debug("group_nets -- The result of the first phase of grouping (revnets)",4)
debug(revnets,4)
debug("group_nets -- Second phase of grouping",4)
nets = {}
for dst in revnets:
debug("group_nets -- 2F The destination",5)
debug(dst,5)
debug("group_nets -- The corresponfing sources",5)
debug(revnets[dst],5)
revnets[dst] = netaddr.cidr_merge(revnets[dst])
debug("group_nets -- 2F After CIDR-merge",5)
debug(revnets[dst],5)
add_net_pair(tuple(revnets[dst]),dst,nets)
debug("group_nets -- The result of grouping (nets)",4)
debug(nets,4)
debug("group_nets -- End ====================",4)
return nets
def alter_queryset(self, request):
if request.GET.get('family') == '6':
family = 6
denominator = 2 ** 64 # Count /64s for IPv6 rather than individual IPs
else:
family = 4
denominator = 1
rirs = []
for rir in self.queryset:
stats = {
'total': 0,
'active': 0,
'reserved': 0,
'deprecated': 0,
'available': 0,
}
aggregate_list = Aggregate.objects.filter(family=family, rir=rir)
for aggregate in aggregate_list:
queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(aggregate.prefix))
# Find all consumed space for each prefix status (we ignore containers for this purpose).
active_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_ACTIVE)])
reserved_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_RESERVED)])
deprecated_prefixes = netaddr.cidr_merge([p.prefix for p in queryset.filter(status=PREFIX_STATUS_DEPRECATED)])
# Find all available prefixes by subtracting each of the existing prefix sets from the aggregate prefix.
available_prefixes = (
netaddr.IPSet([aggregate.prefix]) -
netaddr.IPSet(active_prefixes) -
netaddr.IPSet(reserved_prefixes) -
netaddr.IPSet(deprecated_prefixes)
)
# Add the size of each metric to the RIR total.
stats['total'] += aggregate.prefix.size / denominator
stats['active'] += netaddr.IPSet(active_prefixes).size / denominator
stats['reserved'] += netaddr.IPSet(reserved_prefixes).size / denominator
stats['deprecated'] += netaddr.IPSet(deprecated_prefixes).size / denominator
stats['available'] += available_prefixes.size / denominator
# Calculate the percentage of total space for each prefix status.
total = float(stats['total'])
stats['percentages'] = {
'active': float('{:.2f}'.format(stats['active'] / total * 100)) if total else 0,
'reserved': float('{:.2f}'.format(stats['reserved'] / total * 100)) if total else 0,
'deprecated': float('{:.2f}'.format(stats['deprecated'] / total * 100)) if total else 0,
}
stats['percentages']['available'] = (
100 -
stats['percentages']['active'] -
stats['percentages']['reserved'] -
stats['percentages']['deprecated']
)
rir.stats = stats
rirs.append(rir)
return rirs