def get_sre(net, target_prefix_len):
    """Compute the first and last /target_prefix_len subnets inside net.

    Subnets are expressed as integers, on the same scale as the value
    returned by UniqueSmallestRoutableEntriesMonitor.get_first(net).

    Returns: first, last, cnt (all integers), where cnt is the number of
    /target_prefix_len subnets contained in net.
    """
    assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    assert target_prefix_len <= 64, "Max target prefix length is 64"
    assert net.prefixlen <= target_prefix_len, \
        ("Prefix length ({}) must be <= of the target prefix "
         "length ({}): {}".format(net.prefixlen, target_prefix_len, net))

    first = UniqueSmallestRoutableEntriesMonitor.get_first(net)
    # first must fit in a signed 64-bit value (<= 2^63 - 1) to avoid overflows
    assert first <= 9223372036854775807, \
        "Only prefixes <= 7fff:ffff:ffff:ffff::/64 can be processed"

    # Number of bits that distinguish the target-sized subnets within net.
    subnet_bits = target_prefix_len - net.prefixlen
    subnet_cnt = 2**subnet_bits

    # Width of the integer scale used for `first`: 64 bits for IPv6,
    # 32 bits otherwise.
    addr_bits = 32 if net.version != 6 else 64
    shift = addr_bits - target_prefix_len

    # Set every subnet-selecting bit to 1 to reach the last subnet.
    last = first | ((subnet_cnt - 1) << shift)
    return first, last, subnet_cnt
# Example source code for the Python class IPv4Network()
def get_free_ip(subnets):
    """Return the first free host address found in the given subnets.

    Each subnet is expected to expose ``address`` (network in a form
    accepted by ipaddr.IPv4Network), ``reserved_addresses`` (how many
    leading host addresses must be skipped) and ``ips`` (objects whose
    ``address`` attribute is an already assigned address string).

    Returns: the compressed string form of the first host address that is
    neither reserved nor already used.
    Raises: SubnetFullException if no subnet has a free address.
    """
    for subnet in subnets:
        reserved = subnet.reserved_addresses
        net = ipaddr.IPv4Network(subnet.address)
        used_ips = [ip.address for ip in subnet.ips]
        # Quick capacity check before walking the hosts; the "- 2"
        # presumably discounts the network and broadcast addresses.
        if (net.numhosts - 2 - reserved) <= len(used_ips):
            continue
        # Build the lookup once per subnet so each membership test is O(1)
        # instead of a scan over the whole list of used addresses.
        used_lookup = set(used_ips)
        for position, host in enumerate(net.iterhosts()):
            if position < reserved:
                # Still inside the reserved leading range.
                continue
            candidate = host.compressed
            if candidate not in used_lookup:
                return candidate
    raise SubnetFullException()
def fetch_url(self, i, fn_on_response):
    """Generate requests for every task returned by self.get_next_task().

    A task containing '/' is expanded as an IPv4 network into its host
    addresses; any other task is used as a single target. For each target
    one http:// URL per port in self.port_list is requested, followed by
    https:// on ports 443 and 8443.

    i: not used by this method.
    fn_on_response: passed through unchanged to self.do_request.

    Yields: the value returned by self.do_request(url, 'GET', fn_on_response)
    for each generated URL. Stops when get_next_task() returns None.
    """
    item = self.get_next_task()
    while item is not None:
        try:
            if '/' in item:
                mask = ipaddr.IPv4Network(item)
                ip_list = [text_type(t) for t in mask.iterhosts()]
            else:
                ip_list = [item]
        except Exception:
            # Best effort: a task that cannot be expanded is skipped.
            # Catching Exception (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            ip_list = []
        for t in ip_list:
            if t == '':
                continue
            url_list = ['http://%s:%s' % (t, p) for p in self.port_list]
            url_list.extend(['https://%s:%s' % (t, p) for p in [443, 8443]])
            for u in url_list:
                yield self.do_request(u, 'GET', fn_on_response)
        item = self.get_next_task()
def on_queue_empty(self, queue, max_num=100):
    """Refill queue from self.list_data and pop its next entry.

    Up to max_num entries are taken from the left of self.list_data. An
    entry containing '/' is expanded as an IPv4 network into the text form
    of each of its hosts; any other entry is appended as is.

    Returns: the leftmost element of queue after refilling, or None when
    queue is still empty.
    """
    for _ in range(max_num):
        try:
            entry = self.list_data.popleft()
            if '/' not in entry:
                expanded = [entry]
            else:
                hosts = ipaddr.IPv4Network(entry).iterhosts()
                expanded = [text_type(host) for host in hosts]
            queue.extend(expanded)
        except IndexError:
            # Source exhausted: stop refilling.
            break
    try:
        return queue.popleft()
    except IndexError:
        return None
def is_reserved(ip):
    """Tell whether ip falls in a special-purpose IPv4 range.

    ip: any value accepted by ipaddr.IPv4Network.

    Returns: True when the address is multicast, private, link-local,
    loopback or reserved; False otherwise.
    """
    # Parse once instead of once per property, and use logical `or`
    # rather than bitwise `|` to combine the boolean flags.
    net = ipaddr.IPv4Network(ip)
    return bool(
        net.is_multicast
        or net.is_private
        or net.is_link_local
        or net.is_loopback
        or net.is_reserved
    )
def get_net(net):
    """Return net as an ipaddr network object.

    An existing IPv4Network/IPv6Network is returned unchanged; anything
    else is passed to ipaddr.IPNetwork.
    """
    already_parsed = isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    return net if already_parsed else ipaddr.IPNetwork(net)
def get_first(net):
    """Return the network address of net as an integer.

    For IPv6 only the upper 64 bits of the network address are returned;
    for IPv4 the full address is returned.
    """
    assert isinstance(net, (ipaddr.IPv4Network, ipaddr.IPv6Network))
    network_int = int(net.network)
    return network_int >> 64 if net.version == 6 else network_int
def ipListBuild(address):
    """Interactively encode one IP or write a host-name list to a file.

    Prompts for a mode. In mode '1' (the default) the result of
    numToEnToNum(address) is printed and the program exits. Otherwise the
    hosts of the IPv4 network ``address/<segment length>`` are encoded,
    suffixed with the chosen root domain and appended, one per line, to a
    timestamped text file.

    Supported encodings: 'ipv4' (default, dotted form), 'en'
    (numToEnToNum), 'int' (integer value) and 'hex' (hex of the dotted
    string). Any other encoding writes no lines.

    address: IPv4 address text used as the base of the network.
    """
    print('1. Single IP Covert For En\n2. Build IP List')
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print(numToEnToNum(address))
        exit()
    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    # Empty input and an explicitly typed 'ipv4' both select the default
    # encoding (previously typing 'ipv4' produced an empty file).
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]") or 'ipv4'
    mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main
    # Build the network directly: the previous eval() of a formatted string
    # would execute arbitrary code embedded in `address`.
    segment = ipaddr.IPv4Network('{}/{}'.format(address, int(seg_len))).iterhosts()
    save_file = "{}_{}_{}.txt".format(
        time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''),
        mainDomain.replace('.', '_'),
        encode_req)
    # One encoder per supported encoding name.
    encoders = {
        'ipv4': str,
        'en': lambda host: numToEnToNum(str(host)),
        'int': lambda host: int(ipaddr.IPAddress(str(host))),
        # NOTE: str.encode('hex') only exists on Python 2.
        'hex': lambda host: str(host).encode('hex'),
    }
    try:
        encoder = encoders.get(encode_req)
        results = []
        if encoder is not None:
            results = ["{}.{}".format(encoder(host), mainDomain) for host in segment]
        # `with` guarantees the file is closed even if a write fails.
        with open(save_file, 'a') as f:
            f.writelines(line + '\n' for line in results)
        print('[+] Stored in the {}'.format(save_file))
    except Exception as e:
        print(e)
        exit()
def default(self, obj):
    """Serialize ipaddress network and address objects as strings.

    Any other object is delegated to json.JSONEncoder.default.
    """
    ip_types = (
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
    )
    if not isinstance(obj, ip_types):
        return json.JSONEncoder.default(self, obj)
    return str(obj)
def main():
    """Build the target list from the command-line arguments in argv.

    ``-f <arg>``: returns extract_target(<arg>).
    ``-i <network> [...]``: returns one "address:port" string per address
    of the IPv4 network; the port is int(argv[4]) when exactly five
    arguments are present, 6379 otherwise.

    Returns an empty list when fewer than three arguments are given or the
    option is not recognised.
    """
    if len(argv) <= 2:
        return []
    option = argv[1]
    if option == '-f':
        return extract_target(argv[2])
    if option != '-i':
        return []
    port = int(argv[4]) if len(argv) == 5 else 6379
    return ["%s:%d" % (tar, port) for tar in ipaddr.IPv4Network(argv[2])]
def select_subnet_for_ip(ip, subnets):
    """Return the first subnet whose IPv4 network contains ip.

    Each subnet is expected to expose ``address`` in a form accepted by
    ipaddr.IPv4Network. Returns None when no subnet matches.
    """
    containing = (
        candidate for candidate in subnets
        if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(candidate.address)
    )
    return next(containing, None)
def __init__(self):
    """Parse the WHATSAPP_IPV4 / WHATSAPP_IPV6 network lists.

    Both constants are split on newlines. IPv4 entries that fail to parse
    are logged and skipped; an IPv6 entry that fails to parse raises.
    """
    self.ipv4_networks = []
    for ip in WHATSAPP_IPV4.split("\n"):
        try:
            self.ipv4_networks.append(ipaddr.IPv4Network(ip))
        except Exception:
            log.err("IP is wrong")
            log.msg(ip)
    # A real list (not map()) so the networks can be iterated repeatedly:
    # on Python 3 map() returns an iterator that is exhausted after one pass.
    self.ipv6_networks = [
        ipaddr.IPv6Network(ip6) for ip6 in WHATSAPP_IPV6.split("\n")
    ]
def main():
    """Scan a single host, or every host of an IPv4 network.

    Reads the target (opt.network) and the timeout (opt.timeout) from
    parse_options(). A target containing '/' is treated as an IPv4 network
    whose hosts are scanned through a pool of 16 worker processes; any
    other target is scanned directly. Sets the module-level ``timeout``.
    """
    global timeout
    opt = parse_options()
    scan_network = opt.network
    try:
        timeout = float(opt.timeout)
    except (TypeError, ValueError):
        # TypeError covers a missing value (float(None)); ValueError covers
        # unparsable text. Either way fall back to the default.
        timeout = 0.5
        print("Use default timeout {}".format(timeout))
    if '/' in scan_network:
        network = IPv4Network(scan_network)
        hosts = [str(host) for host in network.iterhosts()]
        print('start to scan network {} for {} hosts...'.format(str(network), len(hosts)))
        pool = multiprocessing.Pool(processes=16)
        pool.map(scan, hosts)
        pool.close()
        pool.join()
    else:
        print('start to scan host {}'.format(scan_network))
        scan(scan_network)
    print("done")
def default(self, obj):
    """Encode ipaddress networks and addresses as their string form.

    Objects of any other type are handed to json.JSONEncoder.default.
    """
    is_ip_object = isinstance(obj, (
        ipaddress.IPv4Network, ipaddress.IPv6Network,
        ipaddress.IPv4Address, ipaddress.IPv6Address
    ))
    return str(obj) if is_ip_object else json.JSONEncoder.default(self, obj)
def get(self):
    """Record an access event for the current request and return it.

    The event name comes from the parsed request arguments (at most its
    first 120 characters are kept). The caller's address is matched
    against the network keys of the IP_RESOLVER config to pick an
    organisation, falling back to its 'default' entry. When an Auth-Token
    header is present, the token payload's app_name is added.

    The event is stored through the 'mp_access_store' extension and
    returned wrapped in CTTVResponse.OK, with the timestamp as a string.
    """
    esstore = current_app.extensions['es_access_store']
    mpstore = current_app.extensions['mp_access_store']
    args = self.parser.parse_args()
    event = args['event'][:120]
    auth_token = request.headers.get('Auth-Token')
    ip_resolver = current_app.config['IP_RESOLVER']
    ip = RateLimiter.get_remote_addr()
    ip_net = IPNetwork(ip)

    # First network key overlapping the caller's address wins.
    resolved_org = ip_resolver['default']
    for net, org in ip_resolver.items():
        if isinstance(net, (IPv4Network, IPv6Network)) and net.overlaps(ip_net):
            resolved_org = org
            break

    data = {
        'ip': ip,
        'org': resolved_org,
        'host': request.host,
        'timestamp': datetime.now(),
        'event': event,
    }
    if auth_token:
        payload = TokenAuthentication.get_payload_from_token(auth_token)
        data['app_name'] = payload['app_name']
    # esstore.store_event(data)
    mpstore.store_event(data)
    # Stringify only after storing, so the store receives the datetime.
    data['timestamp'] = str(data['timestamp'])
    return CTTVResponse.OK(SimpleResult(None, data=data))
def add_random_net(ip_ver, target_prefix_len):
    """Generate a random network and add it to usres_monitor.

    ip_ver: 4 selects IPv4 (prefix length drawn from 8 up to
        target_prefix_len - 1); any other value selects the IPv6 classes
        (prefix length drawn from 19 up to target_prefix_len - 1).
    target_prefix_len: exclusive upper bound for the random prefix length.

    Returns: ("dup", net) when usres_monitor reports the network was
    already in the db, ("ok", net) otherwise.
    """
    def dump_vars():
        # Debug aid: print every intermediate value used to build the net.
        print("target_prefix_len", target_prefix_len)
        print("prefix_len", prefix_len)
        print("max_prefix_len", max_prefix_len)
        print("max_range_len", max_range_len)
        print("max_range", max_range)
        print("rand", rand)
        print("net_id", net_id)
        print("ip", ip)
        print("net", net)
        print("str(net.ip)", str(net.ip))
        print("str(net.network)", str(net.network))

    if ip_ver == 4:
        ip_class, net_class = ipaddr.IPv4Address, ipaddr.IPv4Network
        max_prefix_len = 32
        prefix_len = random.randint(8, target_prefix_len-1)
        max_range_len = prefix_len
    else:
        ip_class, net_class = ipaddr.IPv6Address, ipaddr.IPv6Network
        max_prefix_len = 64
        prefix_len = random.randint(19, target_prefix_len-1)
        max_range_len = prefix_len - 1  # to avoid overflow

    # Random network id of max_range_len bits, left-aligned to prefix_len.
    max_range = 2**max_range_len - 1
    rand = random.randint(0, max_range)
    net_id = rand << (max_prefix_len - prefix_len)
    if ip_ver == 6:
        # The id was built on a 64-bit scale; move it to the upper half.
        net_id <<= 64

    ip = ip_class(net_id)
    net = net_class("{}/{}".format(str(ip), prefix_len))

    # Sanity check: the generated address must have no host bits set.
    try:
        assert str(net.ip) == str(net.network)
    except:
        dump_vars()
        raise

    try:
        usres_monitor.add_net(net)
    except USRESMonitorException as e:
        if "it was already in the db" in str(e):
            return "dup", net
    except:
        dump_vars()

    return "ok", net