def is_address_in_network(network, address):
    """
    Report whether a single IP address falls inside a CIDR network.

    :param network (str): CIDR presentation format. For example,
        '192.168.1.0/24'.
    :param address: An individual IPv4 or IPv6 address without a net
        mask or subnet prefix. For example, '192.168.1.1'.
    :returns boolean: True when address is contained in network.
    :raises ValueError: when either argument cannot be parsed by netaddr.
    """
    parse_errors = (netaddr.core.AddrFormatError, ValueError)

    try:
        parsed_network = netaddr.IPNetwork(network)
    except parse_errors:
        raise ValueError("Network (%s) is not in CIDR presentation format" %
                         network)

    try:
        parsed_address = netaddr.IPAddress(address)
    except parse_errors:
        raise ValueError("Address (%s) is not in correct presentation format" %
                         address)

    return parsed_address in parsed_network
# Example source code for the Python class IPNetwork()
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def contains(self, ip_address):
    """Check a candidate ip_address against the list of allowed cidrs.

    The candidate is parsed with netaddr.IPNetwork and tested against each
    entry of self.cidr in turn, stopping at the first match.  When
    self.invert is set the outcome is negated.
    """
    candidate = netaddr.IPNetwork(ip_address)
    matched = any(candidate in netaddr.IPNetwork(cidr) for cidr in self.cidr)
    return (not matched) if self.invert else matched
# Test program
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def validate(self, value):
    """Validate value as an IP address or, when self.mask is set, a network.

    After the parent validation, an empty value is accepted for a
    non-required field.  Otherwise the value is parsed with netaddr and
    stored on self.ip, then checked against the allowed IP versions and,
    for networks, the allowed prefix-length range.

    :raises ValidationError: with invalid_format_message when parsing
        fails, invalid_version_message when the IP version is not enabled
        in self.version, or invalid_mask_message when the prefix length
        is outside [min_mask, max_v4_mask / max_v6_mask].
    """
    super(IPField, self).validate(value)
    if not (value or self.required):
        return

    try:
        parser = netaddr.IPNetwork if self.mask else netaddr.IPAddress
        self.ip = parser(value)
    except Exception:
        raise ValidationError(self.invalid_format_message)

    # self.version is a bit field; IPv4 / IPv6 are the flags tested here.
    v4_allowed = (self.version & IPv4) > 0 and self.ip.version == 4
    v6_allowed = (self.version & IPv6) > 0 and self.ip.version == 6
    if not (v4_allowed or v6_allowed):
        raise ValidationError(self.invalid_version_message)

    if self.mask:
        if self.ip.version == 4:
            mask_ok = self.min_mask <= self.ip.prefixlen <= self.max_v4_mask
        elif self.ip.version == 6:
            mask_ok = self.min_mask <= self.ip.prefixlen <= self.max_v6_mask
        else:
            mask_ok = True
        if not mask_ok:
            raise ValidationError(self.invalid_mask_message)
def is_valid_cidr(address):
    """Check if the provided ipv4 or ipv6 address is a valid CIDR address.

    The address must be parseable by netaddr.IPNetwork and must carry a
    non-empty part after its first '/'.
    """
    try:
        netaddr.IPNetwork(address)
    except (netaddr.core.AddrFormatError, UnboundLocalError):
        # UnboundLocalError is caught as well -- NOTE(MotoKen): work around
        # bug in netaddr 0.7.5 (see detail in
        # https://github.com/drkjam/netaddr/issues/2)
        return False

    # The parse above only partially verifies the /xx part, so require it
    # explicitly here.
    segments = address.split('/')
    return len(segments) > 1 and segments[1] != ''
# test_l2_gateway_connection.py -- file source
# Project: vmware-nsx-tempest-plugin
# Author: openstack
# Project source
# File source
# Views: 18
# Favorites: 0
# Likes: 0
# Comments: 0
def resource_setup(cls):
    """Create the primary tenant's VLAN network and its IPv4 subnet.

    The subnet arguments come from the "vlan_subnet_ipv4_dict" setting
    (the test is skipped by getattr_or_skip_test when it is unavailable)
    and are normalised in place before being passed to create_subnet.
    """
    super(L2GatewayConnectionTest, cls).resource_setup()
    subnet_args = cls.getattr_or_skip_test("vlan_subnet_ipv4_dict")
    if 'mask_bits' in subnet_args:
        subnet_args['mask_bits'] = int(subnet_args['mask_bits'])
    # cidr must be present and converted to an IPNetwork structure
    subnet_args['cidr'] = netaddr.IPNetwork(subnet_args['cidr'])

    # 'start'/'end' are folded into a single allocation pool when both
    # are given; either way they are removed from the arguments.
    pool_start = subnet_args.pop('start', None)
    pool_end = subnet_args.pop('end', None)
    if pool_start and pool_end:
        subnet_args['allocation_pools'] = [
            {'start': pool_start, 'end': pool_end}]

    cls.network = cls.create_network()
    # baseAdminNetworkTest does not derive ip_version, mask_bits from cidr
    subnet_args['ip_version'] = 4
    subnet_args.setdefault('mask_bits', subnet_args['cidr'].prefixlen)
    cls.subnet = cls.create_subnet(cls.network, **subnet_args)
def _create_subnet_with_last_subnet_block(cls, network, ip_version=4):
    """Derive last subnet CIDR block from tenant CIDR and
    create the subnet with that derived CIDR

    :param network: network the subnet is created on; passed unchanged to
        cls.create_subnet.
    :param ip_version: 4 or 6; selects which project network CIDR and
        mask-bits settings are read from CONF.network.
    :returns: the 'subnet' entry of the body returned by cls.create_subnet.
    :raises ValueError: if ip_version is neither 4 nor 6.
    """
    # Reject unknown versions up front; otherwise cidr/mask_bits would be
    # left unbound and the failure would surface as an UnboundLocalError.
    if ip_version not in (4, 6):
        raise ValueError(
            "Unsupported ip_version %r (expected 4 or 6)" % (ip_version,))

    if ip_version == 4:
        cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
        mask_bits = CONF.network.project_network_mask_bits
    else:
        cidr = netaddr.IPNetwork(CONF.network.project_network_v6_cidr)
        mask_bits = CONF.network.project_network_v6_mask_bits

    # Last block of size mask_bits inside the tenant CIDR.
    subnet_cidr = list(cidr.subnet(mask_bits))[-1]
    # The gateway is the first address after the block's base address.
    gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
    body = cls.create_subnet(network, gateway=gateway_ip,
                             cidr=subnet_cidr, mask_bits=mask_bits)
    return body['subnet']
def resource_setup(cls):
    """
    Setting up the resources for the test.

    Copies the l2gw configuration values (subnet CIDR, VLAN ids and the
    addresses of the pre-deployed VMs) onto the class, then derives the
    mask part and a netaddr.IPNetwork from the subnet CIDR string.
    """
    super(L2GatewayScenarioTest, cls).resource_setup()
    l2gw_conf = CONF.l2gw

    # CIDR of the subnet created on the test network.
    cidr_text = l2gw_conf.subnet_1_cidr
    cls.SUBNET_1_NETWORK_CIDR = cidr_text

    # VLAN ids used in setups.
    cls.VLAN_1 = l2gw_conf.vlan_1
    cls.VLAN_2 = l2gw_conf.vlan_2

    # IPs of predeployed vms.
    cls.VM_ON_VDS_TZ1_VLAN16_IP = l2gw_conf.vm_on_vds_tz1_vlan16_ip
    cls.VM1_ON_SWITCH_VLAN16 = l2gw_conf.vm_on_switch_vlan16
    cls.VM1_ON_VDS_TZ2_VLAN16_IP = l2gw_conf.vm_on_vds_tz2_vlan16_ip
    cls.VM1_ON_VDS_TZ2_VLAN17_IP = l2gw_conf.vm_on_vds_tz2_vlan17_ip

    # Text after the first '/' of the CIDR, kept as a string.
    cls.SUBNET_1_MASK = cidr_text.split("/")[1]
    cls.CIDR = netaddr.IPNetwork(cidr_text)
def deploy_l2gateway_topology(self):
    """Create the router, network, subnet, security group and two servers.

    The subnet is attached to the router and uses the CIDR / mask stored
    in SUBNET_1_NETWORK_CIDR and SUBNET_1_MASK; both servers are placed on
    the same network with the same security group.
    """
    router = self.create_topology_router("router_l2gateway")
    # L2gateway network with router
    network = self.create_topology_network("network_l2gateway")
    # cidr must be present and in IPNetwork structure.
    self.CIDR = netaddr.IPNetwork(self.SUBNET_1_NETWORK_CIDR)
    self.create_topology_subnet(
        "subnet1_l2gateway", network, cidr=self.CIDR,
        router_id=router["id"],
        mask_bits=int(self.SUBNET_1_MASK))

    secgroup = self.create_topology_security_group()
    secgroups = [{'name': secgroup['name']}]
    for server_name in ("server1_l2gateway", "server2_l2gateway"):
        self.create_topology_instance(
            server_name, [network],
            security_groups=secgroups)
def setup_link(self, interface, cidr):
    """Setup a link.

    Runs, in order and as root:

        ip addr flush dev <interface>
        ip addr add <cidr> [broadcast <addr>] dev <interface>
        ip link set dev <interface> up

    :param interface: name of the network device to configure.
    :param cidr: address with prefix length to assign to the device.
    :returns: agent_utils.make_response(code=0) when both the address and
        the link-up command succeed; otherwise a response carrying the
        failing command's exit code and the first entry of its output.
    """
    # clear old ipaddr in interface (result intentionally not checked)
    agent_utils.execute(['ip', 'addr', 'flush', 'dev', interface], root=True)

    ip = IPNetwork(cidr)
    cmd = ['ip', 'addr', 'add', cidr]
    # netaddr reports no broadcast address for some networks (e.g. /31 and
    # /32); passing the text "None" to `ip` would make the command fail.
    if ip.broadcast is not None:
        cmd += ['broadcast', str(ip.broadcast)]
    cmd += ['dev', interface]

    stdcode, stdout = agent_utils.execute(cmd, root=True)
    if stdcode == 0:
        cmd = ['ip', 'link', 'set', 'dev', interface, 'up']
        stdcode, stdout = agent_utils.execute(cmd, root=True)
        if stdcode == 0:
            return agent_utils.make_response(code=stdcode)

    # execute failed.
    # NOTE(review): stdout is assumed to be a list of output entries; an
    # empty one now yields an empty message instead of an IndexError.
    message = stdout.pop(0) if stdout else ''
    return agent_utils.make_response(code=stdcode, message=message)
def obj_to_primitive(obj):
    """Recursively turn an object into a python primitive.

    An ObjectListBase becomes a list and a ZunObject becomes a dict holding
    every field that is set or listed in obj_extra_fields; netaddr
    IPAddress / IPNetwork values become strings.  Anything else is returned
    unchanged.
    """
    if isinstance(obj, ObjectListBase):
        return [obj_to_primitive(item) for item in obj]

    if isinstance(obj, ZunObject):
        return {
            field: obj_to_primitive(getattr(obj, field))
            for field in obj.obj_fields
            if obj.obj_attr_is_set(field) or field in obj.obj_extra_fields
        }

    if isinstance(obj, (netaddr.IPAddress, netaddr.IPNetwork)):
        return str(obj)

    return obj
def serialize(self, nested=False):
    """Serializes an object

    Pulls all the attributes in an object and creates a dict that
    can be turned into the json that netbox is expecting.

    If an attribute's value is a ``Record`` it is replaced with the result
    of its own ``serialize(nested=True)``; a ``netaddr`` ``IPNetwork``
    value is replaced with its string form.

    :param nested: when true, the result of ``_get_return(self)`` is
        returned instead of the attribute dict.
    :returns: dict of values the NetBox API is expecting.
    """
    if nested:
        return _get_return(self)

    payload = {}
    for name in dict(self):
        value = getattr(self, name)
        if isinstance(value, Record):
            value = value.serialize(nested=True)
        # Checked after the Record step on purpose: the replacement value
        # goes through the same IPNetwork conversion.
        if isinstance(value, netaddr.ip.IPNetwork):
            value = str(value)
        payload[name] = value
    return payload
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def is_valid_cidr(address):
    """Verify that address represents a valid CIDR address.

    The address must be accepted by netaddr.IPNetwork and must carry a
    non-empty part after its first '/'.

    :param address: Value to verify
    :type address: string
    :returns: bool

    .. versionadded:: 3.8
    """
    try:
        netaddr.IPNetwork(address)
    except (TypeError, netaddr.AddrFormatError):
        return False

    # The parse above only partially verifies the /xx part, so require it
    # explicitly here.
    segments = address.split('/')
    return len(segments) > 1 and segments[1] != ''
# inventory.py -- file source
# Project: cluster-genesis
# Author: open-power-ref-design-toolkit
# Project source
# File source
# Views: 24
# Favorites: 0
# Likes: 0
# Comments: 0
def populate_network_variables(inventory, inventory_source):
    """Copy the source networks into the inventory's global variables.

    A deep copy of inventory_source['networks'] is stored at
    inventory['all']['vars']['networks'].  Each copied network that has a
    truthy 'addr' additionally gets 'network' and 'netmask' entries derived
    from that address, unless its prefix length is 1.
    """
    networks = copy.deepcopy(inventory_source['networks'])
    for net_props in networks.values():
        addr = net_props.get('addr', None)
        if not addr:
            continue
        parsed = netaddr.IPNetwork(addr)
        # We don't put networks in with prefix length == 1 because
        # the inventory file uses this to note that while the host
        # has an interface connected to this network, that interface
        # does not directly get an IP address and the address goes
        # on a bridge.
        if parsed.prefixlen == 1:
            continue
        net_props['network'] = str(parsed.network)
        net_props['netmask'] = str(parsed.netmask)
    inventory['all']['vars']['networks'] = networks
# inventory.py -- file source
# Project: cluster-genesis
# Author: open-power-ref-design-toolkit
# Project source
# File source
# Views: 28
# Favorites: 0
# Likes: 0
# Comments: 0
def populate_network_variables(inventory, inventory_source):
    """Copy the source networks into the inventory's global variables.

    A deep copy of inventory_source['networks'] is stored at
    inventory['all']['vars']['networks'].  Each copied network that has a
    truthy 'addr' additionally gets 'network' and 'netmask' entries derived
    from that address, unless its prefix length is 1.
    """
    networks = copy.deepcopy(inventory_source['networks'])
    for net_props in networks.values():
        addr = net_props.get('addr', None)
        if not addr:
            continue
        parsed = netaddr.IPNetwork(addr)
        # We don't put networks in with prefix length == 1 because
        # the inventory file uses this to note that while the host
        # has an interface connected to this network, that interface
        # does not directly get an IP address and the address goes
        # on a bridge.
        if parsed.prefixlen == 1:
            continue
        net_props['network'] = str(parsed.network)
        net_props['netmask'] = str(parsed.netmask)
    inventory['all']['vars']['networks'] = networks
def _get_ipv6_network_from_address(address):
    """Build the netaddr.IPNetwork for an IPv6 interface address.

    :param address: a dict as returned by netifaces.ifaddresses; its 'addr'
                    and 'netmask' entries are read.
    :returns netaddr.IPNetwork: None when 'addr' starts with 'fe80' (link
                                local) or equals '::1' (loopback).
    """
    addr = address['addr']
    if addr.startswith('fe80') or addr == "::1":
        return None

    # The netmask may carry a "/<prefix>" suffix; when it does, only the
    # text after the first slash is used, otherwise the whole value.
    mask_parts = address['netmask'].split("/")
    netmask = mask_parts[1] if len(mask_parts) > 1 else address['netmask']
    return netaddr.IPNetwork("%s/%s" % (addr, netmask))
def get_client_ip(self, xid, dhcp_options):
    """Pick the IP address to hand to a DHCP client.

    Sources are tried in this order:

    1. a 'requested_addr' option in dhcp_options (the option at index 2
       is checked first, then all options in order);
    2. the address stored for this transaction id in self.dhcp_dic;
    3. a random address from the /24 built on self.ip_address.

    :param xid: DHCP transaction id, used as key into self.dhcp_dic.
    :param dhcp_options: sequence of DHCP options; (name, value) pairs are
        inspected, any other entries are ignored.
    :returns: a ``(source, ip)`` pair where source is one of
        'requested', 'stored' or 'generated'.
    """
    def is_requested(field):
        # isinstance, not `field is tuple`: the latter compares against
        # the type object itself and can never be true for an option.
        return (isinstance(field, (tuple, list)) and len(field) >= 2
                and field[0] == 'requested_addr')

    # Slicing keeps the index-2 priority of the original lookup without
    # raising IndexError when fewer than three options are present.
    candidates = list(dhcp_options[2:3]) + list(dhcp_options)
    for field in candidates:
        if is_requested(field):
            return 'requested', field[1]

    if xid in self.dhcp_dic:
        return 'stored', self.dhcp_dic[xid]

    net = IPNetwork(self.ip_address + '/24')
    return 'generated', str(random.choice(list(net)))
def test_ipnetwork_cidr_merge():
    """cidr_merge collapses a mix of addresses, networks and strings."""
    v6_addresses = list(IPNetwork('fe80::/120'))
    v4_networks = [
        IPNetwork(cidr)
        for cidr in ('192.0.2.0/24', '192.0.4.0/25', '192.0.4.128/25')
    ]
    v4_strings = [str(ip) for ip in IPNetwork('192.0.3.0/24')]

    ip_list = v6_addresses + v4_networks + v4_strings
    assert len(ip_list) == 515

    expected = [
        IPNetwork(cidr)
        for cidr in ('192.0.2.0/23', '192.0.4.0/24', 'fe80::/120')
    ]
    assert cidr_merge(ip_list) == expected
def test_supernetting():
    """supernet(22) lists every enclosing network from /22 down to /31."""
    expected_cidrs = (
        '192.0.0.0/22',
        '192.0.2.0/23',
        '192.0.2.0/24',
        '192.0.2.0/25',
        '192.0.2.64/26',
        '192.0.2.96/27',
        '192.0.2.112/28',
        '192.0.2.112/29',
        '192.0.2.112/30',
        '192.0.2.114/31',
    )
    host = IPNetwork('192.0.2.114')
    assert host.supernet(22) == [IPNetwork(cidr) for cidr in expected_cidrs]
def test_ipnetwork_sort_order():
    """Shuffled addresses of a /28 sort back into ascending order."""
    shuffled = list(IPNetwork('192.0.2.128/28'))
    random.shuffle(shuffled)

    # 192.0.2.128 through 192.0.2.143, in ascending order.
    expected = [IPAddress('192.0.2.%d' % host) for host in range(128, 144)]
    assert sorted(shuffled) == expected
def test_ipaddress_and_ipnetwork_canonical_sort_order_by_version():
    """Mixed IPAddress / IPNetwork values sort IPv4 first, then IPv6."""
    mixed = [
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('::'),
        IPNetwork('192.0.3.0/24'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('fe80::/64'),
        IPNetwork('172.24/12'),
        IPAddress('10.0.0.1'),
    ]
    expected_order = [
        IPAddress('10.0.0.1'),
        IPNetwork('172.24.0.0/12'),
        IPNetwork('192.0.2.0/24'),
        IPNetwork('192.0.2.128/28'),
        IPAddress('192.0.2.130'),
        IPNetwork('192.0.3.0/24'),
        IPAddress('::'),
        IPNetwork('fe80::/64'),
    ]

    random.shuffle(mixed)
    mixed.sort()
    assert mixed == expected_order