def get_sre(net, target_prefix_len):
    """Compute the range of /target_prefix_len subnets contained in net.

    Subnets are identified by the integer returned by
    UniqueSmallestRoutableEntriesMonitor.get_first(); the arithmetic below
    treats that identifier as 64 bits wide for IPv6 and 32 bits wide for
    IPv4.

    Returns: (first, last, cnt), all integers: the identifier of the first
    subnet, the identifier of the last subnet and the number of subnets.
    """
    supported_types = (ipaddr.IPv4Network, ipaddr.IPv6Network)
    assert isinstance(net, supported_types)
    assert target_prefix_len <= 64, "Max target prefix length is 64"
    assert net.prefixlen <= target_prefix_len, \
        ("Prefix length ({}) must be <= of the target prefix "
         "length ({}): {}".format(net.prefixlen, target_prefix_len, net))

    first = UniqueSmallestRoutableEntriesMonitor.get_first(net)
    # Keep first within 2^63 - 1 to avoid overflows.
    assert first <= 9223372036854775807, \
        "Only prefixes <= 7fff:ffff:ffff:ffff::/64 can be processed"

    address_bits = 32 if net.version != 6 else 64
    extra_bits = target_prefix_len - net.prefixlen
    subnet_count = 2 ** extra_bits
    # Number of bits to the right of the target prefix boundary.
    trailing_bits = address_bits - target_prefix_len
    # Set every subnet-selecting bit to 1 to reach the last subnet.
    last = first | ((subnet_count - 1) << trailing_bits)
    return first, last, subnet_count
python类IPv6Network()的实例源码
def get_net(net):
    """Return net as an ipaddr network object.

    An existing ipaddr.IPv4Network / ipaddr.IPv6Network instance is handed
    back unchanged; any other value is passed to ipaddr.IPNetwork().
    """
    already_network = isinstance(
        net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    return net if already_network else ipaddr.IPNetwork(net)
def get_first(net):
    """Return the network address of net as an integer identifier.

    For IPv4 this is the whole network address; for IPv6 only the upper
    64 bits are kept (the lower 64 bits are shifted away).
    """
    assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    if net.version != 6:
        return int(net.network)
    return int(net.network) >> 64
def default(self, obj):
    """Serialise ipaddress network/address objects as their string form.

    Any other object is delegated to json.JSONEncoder.default().
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def __init__(self):
    """Parse the WHATSAPP_IPV4 / WHATSAPP_IPV6 lists into ipaddr networks.

    Both constants are split on newlines, one network per line.  The
    results are stored in self.ipv4_networks and self.ipv6_networks, both
    plain lists.

    IPv4 lines that fail to parse are logged and skipped.  An IPv6 line
    that fails to parse raises from the constructor.
    """
    self.ipv4_networks = []
    for ip in WHATSAPP_IPV4.split("\n"):
        try:
            self.ipv4_networks.append(ipaddr.IPv4Network(ip))
        except Exception:
            # Best effort: report the offending line and keep going.
            log.err("IP is wrong")
            log.msg(ip)
    # Build a real list rather than using map(): on Python 3 map() returns
    # a one-shot iterator, so the networks would vanish after the first
    # traversal.
    self.ipv6_networks = [
        ipaddr.IPv6Network(ip) for ip in WHATSAPP_IPV6.split("\n")
    ]
def ipv6_link_mcast_from_ucast(ucast):
    """Derive the ff02::1:ffXX:XXXX multicast address for a unicast address.

    The result is the first 13 bytes of the ff02::1:ff00:0/104 prefix
    followed by the last 3 bytes of ucast.packed, returned as an
    ipaddr.IPv6Address.
    """
    prefix_bytes = ipaddr.IPv6Network('ff02::1:ff00:0/104').packed[:13]
    low_order_bytes = ucast.packed[-3:]
    return ipaddr.IPv6Address(ipaddr.Bytes(prefix_bytes + low_order_bytes))
def default(self, obj):
    """Encode ipaddress objects as strings; defer everything else.

    IPv4/IPv6 networks and addresses are returned as str(obj).  Other
    objects fall through to json.JSONEncoder.default().
    """
    is_ip_object = isinstance(obj, (
        ipaddress.IPv4Address, ipaddress.IPv6Address,
        ipaddress.IPv4Network, ipaddress.IPv6Network,
    ))
    if is_ip_object:
        return str(obj)
    return json.JSONEncoder.default(self, obj)
def get(self):
    """Record an access event for the current request and echo it back.

    The event name comes from the parsed 'event' argument, truncated to
    120 characters.  The caller's IP is matched against the networks in
    the IP_RESOLVER config mapping to pick an organisation, falling back
    to its 'default' entry.  When an 'Auth-Token' header is present, the
    app name from the token payload is added.  The event is passed to the
    'mp_access_store' extension and returned in the response.
    """
    extensions = current_app.extensions
    # Looked up but currently unused: storing to this backend is disabled.
    esstore = extensions['es_access_store']
    mpstore = extensions['mp_access_store']

    parsed_args = self.parser.parse_args()
    event = parsed_args['event'][:120]
    auth_token = request.headers.get('Auth-Token')

    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    ip_net = IPNetwork(ip)

    # First network key overlapping the caller's address wins; non-network
    # keys (such as 'default') are ignored during the search.
    resolved_org = next(
        (org for net, org in ip_resolver.items()
         if isinstance(net, (IPv4Network, IPv6Network))
         and net.overlaps(ip_net)),
        ip_resolver['default'],
    )

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = payload['app_name']

    mpstore.store_event(data)
    # The store receives the datetime object; the response gets a string.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
def add_random_net(ip_ver, target_prefix_len):
    """Generate a random network and add it to usres_monitor.

    The prefix length is drawn at random from 8 (IPv4) or 19 (IPv6) up to
    target_prefix_len - 1, and a random network with that prefix length is
    built from it.

    Args:
        ip_ver: 4 for IPv4; any other value is handled as IPv6.
        target_prefix_len: exclusive upper bound for the random prefix
            length.

    Returns:
        ("dup", net) when the monitor reports that the network was already
        in the db, ("ok", net) when it was added.

    Raises:
        Any error raised by usres_monitor.add_net() other than the
        "already in the db" one is propagated, so a failed insertion is
        never reported as "ok".
    """
    def dump_vars():
        # Debug aid: print every intermediate value of this run.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        prefix_len = random.randint(8, target_prefix_len-1)
        max_range_len = prefix_len
        max_prefix_len = 32
        ip_class = ipaddr.IPv4Address
        net_class = ipaddr.IPv4Network
    else:
        prefix_len = random.randint(19, target_prefix_len-1)
        max_range_len = prefix_len - 1  # to avoid overflow
        max_prefix_len = 64
        ip_class = ipaddr.IPv6Address
        net_class = ipaddr.IPv6Network

    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    # Left-align the random value so it occupies the network bits.
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # IPv6 ids are built on 64 bits; move them to the upper half of
        # the 128-bit address.
        net_id = net_id << 64
    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    # Sanity check: the generated address must have no host bits set.
    try:
        assert str(net.ip) == str(net.network)
    except Exception:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
        # Any other monitor error is a real failure: do not report "ok".
        raise
    except Exception:
        dump_vars()
        raise
    return "ok", net