def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assertTrue(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
python类ip_interface()的实例源码
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assertTrue(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assert_(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128'))
def cidr_validator(value, return_ip_interface=False):
"""Validate IPv4 + optional subnet in CIDR notation"""
try:
if '/' in value:
ipaddr, netmask = value.split('/')
netmask = int(netmask)
else:
ipaddr, netmask = value, 32
if not validators.ipv4_re.match(ipaddr) or not 1 <= netmask <= 32:
raise ValueError
ipi = ipaddress.ip_interface(six.text_type(value))
if ipi.is_reserved:
raise ValueError
except ValueError:
raise ValidationError(_('Enter a valid IPv4 address or IPv4 network.'))
if return_ip_interface:
return ipi
def _create_ping_cmd(self, targets, vrf, count, timeout_value, size):
"""
Internal method to create ping command.
"""
cli_cmd = []
try:
for numips in targets:
check_valid_ip = ip_address(unicode(numips))
numips = str(check_valid_ip)
valid_address = ip_interface(unicode(numips))
if valid_address.version == 4:
cli = "ping {} vrf {} count {} datagram-size {} timeout {}".format(
numips, vrf, count, size, timeout_value)
elif valid_address.version == 6:
cli = "ping ipv6 {} vrf {} count {} datagram-size {} timeout {}".format(
numips, vrf, count, size, timeout_value)
cli_cmd.append(cli)
return cli_cmd
except ValueError:
raise ValueError('Invalid IP: %s', numips)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertTrue(self.ipv4_address in dummy)
self.assertTrue(ip2 in dummy)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertIn(self.ipv4_address, dummy)
self.assertIn(ip2, dummy)
def _get_host_ip_addr(attributes, networks):
if 'intern_ip' not in attributes:
if not networks:
raise CreateError(
'"intern_ip" is not given, and no networks could be found.'
)
intern_ip = _choose_ip_addr(networks)
if intern_ip is None:
raise CreateError(
'No IP address could be selected from the given networks.'
)
return intern_ip
try:
intern_ip = ip_interface(attributes['intern_ip'])
except ValueError as error:
raise CreateError(str(error))
_check_in_networks(networks, intern_ip.network)
return intern_ip.ip
def edit(request):
if 'object_id' in request.GET:
server = Query({'object_id': request.GET['object_id']}).get()
else:
servertype = Servertype.objects.get(pk=request.POST['attr_servertype'])
project = Project.objects.get(pk=request.POST['attr_project'])
hostname = request.POST['attr_hostname']
if servertype.ip_addr_type == 'null':
intern_ip = None
elif servertype.ip_addr_type == 'network':
intern_ip = ip_network(request.POST['attr_intern_ip'])
else:
intern_ip = ip_interface(request.POST['attr_intern_ip'])
server = ServerObject.new(servertype, project, hostname, intern_ip)
return _edit(request, server, True)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertIn(self.ipv4_address, dummy)
self.assertIn(ip2, dummy)
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assert_(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128'))
def _configure_interface_address(self, interface, address, default_route=None):
net = ipaddress.ip_interface(address)
if default_route:
try:
next_hop = ipaddress.ip_address(default_route)
except ValueError:
self.logger.error("next-hop address {} could not be parsed".format(default_route))
sys.exit(1)
if default_route and next_hop not in net.network:
self.logger.error("next-hop address {} not in network {}".format(next_hop, net))
sys.exit(1)
subprocess.check_call(["ip", "-{}".format(net.version), "address", "add", str(net.ip) + "/" + str(net.network.prefixlen), "dev", interface])
if next_hop:
try:
subprocess.check_call(["ip", "-{}".format(net.version), "route", "del", "default"])
except:
pass
subprocess.check_call(["ip", "-{}".format(net.version), "route", "add", "default", "dev", interface, "via", str(next_hop)])
def _network(self, address, netmask):
"""
Return CIDR network
"""
return ipaddress.ip_interface(u'{}/{}'.format(address, netmask)).network
def cast_interface(s, cur=None):
if s is None:
return None
# Py2 version force the use of unicode. meh.
return ipaddress.ip_interface(str(s))
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assertTrue(l[0] is None)
self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assertTrue(isinstance(l[1], ip.IPv4Interface), l)
self.assertTrue(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
def cast_interface(s, cur=None):
if s is None:
return None
# Py2 version force the use of unicode. meh.
return ipaddress.ip_interface(unicode(s))
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_interface('127.0.0.1'))
self.assertEquals(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Interface), l)
self.assert_(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
self.assertEquals(cur.fetchone()[0], '127.0.0.1/24')
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
def _get_subnets_by_interface_cidr(neutron_network_id,
interface_cidr):
iface = ipaddress.ip_interface(six.text_type(interface_cidr))
subnets = _get_subnets_by_attrs(
network_id=neutron_network_id, cidr=six.text_type(iface.network))
if len(subnets) > 2:
raise exceptions.DuplicatedResourceException(
"Multiple Neutron subnets exist for the network_id={0}"
"and cidr={1}"
.format(neutron_network_id, iface.network))
return subnets
def _process_interface_address(port_dict, subnets_dict_by_id,
response_interface):
subnet_id = port_dict['subnet_id']
subnet = subnets_dict_by_id[subnet_id]
iface = ipaddress.ip_interface(six.text_type(subnet['cidr']))
address_key = 'Address' if iface.version == 4 else 'AddressIPv6'
response_interface[address_key] = six.text_type(iface)
def _get_fixed_ips_by_interface_cidr(subnets, interface_cidrv4,
interface_cidrv6, fixed_ips):
for subnet in subnets:
fixed_ip = [('subnet_id=%s' % subnet['id'])]
if interface_cidrv4 or interface_cidrv6:
if subnet['ip_version'] == 4 and interface_cidrv4:
iface = ipaddress.ip_interface(six.text_type(interface_cidrv4))
elif subnet['ip_version'] == 6 and interface_cidrv6:
iface = ipaddress.ip_interface(six.text_type(interface_cidrv6))
if six.text_type(subnet['cidr']) != six.text_type(iface.network):
continue
fixed_ip.append('ip_address=%s' % iface.ip)
fixed_ips.extend(fixed_ip)
def get_node_interface(data, node_ip):
ip = ipaddress.ip_address(unicode(node_ip))
patt = re.compile(r'(?P<iface>\w+)\s+inet\s+(?P<ip>[0-9\/\.]+)')
for line in data.splitlines():
m = patt.search(line)
if m is None:
continue
iface = ipaddress.ip_interface(unicode(m.group('ip')))
if ip == iface.ip:
return m.group('iface')
def iter_addrs(data):
for ipaddr in data['ipaddr']:
yield ipaddress.ip_interface('{}/{}'.format(*ipaddr))
def send(self, msg):
"""
Sends the IMC message to the node, filling in the destination
:param msg: The IMC message to send
:return:
"""
imcudp_services = self.services['imc+udp']
if not imcudp_services:
logger.error('{} does not expose an imc+udp service'.format(self))
return
# Determine which service to send to based on ip/netmask
# Note: this might not account for funky ip routing
networks = [ip.ip_interface(x[1] + '/' + x[2]).network for x in get_interfaces()]
for svc in imcudp_services:
svc_ip = ip.ip_address(svc.ip)
if any([svc_ip in network for network in networks]):
with IMCSenderUDP(svc.ip) as s:
s.send(message=msg, port=svc.port)
return
# If this point is reached no local interfaces has the target system in its netmask
# Could be running on same system with no available interfaces
# Send on loopback
ports = [svc.port for svc in imcudp_services]
with IMCSenderUDP('127.0.0.1') as s:
for port in ports:
s.send(message=msg, port=port)
def cast_interface(s, cur=None):
if s is None:
return None
# Py2 version force the use of unicode. meh.
return ipaddress.ip_interface(str(s))
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assertTrue(l[0] is None)
self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assertTrue(isinstance(l[1], ip.IPv4Interface), l)
self.assertTrue(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
def cast_interface(s, cur=None):
if s is None:
return None
# Py2 version force the use of unicode. meh.
return ipaddress.ip_interface(unicode(s))
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_interface('127.0.0.1'))
self.assertEquals(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Interface), l)
self.assert_(isinstance(l[2], ip.IPv6Interface), l)