def _validate_cidr(self, rule):
    """Check that the CIDR block stored in a rule parses as an IPv4 network.

    The CIDR string is read from ``rule['app']``.

    Returns:
        True: When the CIDR block parses successfully.

    Raises:
        SpinnakerSecurityGroupCreationFailed: The CIDR definition could not
            be parsed as an IPv4 network.
    """
    try:
        parsed = ipaddress.IPv4Network(rule['app'])
    except ValueError as error:
        # NetmaskValueError (and AddressValueError) are ValueError
        # subclasses, so this single clause covers both.
        raise SpinnakerSecurityGroupCreationFailed(error)

    self.log.debug('Validating CIDR: %s', parsed.exploded)
    return True
python类IPv4Network()的实例源码
def validate_ip_network_address_prefix(address, prefix, strict=True):
    '''
    Build an IP network object from address and prefix.

    The address is first passed through validate_ip_address(). If that
    yields an IPv4 or IPv6 address and address/prefix forms a valid network
    under the given strict setting, the matching ipaddress network object
    is returned. In every other case the result is None.
    '''
    parsed = validate_ip_address(address)
    if parsed is None:
        return None

    network_classes = {
        4: ipaddress.IPv4Network,
        6: ipaddress.IPv6Network,
    }
    network_cls = network_classes.get(parsed.version)
    if network_cls is None:
        return None

    try:
        return network_cls((parsed, prefix), strict=strict)
    except ValueError:
        return None
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def test_cidr_cast(self):
    """cidr values come back as ipaddress network objects (NULL as None).

    Runs after psycopg2.extras.register_ipaddress() has been applied to
    the cursor.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertTrue(cur.fetchone()[0] is None)

    # (query, expected network class, textual network) for each family.
    cases = (
        ("select '127.0.0.0/24'::cidr", ip.IPv4Network, '127.0.0.0/24'),
        ("select '::ffff:102:300/128'::cidr", ip.IPv6Network,
         '::ffff:102:300/128'),
    )
    for query, expected_type, text in cases:
        cur.execute(query)
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, expected_type), repr(obj))
        self.assertEqual(obj, ip.ip_network(text))
def test_cidr_cast(self):
    """cidr values come back as ipaddress network objects (NULL as None).

    Runs after psycopg2.extras.register_ipaddress() has been applied to
    the cursor. Uses the current unittest assertion names; the old
    ``assert_`` / ``assertEquals`` aliases are deprecated and no longer
    exist in recent Python versions.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertIsNone(cur.fetchone()[0])

    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def test_covering_cidr(ips):
    """
    covering_cidr() returns the smallest CIDR (as a string) holding all IPs.

    The result must be at most a /24, must contain every given IP, and,
    when it is wider than a /24, none of its subnets may contain all of
    the given IPs.
    """
    result = telepresence.vpn.covering_cidr(ips)
    assert isinstance(result, str)

    network = ipaddress.IPv4Network(result)
    assert network.prefixlen <= 24

    # Every IP falls inside the returned CIDR:
    addresses = [ipaddress.IPv4Address(raw) for raw in ips]
    assert all(address in network for address in addresses)

    # A /24 is the floor; anything wider must be minimal, i.e. no subnet
    # may still cover all addresses.
    if network.prefixlen < 24:
        for subnet in network.subnets():
            assert not all(address in subnet for address in addresses)
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def recreate_routable_ip_pool(self):
    """Clear the IP pools and re-add the pool of the master's main network.

    A shell snippet is run on 'master' through self.ssh_exec(); its output
    is parsed (non-strictly) as an IPv4 network, and that network is added
    to self.ip_pools with self.env['KD_ONE_PUB_IPS'] as the includes.
    """
    self.ip_pools.clear()
    # First we find the default interface from the route to 8.8.8.8:
    #   8.8.8.8 via 192.168.115.254 dev ens3 src 192.168.113.245
    #
    # Then we extract the IP/prefix from that interface's address listing:
    #   2: ens3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast
    #       link/ether 02:00:c0:a8:71:f5 brd ff:ff:ff:ff:ff:ff
    #       inet 192.168.113.245/22 brd 192.168.115.255 scope global ens3
    #
    # Raw string: the egrep pattern contains `\s`, which is not a valid
    # Python escape sequence. The command text itself is unchanged.
    cmd = r"""
MAIN_INTERFACE=$(ip route get 8.8.8.8 | egrep 'dev\s+.+?\s+src' -o | awk '{print $2}'); # noqa
ip addr show dev $MAIN_INTERFACE | awk 'NR==3 { print $2 }'
"""
    _, main_ip, _ = self.ssh_exec('master', cmd)
    # strict=False: main_ip is a host address with a prefix length, so the
    # host bits must be masked off to obtain the enclosing network.
    ip_pool = str(IPv4Network(unicode(main_ip), strict=False))
    self.ip_pools.add(ip_pool, includes=self.env['KD_ONE_PUB_IPS'])
def run(self, subnet, exclude, include, node=None):
    """Create an IP pool for subnet, given either an exclude or include list.

    exclude and include are mutually exclusive (InvalidCommand is raised
    when both are given). When include is given it is converted into the
    equivalent exclude list: every address of the subnet (hosts, network
    and broadcast address) that is not part of the parsed include set.
    """
    if exclude and include:
        raise InvalidCommand("Can't specify both -e and -i")

    if include:
        wanted = ippool.IpAddrPool().parse_autoblock(include)
        network = IPv4Network(unicode(subnet))
        all_addresses = {str(host) for host in network.hosts()}
        # Do not include network and broadcast IP address
        # TODO: Fix according to AC-4044
        all_addresses.add(str(network.network_address))
        all_addresses.add(str(network.broadcast_address))
        exclude = ','.join(all_addresses - wanted)

    create_pool = ippool.IpAddrPool().create
    create_pool({
        'network': subnet.decode(),
        'autoblock': exclude,
        'node': node,
    })
def test_cidr_cast(self):
    """cidr values come back as ipaddress network objects (NULL as None).

    Runs after psycopg2.extras.register_ipaddress() has been applied to
    the cursor.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertTrue(cur.fetchone()[0] is None)

    # (query, expected network class, textual network) for each family.
    cases = (
        ("select '127.0.0.0/24'::cidr", ip.IPv4Network, '127.0.0.0/24'),
        ("select '::ffff:102:300/128'::cidr", ip.IPv6Network,
         '::ffff:102:300/128'),
    )
    for query, expected_type, text in cases:
        cur.execute(query)
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, expected_type), repr(obj))
        self.assertEqual(obj, ip.ip_network(text))
def test_cidr_cast(self):
    """cidr values come back as ipaddress network objects (NULL as None).

    Runs after psycopg2.extras.register_ipaddress() has been applied to
    the cursor. Uses the current unittest assertion names; the old
    ``assert_`` / ``assertEquals`` aliases are deprecated and no longer
    exist in recent Python versions.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertIsNone(cur.fetchone()[0])

    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def _discover(self, network):
    """
    Locate the Mebo on the LAN via a broadcast discovery followed by an API probe.

    The parsed network is kept on ``self._network``.

    :param network: The IPv4 network as a CIDR string. Example: 192.168.1.0/24
    :returns: The IP address reported by the broadcast discovery
    :raises: `MeboDiscoveryError` if a socket timeout occurs or the API probe
             response raises an HTTPError from ``raise_for_status()``
    """
    try:
        print('Looking for Mebo...')
        self._network = IPv4Network(network)
        broadcast_addr = self._network.broadcast_address.compressed
        broadcast = self._get_broadcast(broadcast_addr)
        mebo_ip = broadcast.ip
        # Confirm the responder by probing its API before reporting it.
        self._probe(mebo_ip).raise_for_status()
        print('Mebo found at {}'.format(mebo_ip))
        return mebo_ip
    except (socket.timeout, HTTPError):
        message = ('Unable to locate Mebo on the network.\n'
                   '\tMake sure it is powered on and connected to LAN.\n'
                   '\tIt may be necessary to power cycle the Mebo.')
        raise MeboDiscoveryError(message)
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def ip_address_get(value):
    '''
    Convert value into an ipaddress IPv4Address/IPv6Address object.

    Network objects (IPv4Network/IPv6Network) are rejected outright, and
    any value that ipaddress.ip_address() cannot parse is reported as a
    GetError as well.
    '''
    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    if isinstance(value, network_types):
        message = (
            "%s not allowed to be converted to IP address, use class attributes" %
            type(value).__name__
        )
        raise GetError(message)

    try:
        parsed = ipaddress.ip_address(value)
    except ValueError:
        raise GetError("value '%s' is not a valid IPv4/IPv6 address" % value)
    return parsed
def get_less_used_network(num_of_requested_ip, global_variable, config_obj):
    """
    Get the network with the most free IPs and reserve candidates from it.

    :param num_of_requested_ip: number of IPs the caller needs
    :param global_variable: object exposing ``free_ip_lock``, ``free_ips``
        (mapping of network -> list of free IPs) and ``vcs.get_network()``
    :param config_obj: configuration passed to ``config_section_map`` to
        read ``min_spare_ip`` from the ``vcloud`` section
    :return: ``None`` when there is no free-IP data or the chosen network
        cannot supply the requested IPs plus the configured spare margin;
        otherwise a dict with ``'object'`` (the first external network whose
        gateway lies inside the chosen network) and ``'iplist'`` (the first
        ``num_of_requested_ip`` free IPs)
    :raises IndexError: if no external network has its gateway in the
        chosen network
    """
    from ipaddress import IPv4Address, IPv4Network

    # Poll until the free-IP lock flag is released.
    while global_variable.free_ip_lock is True:
        time.sleep(1)

    free_ips = global_variable.free_ips
    if not free_ips:
        # Nothing to choose from; treat like "not enough IPs".
        return None

    # identify less used network (max() keeps the first of equally sized
    # candidates, like the stable descending sort it replaces)
    less_used_network = max(free_ips, key=lambda net: len(free_ips[net]))
    available = free_ips[less_used_network]

    # network must have enough IPs
    min_spare_ip = int(config_section_map(config_obj, 'vcloud')['min_spare_ip'])
    if len(available) < num_of_requested_ip + min_spare_ip:
        return None
    ip_list = available[:num_of_requested_ip]

    pvdc_external_networks = global_variable.vcs.get_network()
    # Parse the network once instead of once per candidate, and build a
    # real list: filter() returns a non-indexable iterator on Python 3.
    chosen = IPv4Network(less_used_network)
    external_network = [
        candidate for candidate in pvdc_external_networks
        if IPv4Address(candidate.gateway) in chosen
    ]
    return {'object': external_network[0],
            'iplist': ip_list}
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Set up the instance from a host address or a network prefix.

    ``value`` is first tried as a single IP address; if that fails it is
    parsed as a network with ``strict=False``. An error from the network
    parse propagates to the caller.

    Attributes set: ``prefix`` (None for a host, the network address for a
    prefix), ``type`` (``self.HOST`` or ``self.PREFIX``), ``version`` and
    ``txt`` (the compressed textual form).
    """
    try:
        parsed = ipaddress.ip_address(value)
    except Exception:
        # Not a plain address; fall back to a (non-strict) network parse.
        parsed = ipaddress.ip_network(value, strict=False)

    address_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
    network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
    if isinstance(parsed, address_types):
        self.prefix = None
        self.type = self.HOST
    elif isinstance(parsed, network_types):
        self.prefix = parsed.network_address
        self.type = self.PREFIX

    self.version = parsed.version
    self.txt = parsed.compressed
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def is_cidr_in_cidr(small_cidr, big_cidr):
    """
    Tell whether small_cidr lies inside big_cidr.

    The default route is special-cased on the literal string "0.0.0.0/0":
    since every route would be contained in it, it only matches itself.
    Any other small CIDR checked against the default route yields False.
    All remaining pairs are compared as IPv4 networks via subnet_of().
    """
    default_route = "0.0.0.0/0"

    # Default route as the small CIDR: only equal to itself.
    if small_cidr == default_route:
        return big_cidr == default_route

    # Default route as the big CIDR: deliberately never a container.
    if big_cidr == default_route:
        return False

    inner = ipaddress.IPv4Network(unicode(small_cidr))
    outer = ipaddress.IPv4Network(unicode(big_cidr))
    return inner.subnet_of(outer)
def __init__(self, value):
    """Store ``value`` once it is confirmed to be an ``ipaddress`` object.

    Accepted types are IPv4Address, IPv6Address, IPv4Network and
    IPv6Network; anything else raises TypeError.
    """
    accepted_types = (
        ipaddress.IPv4Address,
        ipaddress.IPv6Address,
        ipaddress.IPv4Network,
        ipaddress.IPv6Network,
    )
    if isinstance(value, accepted_types):
        self._value = value
        return

    raise TypeError(
        "value must be an instance of ipaddress.IPv4Address, "
        "ipaddress.IPv6Address, ipaddress.IPv4Network, or "
        "ipaddress.IPv6Network"
    )
def parseIRLorigin(self, fileName, ip2as=True, as2ip=False):
    """Load prefix-to-origin-AS records from a tab-separated file.

    Each line that starts with ``<a.b.c.d/len><TAB><asn>`` is used; other
    lines are skipped.

    :param fileName: path of the origin file to read
    :param ip2as: when true, reset ``self.ip2as`` to a new SubnetTree and
        map every prefix string to its AS number
    :param as2ip: when true, reset ``self.as2ip`` to a new dict and collect,
        per AS number present in ``self.netGraph``, the IPv4Network objects
        whose prefix length does not exceed ``self.smallSubnetPrefix``
    :return: None
    """
    # Adjacent literals keep the compiled pattern identical to the original
    # while avoiding invalid escape sequences: raw pieces around a real tab.
    re_origin = re.compile(
        r'(\d+\.\d+\.\d+\.\d+/\d+)' '\t' r'(\d+)', re.UNICODE)

    if as2ip:
        self.as2ip = dict()
    if ip2as:
        self.ip2as = SubnetTree.SubnetTree()

    # Context manager so the file handle is closed even if parsing fails.
    with open(fileName, 'r') as origin_file:
        for line in origin_file:
            match = re_origin.match(line)
            if match is None:
                continue

            prefix = match.group(1)
            asNum = int(match.group(2))

            if ip2as:
                self.ip2as[prefix] = asNum

            if not as2ip:
                continue
            newNet = IPv4Network(prefix)
            # Prefixes longer than the configured limit are ignored.
            if newNet.prefixlen > self.smallSubnetPrefix:
                continue
            if asNum in self.netGraph:
                self.as2ip.setdefault(asNum, []).append(newNet)
    return None