python类ip_interface()的实例源码

test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_inet_cast(self):
        # Values of PostgreSQL type ``inet`` must come back as ipaddress
        # interface objects once register_ipaddress() is active on the cursor.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        # SQL NULL is passed through as None.
        cur.execute("select null::inet")
        self.assertTrue(cur.fetchone()[0] is None)

        # (query, textual form, expected interface class)
        cases = (
            ("select '127.0.0.1/24'::inet",
             '127.0.0.1/24', ip.IPv4Interface),
            ("select '::ffff:102:300/128'::inet",
             '::ffff:102:300/128', ip.IPv6Interface),
        )
        for query, text, iface_cls in cases:
            cur.execute(query)
            value = cur.fetchone()[0]
            self.assertTrue(isinstance(value, iface_cls), repr(value))
            self.assertEqual(value, ip.ip_interface(text))
test_ipaddress.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_inet_cast(self):
        # Check the inet -> ipaddress typecaster installed by
        # register_ipaddress(): NULL, an IPv4 interface and an IPv6 interface.
        import ipaddress as ip
        cursor = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cursor)

        def fetch_value(query):
            # Run the query and return the first column of the first row.
            cursor.execute(query)
            return cursor.fetchone()[0]

        self.assertTrue(fetch_value("select null::inet") is None)

        v4_value = fetch_value("select '127.0.0.1/24'::inet")
        self.assertTrue(isinstance(v4_value, ip.IPv4Interface), repr(v4_value))
        self.assertEqual(v4_value, ip.ip_interface('127.0.0.1/24'))

        v6_value = fetch_value("select '::ffff:102:300/128'::inet")
        self.assertTrue(isinstance(v6_value, ip.IPv6Interface), repr(v6_value))
        self.assertEqual(v6_value, ip.ip_interface('::ffff:102:300/128'))
test_ipaddress.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_inet_cast(self):
        # Values of PostgreSQL type ``inet`` must come back as ipaddress
        # interface objects once register_ipaddress() is active on the cursor.
        #
        # Uses assertTrue/assertEqual rather than the deprecated aliases
        # assert_/assertEquals, which newer unittest versions no longer have.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        # SQL NULL is passed through as None.
        cur.execute("select null::inet")
        self.assertTrue(cur.fetchone()[0] is None)

        # IPv4 address with a prefix length.
        cur.execute("select '127.0.0.1/24'::inet")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
        self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))

        # IPv6 address with a prefix length.
        cur.execute("select '::ffff:102:300/128'::inet")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
        self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
validators.py 文件源码 项目:esdc-ce 作者: erigones 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def cidr_validator(value, return_ip_interface=False):
    """Validate an IPv4 address with an optional ``/prefix`` in CIDR notation.

    The address part must match ``validators.ipv4_re``, the prefix length
    (32 when omitted) must lie in 1..32, and the resulting interface must not
    be flagged as reserved by the ``ipaddress`` module.

    :param value: text such as ``"10.0.0.1"`` or ``"10.0.0.1/24"``
    :param return_ip_interface: when true, return the parsed interface object
    :returns: the ``ipaddress`` interface object, or ``None`` when
        ``return_ip_interface`` is false
    :raises ValidationError: if any of the checks above fails
    """
    try:
        if '/' in value:
            # More than one "/" makes the unpacking fail with ValueError,
            # which is reported as a validation error below.
            host_part, prefix_text = value.split('/')
            prefix_len = int(prefix_text)
        else:
            host_part = value
            prefix_len = 32

        if not (validators.ipv4_re.match(host_part) and 1 <= prefix_len <= 32):
            raise ValueError

        ipi = ipaddress.ip_interface(six.text_type(value))
        if ipi.is_reserved:
            raise ValueError
    except ValueError:
        raise ValidationError(_('Enter a valid IPv4 address or IPv4 network.'))

    return ipi if return_ip_interface else None
utils.py 文件源码 项目:PySwitchLib 作者: StackStorm 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _create_ping_cmd(self, targets, vrf, count, timeout_value, size):
        """
        Internal method to create ping commands.

        Builds one CLI ``ping`` command string per target; IPv6 targets use
        the ``ping ipv6`` form. Addresses are emitted in the normalised form
        produced by the ``ipaddress`` module.

        :param targets: iterable of IPv4/IPv6 address strings
        :param vrf: VRF name placed after ``vrf``
        :param count: value placed after ``count``
        :param timeout_value: value placed after ``timeout``
        :param size: value placed after ``datagram-size``
        :returns: list of command strings, in the order of ``targets``
        :raises ValueError: if a target is not a valid IP address
        """
        # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3, so
        # the address parser always receives text without relying on the
        # Python 2-only ``unicode`` builtin.
        text_type = type(u'')
        templates = {
            4: "ping {} vrf {} count {} datagram-size {} timeout {}",
            6: "ping ipv6 {} vrf {} count {} datagram-size {} timeout {}",
        }
        cli_cmd = []
        for target in targets:
            try:
                address = ip_address(text_type(target))
            except ValueError:
                # Interpolate the offending value into the message instead of
                # passing it as a second exception argument.
                raise ValueError('Invalid IP: {}'.format(target))
            cli_cmd.append(templates[address.version].format(
                str(address), vrf, count, size, timeout_value))
        return cli_cmd
test_ipaddress.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testHash(self):
        # Objects built from the same input must hash equally, and address
        # objects must be usable as dictionary keys.
        self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
                         hash(ipaddress.ip_interface('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
                         hash(ipaddress.ip_network('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
                         hash(ipaddress.ip_address('10.1.1.0')))
        # i70
        # An address built from text and one built from the matching integer
        # must hash the same.
        self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
                         hash(ipaddress.ip_address(
                    int(ipaddress.ip_address('1.2.3.4')._ip))))
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {}
        dummy[self.ipv4_address] = None
        dummy[self.ipv6_address] = None
        dummy[ip1] = None
        dummy[ip2] = None
        # assertIn reports the key and the container on failure, unlike
        # assertTrue(x in y).
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
test_ipaddress.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testHash(self):
        # Two objects built from the same text must produce the same hash,
        # for interfaces, networks and plain addresses alike.
        for factory, text in (
                (ipaddress.ip_interface, '10.1.1.0/24'),
                (ipaddress.ip_network, '10.1.1.0/24'),
                (ipaddress.ip_address, '10.1.1.0')):
            self.assertEqual(hash(factory(text)), hash(factory(text)))
        # i70
        from_text = ipaddress.ip_address('1.2.3.4')
        from_int = ipaddress.ip_address(
            int(ipaddress.ip_address('1.2.3.4')._ip))
        self.assertEqual(hash(from_text), hash(from_int))

        # Addresses of both versions must work as dictionary keys.
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {
            self.ipv4_address: None,
            self.ipv6_address: None,
            ip1: None,
            ip2: None,
        }
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
create.py 文件源码 项目:serveradmin 作者: innogames 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_host_ip_addr(attributes, networks):
    """Return the internal IP address to use for a host being created.

    When ``attributes`` carries an ``intern_ip`` it is parsed, its network is
    checked with ``_check_in_networks`` and the bare address is returned.
    Otherwise an address is picked from ``networks`` via ``_choose_ip_addr``.

    :raises CreateError: if the given ``intern_ip`` cannot be parsed, if no
        networks are available to choose from, or if no address could be
        selected from them
    """
    if 'intern_ip' in attributes:
        try:
            given = ip_interface(attributes['intern_ip'])
        except ValueError as error:
            raise CreateError(str(error))

        _check_in_networks(networks, given.network)
        return given.ip

    if not networks:
        raise CreateError(
            '"intern_ip" is not given, and no networks could be found.'
        )

    chosen = _choose_ip_addr(networks)
    if chosen is None:
        raise CreateError(
            'No IP address could be selected from the given networks.'
        )
    return chosen
views.py 文件源码 项目:serveradmin 作者: innogames 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def edit(request):
    """Open the edit view for an existing server or for a new one.

    With ``object_id`` in the query string the server is looked up through
    ``Query``. Otherwise a new ``ServerObject`` is built from the posted
    ``attr_*`` fields; how ``attr_intern_ip`` is parsed depends on the
    servertype's ``ip_addr_type``. The result is handed to ``_edit``.
    """
    if 'object_id' not in request.GET:
        posted = request.POST
        servertype = Servertype.objects.get(pk=posted['attr_servertype'])
        project = Project.objects.get(pk=posted['attr_project'])
        hostname = posted['attr_hostname']

        addr_type = servertype.ip_addr_type
        if addr_type == 'null':
            # This servertype carries no address; the posted value is not read.
            intern_ip = None
        else:
            parse = ip_network if addr_type == 'network' else ip_interface
            intern_ip = parse(posted['attr_intern_ip'])
        server = ServerObject.new(servertype, project, hostname, intern_ip)
    else:
        server = Query({'object_id': request.GET['object_id']}).get()

    return _edit(request, server, True)
test_ipaddress.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testHash(self):
        # Hashes must be stable across separately constructed equal objects.
        def check_same_hash(first, second):
            self.assertEqual(hash(first), hash(second))

        check_same_hash(ipaddress.ip_interface('10.1.1.0/24'),
                        ipaddress.ip_interface('10.1.1.0/24'))
        check_same_hash(ipaddress.ip_network('10.1.1.0/24'),
                        ipaddress.ip_network('10.1.1.0/24'))
        check_same_hash(ipaddress.ip_address('10.1.1.0'),
                        ipaddress.ip_address('10.1.1.0'))
        # i70
        check_same_hash(
            ipaddress.ip_address('1.2.3.4'),
            ipaddress.ip_address(int(ipaddress.ip_address('1.2.3.4')._ip)))

        # Address objects must be usable as dictionary keys.
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {}
        for key in (self.ipv4_address, self.ipv6_address, ip1, ip2):
            dummy[key] = None
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
test_ipaddress.py 文件源码 项目:flask 作者: bobohope 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_inet_cast(self):
        # Check the inet -> ipaddress typecaster installed by
        # register_ipaddress(): NULL, an IPv4 interface and an IPv6 interface.
        #
        # The deprecated unittest aliases assert_/assertEquals are replaced by
        # assertTrue/assertEqual, which exist in every unittest version.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        cur.execute("select null::inet")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.1/24'::inet")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
        self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))

        cur.execute("select '::ffff:102:300/128'::inet")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
        self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
xcon.py 文件源码 项目:vrnetlab 作者: plajjan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _configure_interface_address(self, interface, address, default_route=None):
        """Assign ``address`` to ``interface`` and optionally set a default route.

        :param interface: device name passed to the ``ip`` command
        :param address: address with prefix, parsed by ``ipaddress.ip_interface``
        :param default_route: optional next-hop address; it must parse as an
            IP address and lie inside the network of ``address``

        Logs an error and exits the process with status 1 when the next hop
        cannot be parsed or is outside the interface network. Failures of the
        ``ip ... add`` commands propagate as ``subprocess.CalledProcessError``.
        """
        net = ipaddress.ip_interface(address)

        # Bind next_hop up front so the route step below can test it even
        # when no default route was requested.
        next_hop = None
        if default_route:
            try:
                next_hop = ipaddress.ip_address(default_route)
            except ValueError:
                self.logger.error("next-hop address {} could not be parsed".format(default_route))
                sys.exit(1)

            if next_hop not in net.network:
                self.logger.error("next-hop address {} not in network {}".format(next_hop, net))
                sys.exit(1)

        family_flag = "-{}".format(net.version)
        subprocess.check_call(["ip", family_flag, "address", "add", str(net.ip) + "/" + str(net.network.prefixlen), "dev", interface])
        if next_hop is not None:
            try:
                # Best effort: there may be no default route to delete.
                subprocess.check_call(["ip", family_flag, "route", "del", "default"])
            except (subprocess.CalledProcessError, OSError):
                pass
            subprocess.check_call(["ip", family_flag, "route", "add", "default", "dev", interface, "via", str(next_hop)])
populate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _network(self, address, netmask):
        """
        Return the network that contains ``address`` under ``netmask``.

        The two values are joined as ``address/netmask`` and parsed as an
        interface; the interface's network object is returned.
        """
        cidr_text = u'{}/{}'.format(address, netmask)
        interface = ipaddress.ip_interface(cidr_text)
        return interface.network
_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def cast_interface(s, cur=None):
    """Convert a textual value into an ``ipaddress`` interface object.

    ``None`` is returned unchanged; ``cur`` is accepted but not used.
    """
    # The value is coerced to text before parsing (the Python 2 flavour of
    # this function has to force unicode at this point).
    return None if s is None else ipaddress.ip_interface(str(s))
test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_inet_array_cast(self):
        # An inet[] value must be converted element by element: NULL becomes
        # None, the other entries become ipaddress interface objects.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)
        cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
        row = cur.fetchone()
        items = row[0]

        expected_v4 = ip.ip_interface('127.0.0.1')
        expected_v6 = ip.ip_interface('::ffff:102:300/128')

        self.assertTrue(items[0] is None)
        self.assertEqual(items[1], expected_v4)
        self.assertEqual(items[2], expected_v6)
        self.assertTrue(isinstance(items[1], ip.IPv4Interface), items)
        self.assertTrue(isinstance(items[2], ip.IPv6Interface), items)
test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_inet_adapt(self):
        # Interface objects passed as query parameters must round-trip to
        # their textual form.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        for text in ('127.0.0.1/24', '::ffff:102:300/128'):
            cur.execute("select %s", [ip.ip_interface(text)])
            self.assertEqual(cur.fetchone()[0], text)
_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cast_interface(s, cur=None):
    """Convert a textual value into an ``ipaddress`` interface object.

    :param s: the value to parse, or ``None``
    :param cur: accepted but not used
    :returns: ``None`` when ``s`` is ``None``, otherwise the result of
        ``ipaddress.ip_interface``
    """
    if s is None:
        return None
    # The Python 2 ipaddress module only parses unicode text. ``type(u'')``
    # is ``unicode`` there and ``str`` on Python 3, so this works on both
    # without the Python 2-only ``unicode`` builtin.
    text_type = type(u'')
    return ipaddress.ip_interface(text_type(s))
test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_inet_array_cast(self):
        # An inet[] value must be converted element by element: NULL becomes
        # None, the other entries become ipaddress interface objects.
        #
        # Uses assertTrue/assertEqual rather than the deprecated aliases
        # assert_/assertEquals, which newer unittest versions no longer have.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)
        cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
        l = cur.fetchone()[0]
        self.assertTrue(l[0] is None)
        self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
        self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
        self.assertTrue(isinstance(l[1], ip.IPv4Interface), l)
        self.assertTrue(isinstance(l[2], ip.IPv6Interface), l)
test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_inet_adapt(self):
        # Interface objects passed as query parameters must round-trip to
        # their textual form.
        #
        # Uses assertEqual rather than the deprecated alias assertEquals,
        # which newer unittest versions no longer have.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
        self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')

        cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
        self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
controllers.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _get_subnets_by_interface_cidr(neutron_network_id,
                                   interface_cidr):
    """Look up the Neutron subnets matching an interface CIDR.

    The network of ``interface_cidr`` is derived with ``ipaddress`` and used,
    together with ``neutron_network_id``, as the filter passed to
    ``_get_subnets_by_attrs``.

    :returns: whatever ``_get_subnets_by_attrs`` returned
    :raises exceptions.DuplicatedResourceException: if more than two subnets
        match
    """
    iface = ipaddress.ip_interface(six.text_type(interface_cidr))
    subnets = _get_subnets_by_attrs(
        network_id=neutron_network_id, cidr=six.text_type(iface.network))
    # NOTE(review): the threshold allows up to two matches before this is
    # treated as a duplicate; left as in the original -- confirm the intent.
    if len(subnets) > 2:
        # The trailing space keeps the two concatenated literals from running
        # together in the rendered message.
        raise exceptions.DuplicatedResourceException(
            "Multiple Neutron subnets exist for the network_id={0} "
            "and cidr={1}"
            .format(neutron_network_id, iface.network))
    return subnets
controllers.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _process_interface_address(port_dict, subnets_dict_by_id,
                               response_interface):
    """Store the port's subnet CIDR in ``response_interface``.

    The subnet is found through ``port_dict['subnet_id']``; its ``cidr`` is
    written under ``'Address'`` for IPv4 or ``'AddressIPv6'`` otherwise.
    ``response_interface`` is modified in place and nothing is returned.
    """
    subnet = subnets_dict_by_id[port_dict['subnet_id']]
    iface = ipaddress.ip_interface(six.text_type(subnet['cidr']))
    if iface.version == 4:
        address_key = 'Address'
    else:
        address_key = 'AddressIPv6'
    response_interface[address_key] = six.text_type(iface)
controllers.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_fixed_ips_by_interface_cidr(subnets, interface_cidrv4,
                                     interface_cidrv6, fixed_ips):
    """Append fixed-IP filter strings for ``subnets`` to ``fixed_ips``.

    For every subnet a ``subnet_id=<id>`` entry is produced. When at least
    one interface CIDR is given, a subnet is only kept if the CIDR of its own
    IP version is present and its network equals the subnet's ``cidr``; in
    that case an ``ip_address=<ip>`` entry is added as well.

    :param subnets: iterable of subnet dicts (``id``, ``ip_version``, ``cidr``)
    :param interface_cidrv4: IPv4 interface CIDR, or a false value
    :param interface_cidrv6: IPv6 interface CIDR, or a false value
    :param fixed_ips: list extended in place; nothing is returned
    """
    cidr_by_version = {4: interface_cidrv4, 6: interface_cidrv6}
    for subnet in subnets:
        fixed_ip = [('subnet_id=%s' % subnet['id'])]
        if interface_cidrv4 or interface_cidrv6:
            interface_cidr = cidr_by_version.get(subnet['ip_version'])
            if not interface_cidr:
                # No interface CIDR for this subnet's IP version, so it
                # cannot match. Skip it instead of comparing against an
                # unbound or previous-iteration interface.
                continue
            iface = ipaddress.ip_interface(six.text_type(interface_cidr))
            if six.text_type(subnet['cidr']) != six.text_type(iface.network):
                continue
            fixed_ip.append('ip_address=%s' % iface.ip)
        fixed_ips.extend(fixed_ip)
tasks.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_node_interface(data, node_ip):
    """Return the name of the interface that carries ``node_ip``.

    ``data`` is scanned line by line for ``<iface> inet <address>`` entries;
    the first entry whose address equals ``node_ip`` wins.

    :param data: multi-line text listing interfaces and their ``inet``
        addresses
    :param node_ip: the IP address to look for
    :returns: the interface name, or ``None`` when no line matches
    :raises ValueError: if ``node_ip`` or a matched address cannot be parsed
    """
    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3, which
    # avoids the Python 2-only ``unicode`` builtin.
    text_type = type(u'')
    ip = ipaddress.ip_address(text_type(node_ip))
    patt = re.compile(r'(?P<iface>\w+)\s+inet\s+(?P<ip>[0-9\/\.]+)')
    for line in data.splitlines():
        m = patt.search(line)
        if m is None:
            continue
        iface = ipaddress.ip_interface(text_type(m.group('ip')))
        if ip == iface.ip:
            return m.group('iface')
    return None
utils.py 文件源码 项目:pytestlab 作者: sangoma 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def iter_addrs(data):
        """Yield an ``ipaddress`` interface for each entry of ``data['ipaddr']``.

        The first two items of every entry are joined as ``first/second``
        before parsing.
        """
        template = '{0}/{1}'
        for entry in data['ipaddr']:
            cidr_text = template.format(*entry)
            yield ipaddress.ip_interface(cidr_text)
node.py 文件源码 项目:pyimc 作者: oysstu 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def send(self, msg):
        """
        Send an IMC message to this node over one of its imc+udp services.

        The first service whose IP lies inside the network of a local
        interface is used. If none qualifies, the message is sent to every
        service port on the loopback address instead.
        :param msg: The IMC message to send
        :return: None
        """

        udp_services = self.services['imc+udp']
        if not udp_services:
            logger.error('{} does not expose an imc+udp service'.format(self))
            return

        # Networks of the local interfaces, built from items 1 and 2 of each
        # get_interfaces() entry (presumably address and netmask -- verify).
        # Note: this might not account for unusual ip routing.
        local_networks = []
        for iface in get_interfaces():
            local_networks.append(ip.ip_interface(iface[1] + '/' + iface[2]).network)

        for service in udp_services:
            target = ip.ip_address(service.ip)
            reachable = [target in network for network in local_networks]
            if any(reachable):
                with IMCSenderUDP(service.ip) as sender:
                    sender.send(message=msg, port=service.port)
                return

        # No local interface has the target system in its network. The node
        # could be running on this same system, so fall back to loopback.
        ports = [service.port for service in udp_services]
        with IMCSenderUDP('127.0.0.1') as sender:
            for port in ports:
                sender.send(message=msg, port=port)
_ipaddress.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cast_interface(s, cur=None):
    """Parse ``s`` into an ``ipaddress`` interface object.

    ``None`` yields ``None``; ``cur`` is accepted but not used.
    """
    if s is not None:
        # The value is coerced to text before parsing (the Python 2 flavour
        # of this function has to force unicode at this point).
        return ipaddress.ip_interface(str(s))
    return None
test_ipaddress.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_inet_array_cast(self):
        # Elements of an inet[] value must be cast individually: NULL stays
        # None, addresses become interface objects of the matching version.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)
        cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
        items = cur.fetchone()[0]

        # (array index, textual form, expected interface class)
        expected = (
            (1, '127.0.0.1', ip.IPv4Interface),
            (2, '::ffff:102:300/128', ip.IPv6Interface),
        )
        self.assertTrue(items[0] is None)
        for index, text, _ in expected:
            self.assertEqual(items[index], ip.ip_interface(text))
        for index, _, iface_cls in expected:
            self.assertTrue(isinstance(items[index], iface_cls), items)
test_ipaddress.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_inet_adapt(self):
        # Passing an interface object as a query parameter must give back
        # its textual form.
        import ipaddress as ip
        cursor = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cursor)

        def roundtrip(value):
            # Send the value as the only parameter and return what comes back.
            cursor.execute("select %s", [value])
            return cursor.fetchone()[0]

        self.assertEqual(roundtrip(ip.ip_interface('127.0.0.1/24')),
                         '127.0.0.1/24')
        self.assertEqual(roundtrip(ip.ip_interface('::ffff:102:300/128')),
                         '::ffff:102:300/128')
_ipaddress.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cast_interface(s, cur=None):
    """Parse ``s`` into an ``ipaddress`` interface object.

    :param s: the value to parse, or ``None``
    :param cur: accepted but not used
    :returns: ``None`` when ``s`` is ``None``, otherwise the result of
        ``ipaddress.ip_interface``
    """
    if s is None:
        return None
    # The Python 2 ipaddress module only parses unicode text. ``type(u'')``
    # resolves to ``unicode`` on Python 2 and ``str`` on Python 3, so the
    # Python 2-only ``unicode`` builtin is not needed.
    as_text = type(u'')
    return ipaddress.ip_interface(as_text(s))
test_ipaddress.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_inet_array_cast(self):
        # Elements of an inet[] value must be cast individually: NULL stays
        # None, addresses become interface objects of the matching version.
        #
        # The deprecated unittest aliases assert_/assertEquals are replaced by
        # assertTrue/assertEqual, which exist in every unittest version.
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)
        cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
        l = cur.fetchone()[0]
        self.assertTrue(l[0] is None)
        self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
        self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
        self.assertTrue(isinstance(l[1], ip.IPv4Interface), l)
        self.assertTrue(isinstance(l[2], ip.IPv6Interface), l)


问题


面经


文章

微信
公众号

扫码关注公众号