python类IPSet()的实例源码

test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ipset_operations_with_combined_ipv4_and_ipv6():
    """Exercise truthiness and the ``&``, ``-`` and ``^`` operators of IPSet.

    Fixtures:
      * ``s1`` / ``s2`` mix IPv4 and IPv6 members and share one address of
        each family.
      * ``s3`` holds the block 10.0.0.64/30 (10.0.0.64 - 10.0.0.67) plus two
        single addresses.
      * ``s4`` and ``s5`` each hold two single addresses from inside that
        /30 block; ``s4b`` is ``s4`` plus one address outside ``s3``.
      * ``s6`` is a single IPv6 /32 network.
    """
    s1 = IPSet(['192.0.2.0', '::192.0.2.0', '192.0.2.2', '::192.0.2.2'])
    s2 = IPSet(['192.0.2.2', '::192.0.2.2', '192.0.2.4', '::192.0.2.4'])
    s3 = IPSet(['0.0.0.1', '10.0.0.64/30', '255.255.255.1'])
    s4 = IPSet(['10.0.0.64', '10.0.0.66'])
    s4b = IPSet(['10.0.0.64', '10.0.0.66', '111.111.111.111'])
    s5 = IPSet(['10.0.0.65', '10.0.0.67'])
    s6 = IPSet(['2405:8100::/32'])

    # A populated set is truthy, an empty one is falsy.
    assert bool(s6)
    assert not bool(IPSet())

    # Set intersection: checked in both operand orders for s3/s4 and s3/s5.
    assert s2 & s1 == IPSet(['192.0.2.2/32', '::192.0.2.2/128'])
    assert s3 & s4 == IPSet(['10.0.0.64/32', '10.0.0.66/32'])
    assert s4 & s3 == IPSet(['10.0.0.64/32', '10.0.0.66/32'])
    assert s3 & s5 == IPSet(['10.0.0.65/32', '10.0.0.67/32'])
    assert s5 & s3 == IPSet(['10.0.0.65/32', '10.0.0.67/32'])

    # Set difference: removing single addresses from s3 splits its /30
    # block; subtracting s3 from one of its own subsets leaves nothing.
    assert s3 - s4 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    assert s4 - s3 == IPSet([])
    assert s3 - s4b == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    assert s3 - s5 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])
    assert s5 - s3 == IPSet([])

    # Set symmetric difference: includes the empty-set edge cases and both
    # operand orders; s4b contributes its extra address that s3 lacks.
    assert s2 ^ s1 == IPSet(['192.0.2.0/32', '192.0.2.4/32', '::192.0.2.0/128', '::192.0.2.4/128'])
    assert IPSet([]) ^ IPSet([]) == IPSet([])
    assert IPSet(['0.0.0.1/32']) ^ IPSet([]) == IPSet(['0.0.0.1/32'])
    assert IPSet(['0.0.0.1/32']) ^ IPSet(['0.0.0.1/32']) == IPSet([])
    assert s3 ^ s4 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    assert s4 ^ s3 == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '255.255.255.1/32'])
    assert s3 ^ s4b == IPSet(['0.0.0.1/32', '10.0.0.65/32', '10.0.0.67/32', '111.111.111.111/32', '255.255.255.1/32'])
    assert s3 ^ s5 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])
    assert s5 ^ s3 == IPSet(['0.0.0.1/32', '10.0.0.64/32', '10.0.0.66/32', '255.255.255.1/32'])
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_converting_ipsets_to_ipranges():
    """Check the IPRange sequences produced by ``IPSet.iter_ipranges()``."""
    # An empty set yields no ranges at all.
    empty = IPSet()
    assert list(empty.iter_ipranges()) == []

    # A lone address is reported as a range starting and ending on itself.
    single = IPSet([IPAddress('10.0.0.1')])
    assert list(single.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]

    # Two adjacent addresses are reported as one range.
    adjacent = IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')])
    assert list(adjacent.iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    """``len()`` works for ``_sys_maxint`` members and raises for one more."""
    # ::0 .. _sys_maxint (inclusive) spans _sys_maxint + 1 addresses.
    oversized_range = IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6))
    oversized = IPSet(oversized_range)
    with pytest.raises(IndexError):
        len(oversized)

    # ::0 .. _sys_maxint - 1 (inclusive) spans exactly _sys_maxint addresses.
    largest_range = IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6))
    largest = IPSet(largest_range)
    assert len(largest) == _sys_maxint
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipset_ipv4_and_ipv4_separation():
    """IPv4 and IPv6 addresses with the same integer value stay separate."""
    mixed = IPSet([IPAddress(1, 4), IPAddress(1, 6)])
    expected = [IPRange('0.0.0.1', '0.0.0.1'), IPRange('::1', '::1')]
    assert list(mixed.iter_ipranges()) == expected
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipset_exceptions():
    """IPSet must raise TypeError for hashing and for a bad ``update()``."""
    ip_set = IPSet(['10.0.0.1'])

    # Hashing is expected to fail: IPSet objects are not hashable.
    with pytest.raises(TypeError):
        hash(ip_set)

    # A plain integer is not an acceptable argument type for update().
    with pytest.raises(TypeError):
        ip_set.update(42)
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipset_converts_to_cidr_networks_v4():
    """IPv4 networks given with host bits set are listed as clean CIDRs."""
    networks = IPSet(IPNetwork('10.1.2.3/8'))
    networks.add(IPNetwork('192.168.1.2/16'))

    expected = [IPNetwork('10.0.0.0/8'), IPNetwork('192.168.0.0/16')]
    assert list(networks.iter_cidrs()) == expected
test_ip_sets.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_ipset_converts_to_cidr_networks_v6():
    """IPv6 networks given with host bits set are listed as clean CIDRs."""
    networks = IPSet(IPNetwork('fe80::4242/64'))
    networks.add(IPNetwork('fe90::4343/64'))

    expected = [IPNetwork('fe80::/64'), IPNetwork('fe90::/64')]
    assert list(networks.iter_cidrs()) == expected
formatting.py 文件源码 项目:sender_policy_flattener 作者: cetanu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def ips_to_spf_strings(ips):
    """Convert addresses and other tokens into SPF record mechanisms.

    Entries that ``IPNetwork`` can parse are merged through an ``IPSet`` and
    rendered from its CIDR list as ``ip4:`` / ``ip6:`` mechanisms (a ``/32``
    suffix is dropped from IPv4 entries).  Entries for which ``IPNetwork``
    raises ``AddrFormatError`` are passed through untouched and appended
    after the address mechanisms, in their original order.

    The ``ips`` argument is left unmodified, and any iterable is accepted.

    :param ips: iterable of address/CIDR strings mixed with other tokens.
    :returns: list of SPF mechanism strings followed by the other tokens.
    """
    addresses = []
    other_tokens = []
    for ip in ips:
        try:
            IPNetwork(ip)
        except AddrFormatError:
            other_tokens.append(ip)
        else:
            addresses.append(ip)
    cidrs = [str(cidr) for cidr in IPSet(addresses).iter_cidrs()]
    # A ':' can only appear in an IPv6 CIDR string.
    mechanisms = ['ip6:' + cidr if ':' in cidr else
                  'ip4:' + cidr.replace('/32', '')
                  for cidr in cidrs]
    return mechanisms + other_tokens
configuration_data_randomizer.py 文件源码 项目:restconf-examples 作者: CiscoDevNet 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def random_address(base):
    """Return a random address based on a base prefix.

    The result has the form ``address/prefixlen``.  The first (network) and
    last (broadcast) address of the prefix are never returned.  The address
    is drawn as a random integer inside the prefix, so the cost does not
    depend on the prefix size (large IPv4 blocks and IPv6 prefixes are
    fine).

    :param base: prefix accepted by ``netaddr.IPNetwork``.
    :returns: string ``"<address>/<prefixlen>"``.
    :raises ValueError: if no address lies strictly between the first and
        last address of the prefix.
    """
    prefix = netaddr.IPNetwork(base)
    # Skip the first (network) and last (broadcast) address of the block.
    lowest = prefix.first + 1
    highest = prefix.last - 1
    if lowest > highest:
        raise ValueError('prefix %s has no usable host addresses' % prefix)
    host = netaddr.IPAddress(random.randint(lowest, highest), prefix.version)
    return str(host) + '/' + str(prefix.prefixlen)
cidr-exclude.py 文件源码 项目:cidr-tools 作者: silverwind 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ipset(nets):
  """Sort the given networks into one IPv4 IPSet and one IPv6 IPSet.

  Every entry is parsed with ``netaddr.IPNetwork``; the address part of its
  string form decides which of the two sets receives it.

  Returns the tuple ``(v4nets, v6nets)``.
  """
  v4_set, v6_set = netaddr.IPSet(), netaddr.IPSet()
  for entry in nets:
    network = netaddr.IPNetwork(entry)
    address, mask = str(network).split("/")
    if netaddr.valid_ipv4(address) and int(mask) <= 32:
      v4_set.add(network)
      continue
    if netaddr.valid_ipv6(address) and int(mask) <= 128:
      v6_set.add(network)
  return v4_set, v6_set
ipv4cn.py 文件源码 项目:regional-ip-addresses 作者: x1angli 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def populate_reserved(self):
        """Load the reserved address list from the file ``self.reserved_ip``.

        Every line is stripped of surrounding whitespace; blank lines and
        lines whose first non-blank character is ``#`` are skipped.  The
        remaining lines are handed to ``IPSet`` and the result is stored in
        ``self.ipset_reserved``.
        """
        with open(self.reserved_ip, 'r') as f:
            # Strip before the comment check so indented comments are
            # skipped instead of being parsed as addresses.
            stripped = (line.strip() for line in f)
            ip_list = [entry for entry in stripped
                       if entry and not entry.startswith('#')]

        self.ipset_reserved = IPSet(ip_list)
ipv4cn.py 文件源码 项目:regional-ip-addresses 作者: x1angli 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def derive_outwall(self):
        """
        Derive the "out-wall" set: 0.0.0.0/0 minus the in-wall set and minus
        the reserved set.  The result is stored in ``self.ipset_outwall`` and
        its CIDR blocks in ``self.cidrs_outwall``.
        See: http://www.tcpipguide.com/free/t_IPReservedPrivateandLoopbackAddresses-3.htm
        """

        everything = IPSet(['0.0.0.0/0'])
        without_inwall = everything - self.ipset_inwall
        self.ipset_outwall = without_inwall - self.ipset_reserved
        self.cidrs_outwall = list(self.ipset_outwall.iter_cidrs())

        block_count = len(self.cidrs_outwall)
        logging.info("Finished deriving out-wall IP table(s). Total: %i CIDR blocks.", block_count)
subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _validate_subnet_cidr(context, network_id, new_subnet_cidr):
    """Validate the CIDR for a subnet.

    Does nothing when ``allow_overlapping_ips`` is enabled in the neutron
    configuration.  Otherwise the CIDR is checked against every subnet that
    ``db_api.subnet_find`` returns for ``network_id`` (filtered with
    ``shared=[False]``) and rejected if it intersects any of them.

    :param context: request context; its elevated form is used for the
        subnet lookup.
    :param network_id: id of the network the new subnet belongs to.
    :param new_subnet_cidr: CIDR requested for the new subnet.
    :raises n_exc.BadRequest: if the CIDR is missing or cannot be parsed.
    :raises n_exc.InvalidInput: if the CIDR overlaps an existing subnet.
    """
    if neutron_cfg.cfg.CONF.allow_overlapping_ips:
        return

    try:
        new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
    except (TypeError, netaddr.AddrFormatError):
        # TypeError covers a missing (non-string) value; AddrFormatError
        # covers a string that is not a valid CIDR.
        LOG.exception("Invalid or missing cidr: %s", new_subnet_cidr)
        raise n_exc.BadRequest(resource="subnet",
                               msg="Invalid or missing cidr")

    filters = {
        'network_id': network_id,
        'shared': [False]
    }
    # Using admin context here, in case we actually share networks later
    subnet_list = db_api.subnet_find(context=context.elevated(), **filters)

    for subnet in subnet_list:
        # A non-empty intersection is truthy and means the CIDRs overlap.
        if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
            # don't give out details of the overlapping subnet
            err_msg = (_("Requested subnet with cidr: %(cidr)s for "
                         "network: %(network_id)s overlaps with another "
                         "subnet") %
                       {'cidr': new_subnet_cidr,
                        'network_id': network_id})
            LOG.error(_("Validation for CIDR: %(new_cidr)s failed - "
                        "overlaps with subnet %(subnet_id)s "
                        "(CIDR: %(cidr)s)"),
                      {'new_cidr': new_subnet_cidr,
                       'subnet_id': subnet.id,
                       'cidr': subnet.cidr})
            raise n_exc.InvalidInput(error_message=err_msg)
subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _pool_is_growing(original_pool, new_pool):
    """Return True when ``new_pool`` is not fully covered by ``original_pool``.

    Both arguments must expose ``_alloc_pools``, an iterable of mappings
    with ``'start'`` and ``'end'`` keys.
    """
    def _as_ipset(pool):
        # Merge every start/end extent of the pool into a single IPSet.
        merged = netaddr.IPSet()
        for extent in pool._alloc_pools:
            merged.add(netaddr.IPRange(extent['start'], extent['end']))
        return merged

    original_set = _as_ipset(original_pool)
    candidate_set = _as_ipset(new_pool)

    # We are growing if the original set is not a superset of the new set.
    return not original_set.issuperset(candidate_set)
ip_policies.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ensure_default_policy(cidrs, subnets):
    """Add each subnet's network and broadcast address to ``cidrs`` in place.

    For every subnet the network and broadcast addresses are formatted as
    single-address CIDRs (``/32`` for IPv4, ``/128`` otherwise).  One is
    appended to ``cidrs`` unless it is already contained in the IPSet built
    from the initial ``cidrs`` or is already listed in ``cidrs``.  Nothing is
    returned.
    """
    policy_cidrs = netaddr.IPSet(cidrs)
    for subnet in subnets:
        subnet_net = netaddr.IPNetwork(subnet["cidr"])
        edge_addresses = (subnet_net.network, subnet_net.broadcast)
        if subnet_net.version == 4:
            host_prefix = '32'
        else:
            host_prefix = '128'
        required = ["%s/%s" % (address, host_prefix)
                    for address in edge_addresses]
        for required_cidr in required:
            already_excluded = (
                netaddr.IPNetwork(required_cidr) in policy_cidrs
                or required_cidr in cidrs)
            if not already_excluded:
                cidrs.append(required_cidr)
models.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def allocation_pools(self):
        """Return the allocation pools of this subnet.

        A truthy ``_allocation_pool_cache`` value is decoded from JSON and
        returned as is.  Otherwise the pools are computed by
        ``_pools_from_cidr`` from the subnet CIDR minus the CIDRs of the
        attached IP policy (if there is one).
        """
        cached = self.get("_allocation_pool_cache")
        if cached:
            return json.loads(cached)

        # Without an IP policy nothing is excluded from the subnet.
        excluded = (self["ip_policy"].get_cidrs_ip_set()
                    if self["ip_policy"] else netaddr.IPSet([]))
        subnet_set = netaddr.IPSet([netaddr.IPNetwork(self["cidr"])])
        return _pools_from_cidr(subnet_set - excluded)
models.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_cidrs_ip_set(self):
        """Return an IPSet built from the ``cidr`` of every ``exclude`` entry.

        A missing ``exclude`` value is treated as an empty list.
        """
        excluded_cidrs = [entry.cidr for entry in self.get("exclude", [])]
        return netaddr.IPSet(excluded_cidrs)
28e55acaf366_populate_ip_policy_size.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def upgrade():
    """Fill in ``quark_ip_policy.size`` from the CIDRs of each IP policy.

    All rows of ``quark_ip_policy_cidrs`` are read, their CIDRs are merged
    into one IPSet per policy id, and each policy row is updated with the
    ``size`` of its IPSet.
    """
    policy_table = table('quark_ip_policy',
                         column('id', sa.String(length=36)),
                         column('size', INET()))
    cidr_table = table('quark_ip_policy_cidrs',
                       column('ip_policy_id', sa.String(length=36)),
                       column('cidr', sa.String(length=64)))
    connection = op.get_bind()

    # Step 1: fetch every (policy id, cidr) pair.
    rows = connection.execute(
        select([cidr_table.c.ip_policy_id, cidr_table.c.cidr])
    ).fetchall()

    # Step 2: merge the CIDRs of each policy into a single IPSet.
    sets_by_policy = {}
    for policy_id, cidr in rows:
        policy_set = sets_by_policy.get(policy_id)
        if policy_set is None:
            policy_set = sets_by_policy[policy_id] = netaddr.IPSet()
        policy_set.add(cidr)

    # Step 3: write the size of each IPSet back to its policy row.
    for policy_id, policy_set in sets_by_policy.items():
        statement = policy_table.update().values(size=policy_set.size)
        connection.execute(statement.where(policy_table.c.id == policy_id))
test_subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_allow_allocation_pool_growth(self):
        """Updating a subnet to a larger allocation pool succeeds when
        ``allow_allocation_pool_growth`` is overridden to True.
        """
        CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        # Initial pool: 192.168.1.15 - 192.168.1.30.
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        # NOTE(review): _stubs presumably creates the network and subnet
        # fixtures - confirm against its definition.
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            # The new pool (.10 - .50) reaches beyond the initial one on
            # both ends.
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            subnet = subnet_api.update_subnet(self.context, 1,
                                              subnet_update)
            # The update must have replaced the pools with the new pool.
            self.assertNotEqual(start_pools, subnet['allocation_pools'])
            self.assertEqual(new_pool, subnet['allocation_pools'])
            # Exactly one IP policy is expected, and none of the addresses
            # in the new pool may be covered by its exclude list.
            policies = policy_api.get_ip_policies(self.context)
            self.assertEqual(1, len(policies))
            policy = policies[0]
            ip_set = netaddr.IPSet()
            for ip in policy['exclude']:
                ip_set.add(netaddr.IPNetwork(ip))
            for extent in new_pool:
                for ip in netaddr.IPRange(extent['start'], extent['end']):
                    self.assertFalse(ip in ip_set)

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in subnet['allocation_pools']:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            # The union differs from the start set, i.e. the updated pools
            # contain addresses the original pools did not.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
test_subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_do_not_allow_allocation_pool_growth(self):
        """Updating a subnet to a larger allocation pool raises BadRequest
        when ``allow_allocation_pool_growth`` is overridden to False.
        """
        CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        # Initial pool: 192.168.1.15 - 192.168.1.30.
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        # NOTE(review): _stubs presumably creates the network and subnet
        # fixtures - confirm against its definition.
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            # The requested pool (.10 - .50) reaches beyond the initial one.
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in new_pool:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            # Sanity check: the requested pool really adds addresses that
            # the current pools do not contain.
            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            # With growth disabled the update must be rejected.
            with self.assertRaises(n_exc.BadRequest):
                subnet = subnet_api.update_subnet(self.context, 1,
                                                  subnet_update)


问题


面经


文章

微信
公众号

扫码关注公众号