def test_inet_adapt(self):
    """Check that ipaddress interface objects can be used as query parameters.

    Calls ``psycopg2.extras.register_ipaddress`` on a fresh cursor, then
    selects one IPv4 and one IPv6 interface back and compares each fetched
    value with the CIDR text the interface was built from.
    """
    import ipaddress as ip
    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    # assertEqual rather than the deprecated assertEquals alias, which newer
    # unittest releases no longer provide.
    cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
    self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')

    cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
    self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
python类ip_interface()的实例源码
def get_ip_interface(self, ipaddr):
    """Build an interface object from ``ipaddr`` and this object's subnet netmask.

    The address and ``self.subnet.netmask`` are joined with a slash, converted
    with ``text_type`` and handed to ``ipaddress.ip_interface``.
    """
    address_with_mask = text_type(ipaddr + '/' + self.subnet.netmask)
    return ipaddress.ip_interface(address_with_mask)
def ip_interface(self):
    """Return the interface object for this instance's own address (``self.ip``).

    NOTE(review): assumes ``get_ip_interface`` lives on the same class as this
    method -- confirm against the full class definition.
    """
    # Delegate to get_ip_interface: calling self.ip_interface(self.ip) would
    # re-enter this zero-argument method with an extra argument and fail.
    return self.get_ip_interface(self.ip)
def _get_local_netinfo(iface):
    """Collect the IPv4 settings of ``iface`` into a single dict.

    Starts from the first AF_INET entry that netifaces reports for the
    interface and adds three keys to it: 'gateway' (first element of the
    default AF_INET gateway entry), 'network' (network address derived from
    the entry's 'addr' and 'netmask') and 'iface' (the name passed in).
    """
    netinfo = netifaces.ifaddresses(iface)[netifaces.AF_INET][0]

    default_gateways = netifaces.gateways()['default']
    netinfo['gateway'] = default_gateways[netifaces.AF_INET][0]

    interface = ipaddress.ip_interface(
        u'%s/%s' % (netinfo['addr'], netinfo['netmask']))
    network_address = interface.network.network_address
    netinfo['network'] = str(network_address)

    netinfo['iface'] = iface
    return netinfo
def cidr(ipaddr, netmask):
    """Format an address and netmask as ``"<address> / <mask>"``.

    When the pair parses as an IP interface, the interface's own string form
    is used with the slash padded by spaces. Otherwise the two inputs are
    joined unchanged.
    """
    try:
        interface = ipaddress.ip_interface(u'%s/%s' % (ipaddr, netmask))
    except ValueError:
        # Not parseable as an interface: fall back to plain concatenation.
        return ipaddr + ' / ' + netmask
    return ' / '.join(str(interface).split('/'))
def _create_ping_cmd(self, targets, vrf, count, timeout_value, size):
"""
Internal method to create ping command.
"""
cli_cmd = []
try:
for numips in targets:
check_valid_ip = ip_address(unicode(numips))
numips = str(check_valid_ip)
valid_address = ip_interface(unicode(numips))
if valid_address.version == 4:
if vrf != 'default-vrf':
cli = "ping vrf {} {} count {} datagram-size {} timeout {}".format(
vrf, numips, count, size, timeout_value)
else:
cli = "ping {} count {} datagram-size {} timeout {}".format(
numips, count, size, timeout_value)
elif valid_address.version == 6:
if vrf != 'default-vrf':
cli = "ping vrf {} ipv6 {} count {} datagram-size {} timeout {}".format(
vrf, numips, count, size, timeout_value)
else:
cli = "ping ipv6 {} count {} datagram-size {} timeout {}".format(
numips, count, size, timeout_value)
cli_cmd.append(cli)
return cli_cmd
except ValueError:
raise ValueError('Invalid IP: %s', numips)
def test_ip_interface(self):
self.assertFactoryError(ipaddress.ip_interface, "interface")
def testIpFromPacked(self):
address = ipaddress.ip_address
self.assertEqual(self.ipv4_interface._ip,
ipaddress.ip_interface(b'\x01\x02\x03\x04')._ip)
self.assertEqual(address('255.254.253.252'),
address(b'\xff\xfe\xfd\xfc'))
self.assertEqual(self.ipv6_interface.ip,
ipaddress.ip_interface(
b'\x20\x01\x06\x58\x02\x2a\xca\xfe'
b'\x02\x00\x00\x00\x00\x00\x00\x01').ip)
self.assertEqual(address('ffff:2:3:4:ffff::'),
address(b'\xff\xff\x00\x02\x00\x03\x00\x04' +
b'\xff\xff' + b'\x00' * 6))
self.assertEqual(address('::'),
address(b'\x00' * 16))
def testInterfaceComparison(self):
self.assertTrue(ipaddress.ip_interface('1.1.1.1') <=
ipaddress.ip_interface('1.1.1.1'))
self.assertTrue(ipaddress.ip_interface('1.1.1.1') <=
ipaddress.ip_interface('1.1.1.2'))
self.assertTrue(ipaddress.ip_interface('::1') <=
ipaddress.ip_interface('::1'))
self.assertTrue(ipaddress.ip_interface('::1') <=
ipaddress.ip_interface('::2'))
def _get_ip_network(self, gateway_ip, prefixlen):
"""Returns the IP network for the gateway in CIDR form."""
return str(ipaddress.ip_interface(unicode(gateway_ip + "/"
+ str(prefixlen))).network)
def test_ip_interface(self):
self.assertFactoryError(ipaddress.ip_interface, "interface")
def testIpFromPacked(self):
address = ipaddress.ip_address
self.assertEqual(self.ipv4_interface._ip,
ipaddress.ip_interface(b'\x01\x02\x03\x04')._ip)
self.assertEqual(address('255.254.253.252'),
address(b'\xff\xfe\xfd\xfc'))
self.assertEqual(self.ipv6_interface.ip,
ipaddress.ip_interface(
b'\x20\x01\x06\x58\x02\x2a\xca\xfe'
b'\x02\x00\x00\x00\x00\x00\x00\x01').ip)
self.assertEqual(address('ffff:2:3:4:ffff::'),
address(b'\xff\xff\x00\x02\x00\x03\x00\x04' +
b'\xff\xff' + b'\x00' * 6))
self.assertEqual(address('::'),
address(b'\x00' * 16))
def testInterfaceComparison(self):
self.assertTrue(ipaddress.ip_interface('1.1.1.1') <=
ipaddress.ip_interface('1.1.1.1'))
self.assertTrue(ipaddress.ip_interface('1.1.1.1') <=
ipaddress.ip_interface('1.1.1.2'))
self.assertTrue(ipaddress.ip_interface('::1') <=
ipaddress.ip_interface('::1'))
self.assertTrue(ipaddress.ip_interface('::1') <=
ipaddress.ip_interface('::2'))
def testCopyConstructor(self):
addr1 = ipaddress.ip_network('10.1.1.0/24')
addr2 = ipaddress.ip_network(addr1)
addr3 = ipaddress.ip_interface('2001:658:22a:cafe:200::1/64')
addr4 = ipaddress.ip_interface(addr3)
addr5 = ipaddress.IPv4Address('1.1.1.1')
addr6 = ipaddress.IPv6Address('2001:658:22a:cafe:200::1')
self.assertEqual(addr1, addr2)
self.assertEqual(addr3, addr4)
self.assertEqual(addr5, ipaddress.IPv4Address(addr5))
self.assertEqual(addr6, ipaddress.IPv6Address(addr6))
def generatePeeringAddresses():
    """Return a pair of interfaces taken from the next 10.0.<n>.0/24 block.

    ``<n>`` is the current ``AutonomousSystem.psIdx``, which is incremented
    on every call. The pair holds the addresses at index 1 and index 2 of the
    block, each carrying the block's prefix length.
    """
    network = ip_network(u'10.0.%s.0/24' % AutonomousSystem.psIdx)
    AutonomousSystem.psIdx += 1
    return tuple(
        ip_interface('%s/%s' % (network[index], network.prefixlen))
        for index in (1, 2)
    )
def getLastAddress(network):
    """Return the second-to-last address of ``network`` as an interface.

    The interface is built from a bare address, so the prefix length of
    ``network`` is not carried over to the result.

    :param network: an ``ipaddress`` network object.
    :return: interface for ``network_address + num_addresses - 2``.
    """
    # Work out the offset first. Adding num_addresses straight to the network
    # address steps one past the end of the block, which is outside the
    # address space for blocks at its very top (e.g. 255.255.255.0/24) and
    # raises before the "- 2" is ever applied.
    offset = network.num_addresses - 2
    return ip_interface(network.network_address + offset)
def getIthAddress(network, i):
    """Return the address at index ``i`` of ``network`` as an interface.

    The result carries the same prefix length as ``network``.
    """
    address = network[i]
    return ip_interface('%s/%s' % (address, network.prefixlen))
def test_commit_query(self):
    """Check that values set through a query are visible after commit.

    Sets 'os' and 'intern_ip' on the object returned for hostname 'test1',
    commits, then runs a fresh query for the same hostname and checks both
    values. 'intern_ip' is expected to come back equal to an ``ip_interface``
    built from the string that was assigned.
    """
    q = Query({'hostname': 'test1'})
    s = q.get()
    s['os'] = 'wheezy'
    s['intern_ip'] = '10.16.2.1'
    q.commit()

    # Re-query instead of reusing ``s`` so the committed state is what is read.
    s = Query({'hostname': 'test1'}).get()
    # assertEqual rather than the deprecated assertEquals alias, which newer
    # unittest releases no longer provide.
    self.assertEqual(s['os'], 'wheezy')
    self.assertEqual(s['intern_ip'], ip_interface('10.16.2.1'))
def clean(self, value):
    """Run the parent field's ``clean`` and wrap a non-empty result.

    Returns ``ip_interface(<cleaned value>)`` when the parent returns a truthy
    value, otherwise ``None``.
    """
    cleaned = super(IPv4Field, self).clean(value)
    return ip_interface(cleaned) if cleaned else None
def clean(self, value):
    """Run the parent field's ``clean`` and wrap a non-empty result.

    Returns ``ip_interface(<cleaned value>)`` when the parent returns a truthy
    value, otherwise ``None``. A ``ValueError`` from ``ip_interface`` is
    reported as a ``ValidationError``.
    """
    cleaned = super(IPv6Field, self).clean(value)
    if not cleaned:
        return None
    try:
        return ip_interface(cleaned)
    except ValueError:
        raise ValidationError('Not a valid IPv6 Address')