def load_network_ips(inventory):
networks = inventory['networks']
available_network_ips = {}
for net_name, net in networks.iteritems():
if 'available-ips' in net:
ip_list_raw = net.get('available-ips')
ip_list_out = []
for ip in ip_list_raw:
if ' ' in ip:
ip_range = ip.split()
for _ip in netaddr.iter_iprange(ip_range[0], ip_range[1]):
ip_list_out.append(_ip)
else:
ip_list_out.append(ip)
available_network_ips[net_name] = ip_list_out
return available_network_ips
python类iter_iprange()的实例源码
allocate_ip_addresses.py 文件源码
项目:cluster-genesis
作者: open-power-ref-design-toolkit
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_iprange_boundaries():
assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == [
IPAddress('192.0.2.0'),
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
]
assert list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7')) == [
IPAddress('::ffff:192.0.2.0'),
IPAddress('::ffff:192.0.2.1'),
IPAddress('::ffff:192.0.2.2'),
IPAddress('::ffff:192.0.2.3'),
IPAddress('::ffff:192.0.2.4'),
IPAddress('::ffff:192.0.2.5'),
IPAddress('::ffff:192.0.2.6'),
IPAddress('::ffff:192.0.2.7'),
]
def test_iprange_boundaries():
assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == [
IPAddress('192.0.2.0'),
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
]
assert list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7')) == [
IPAddress('::ffff:192.0.2.0'),
IPAddress('::ffff:192.0.2.1'),
IPAddress('::ffff:192.0.2.2'),
IPAddress('::ffff:192.0.2.3'),
IPAddress('::ffff:192.0.2.4'),
IPAddress('::ffff:192.0.2.5'),
IPAddress('::ffff:192.0.2.6'),
IPAddress('::ffff:192.0.2.7'),
]
test_ip_ranges.py 文件源码
项目:aCloudGuru-Event-Driven-Security
作者: mikegchambers
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_iprange_boundaries():
assert list(iter_iprange('192.0.2.0', '192.0.2.7')) == [
IPAddress('192.0.2.0'),
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
]
assert list(iter_iprange('::ffff:192.0.2.0', '::ffff:192.0.2.7')) == [
IPAddress('::ffff:192.0.2.0'),
IPAddress('::ffff:192.0.2.1'),
IPAddress('::ffff:192.0.2.2'),
IPAddress('::ffff:192.0.2.3'),
IPAddress('::ffff:192.0.2.4'),
IPAddress('::ffff:192.0.2.5'),
IPAddress('::ffff:192.0.2.6'),
IPAddress('::ffff:192.0.2.7'),
]
def get_ip_range(start, end):
generator = iter_iprange(start, end, step=1)
ips = []
while True:
try:
ips.append(str(generator.next()))
except StopIteration:
break
return ips
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
bgperf.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def gen_mako_macro():
return '''<%
import netaddr
from itertools import islice
it = netaddr.iter_iprange('100.0.0.0','160.0.0.0')
def gen_paths(num):
return list('{0}/32'.format(ip) for ip in islice(it, num))
%>
'''
def get_ip_range(oms_spec, key):
if key in oms_spec:
start, end = oms_spec[key].split('-')
return iter_iprange(start, end)
def get_subnet(self, start='10.10.1.0', end='10.10.255.0', step=256):
subnet_gen = netaddr.iter_iprange(start, end, step=step)
i = 1
while True:
with self.lock:
try:
yield (i, str(subnet_gen.next()))
except StopIteration:
subnet_gen = netaddr.iter_iprange(start, end, step=step)
yield (i, str(subnet_gen.next()))
i += 1
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
test_ip_ranges.py 文件源码
项目:aCloudGuru-Event-Driven-Security
作者: mikegchambers
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def test_ip_range():
ip_list = list(iter_iprange('192.0.2.1', '192.0.2.14'))
assert len(ip_list) == 14
assert ip_list == [
IPAddress('192.0.2.1'),
IPAddress('192.0.2.2'),
IPAddress('192.0.2.3'),
IPAddress('192.0.2.4'),
IPAddress('192.0.2.5'),
IPAddress('192.0.2.6'),
IPAddress('192.0.2.7'),
IPAddress('192.0.2.8'),
IPAddress('192.0.2.9'),
IPAddress('192.0.2.10'),
IPAddress('192.0.2.11'),
IPAddress('192.0.2.12'),
IPAddress('192.0.2.13'),
IPAddress('192.0.2.14'),
]
assert cidr_merge(ip_list) == [
IPNetwork('192.0.2.1/32'),
IPNetwork('192.0.2.2/31'),
IPNetwork('192.0.2.4/30'),
IPNetwork('192.0.2.8/30'),
IPNetwork('192.0.2.12/31'),
IPNetwork('192.0.2.14/32'),
]
def valid_hosts(formcls, field):
"""Validate a list of IPs (Ipv4) or hostnames using python stdlib.
This is more robust than the WTForm version as it also considers hostnames.
Comma separated values:
e.g. '10.7.223.101,10.7.12.0'
Space separated values:
e.g. '10.223.101 10.7.223.102'
Ranges:
e.g. '10.7.223.200-10.7.224.10'
Hostnames:
e.g. foo.x.y.com, baz.bar.z.com
:param formcls (object): The form class.
:param field (str): The list of ips.
"""
ip_re = re.compile(r'[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}')
data = field.data
if ',' in data:
ips = [ip for ip in data.split(',') if ip]
elif ' ' in data:
ips = [ip for ip in data.split(' ') if ip]
elif '-' in data and re.match(ip_re, data):
try:
start, end = data.split('-')
ips = iter_iprange(start, end)
ips = [str(ip) for ip in list(ips)]
except ValueError:
raise ValueError(
'Invalid range specified. Format should be: '
'XXX.XXX.XXX.XXX-XXX.XXX.XXX.XXX '
'(e.g. 10.7.223.200-10.7.224.10)')
except AddrFormatError as e:
raise ValueError(e)
else:
# Just use the single ip
ips = [data]
# If any fails conversion, it is invalid.
for ip in ips:
# Skip hostnames
if not is_ip(ip):
if not is_hostname(ip):
raise ValueError('Invalid hostname: "{}"'.format(ip))
else:
continue
try:
socket.inet_aton(ip)
except socket.error:
raise ValueError('Invalid IP: {}'.format(ip))
def _create_lports(self, lswitch, lport_create_args = [], lport_amount=1):
LOG.info("create %d lports on lswitch %s" % \
(lport_amount, lswitch["name"]))
self.RESOURCE_NAME_FORMAT = "lport_XXXXXX_XXXXXX"
batch = lport_create_args.get("batch", lport_amount)
LOG.info("Create lports method: %s" % self.install_method)
install_method = self.install_method
network_cidr = lswitch.get("cidr", None)
ip_addrs = None
if network_cidr:
end_ip = network_cidr.ip + lport_amount
if not end_ip in network_cidr:
message = _("Network %s's size is not big enough for %d lports.")
raise exceptions.InvalidConfigException(
message % (network_cidr, lport_amount))
ip_addrs = netaddr.iter_iprange(network_cidr.ip, network_cidr.last)
ovn_nbctl = self.controller_client("ovn-nbctl")
ovn_nbctl.set_sandbox("controller-sandbox", install_method)
ovn_nbctl.enable_batch_mode()
base_mac = [i[:2] for i in self.task["uuid"].split('-')]
base_mac[0] = str(hex(int(base_mac[0], 16) & 254))
base_mac[3:] = ['00']*3
flush_count = batch
lports = []
for i in range(lport_amount):
name = self.generate_random_name()
lport = ovn_nbctl.lswitch_port_add(lswitch["name"], name)
ip = str(ip_addrs.next()) if ip_addrs else ""
mac = utils.get_random_mac(base_mac)
ovn_nbctl.lport_set_addresses(name, [mac, ip])
ovn_nbctl.lport_set_port_security(name, mac)
lports.append(lport)
flush_count -= 1
if flush_count < 1:
ovn_nbctl.flush()
flush_count = batch
ovn_nbctl.flush() # ensure all commands be run
ovn_nbctl.enable_batch_mode(False)
return lports
def main():
module = AnsibleModule(
argument_spec=dict(
start_cidr=dict(required=True),
num_emulation_hosts=dict(required=False, default="null"),
num_ip=dict(required=True)
),
supports_check_mode=True
)
start_cidr = module.params['start_cidr']
num_emulation_hosts = module.params['num_emulation_hosts']
num_ip = module.params['num_ip']
sandbox_cidr = netaddr.IPNetwork(start_cidr)
sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)
ip_data = t_ip_data()
chassis_per_host = int(num_ip) / int(num_emulation_hosts)
overflow = 0
for i in range(0, int(num_ip)):
'''
cidr = start_cidr_ip.split('.')[0] + "." + \
start_cidr_ip.split('.')[1] + "." + \
start_cidr_ip.split('.')[2] + "." + \
str(int(start_cidr_ip.split('.')[3]) + i)
'''
# ip_data.index.append(i % int(num_emulation_hosts))
index = i / chassis_per_host
if (index >= int(num_emulation_hosts)):
index = int(num_emulation_hosts) - 1
overflow += 1
ip_data.index.append(index)
ip_data.ip_list.append(str(sandbox_hosts.next()))
farm_data = t_farm_data()
num_sandbox = 0
sandbox_hosts = netaddr.iter_iprange(sandbox_cidr.ip, sandbox_cidr.last)
for i in range(0, int(num_emulation_hosts)):
farm_data.farm_index.append(i)
num_sandbox = chassis_per_host
if (i == int(num_emulation_hosts) - 1):
num_sandbox = chassis_per_host + overflow
farm_data.num_sandbox_farm.append(num_sandbox)
farm_data.start_cidr_farm.append(str(sandbox_hosts.next()))
for i in range (0, num_sandbox - 1):
sandbox_hosts.next()
module.exit_json(changed=True,ip_index=ip_data.index, \
ip_index_list=str(ip_data.ip_list), \
prefixlen=str(sandbox_cidr.prefixlen),
farm_index=farm_data.farm_index,
num_sandbox_farm=farm_data.num_sandbox_farm,
start_cidr_farm=farm_data.start_cidr_farm)
# import module snippets