python类IPRange()的实例源码

test_ports_rbac.py 文件源码 项目:patrole 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def resource_setup(cls):
        super(PortsRbacTest, cls).resource_setup()
        # Create a network and subnet.
        cls.network = cls.create_network()
        cls.cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
        cls.subnet = cls.create_subnet(cls.network, cidr=cls.cidr,
                                       mask_bits=24)
        cls.ip_range = netaddr.IPRange(
            cls.subnet['allocation_pools'][0]['start'],
            cls.subnet['allocation_pools'][0]['end'])

        cls.port = cls.create_port(cls.network)
        ipaddr = cls.port['fixed_ips'][0]['ip_address']
        cls.port_ip_address = ipaddr
        cls.port_mac_address = cls.port['mac_address']
test_routers_rbac.py 文件源码 项目:patrole 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def resource_setup(cls):
        super(RouterRbacTest, cls).resource_setup()
        # Create a network with external gateway so that
        # ``external_gateway_info`` policies can be validated.
        post_body = {'router:external': True}
        cls.network = cls.create_network(**post_body)
        cls.subnet = cls.create_subnet(cls.network)
        cls.ip_range = netaddr.IPRange(
            cls.subnet['allocation_pools'][0]['start'],
            cls.subnet['allocation_pools'][0]['end'])
        cls.router = cls.create_router()
ARP.py 文件源码 项目:piSociEty 作者: paranoidninja 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_range(self, targets):
        if targets is None:
            return None

        try:
            target_list = []
            for target in targets.split(','):

                if '/' in target:
                    target_list.extend(list(IPNetwork(target)))

                elif '-' in target:
                    start_addr = IPAddress(target.split('-')[0])
                    try:
                        end_addr = IPAddress(target.split('-')[1])
                        ip_range = IPRange(start_addr, end_addr)
                    except AddrFormatError:
                        end_addr = list(start_addr.words)
                        end_addr[-1] = target.split('-')[1]
                        end_addr = IPAddress('.'.join(map(str, end_addr)))
                        ip_range = IPRange(start_addr, end_addr)

                    target_list.extend(list(ip_range))

                else:
                    target_list.append(IPAddress(target))

            return target_list

        except AddrFormatError:
            sys.exit("Specified an invalid IP address/range/network as target")
subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _pool_is_growing(original_pool, new_pool):
    # create IPSet for original pool
    ori_set = netaddr.IPSet()
    for rng in original_pool._alloc_pools:
        ori_set.add(netaddr.IPRange(rng['start'], rng['end']))

    # create IPSet for net pool
    new_set = netaddr.IPSet()
    for rng in new_pool._alloc_pools:
        new_set.add(netaddr.IPRange(rng['start'], rng['end']))

    # we are growing the original set is not a superset of the new set
    return not ori_set.issuperset(new_set)
test_subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_update_allocation_pools(self):
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      tenant_id="fake")
        subnet = {"subnet": subnet}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pools = [
                [dict(start='192.168.1.10', end='192.168.1.50')],
                [dict(start='192.168.1.5', end='192.168.1.25')],
                [dict(start='192.168.1.50', end='192.168.1.51')],
                [dict(start='192.168.1.50', end='192.168.1.51'),
                    dict(start='192.168.1.100', end='192.168.1.250')],
                [dict(start='192.168.1.50', end='192.168.1.51')],
                start_pools,
            ]
            prev_pool = start_pools
            for pool in new_pools:
                subnet_update = {"subnet": dict(allocation_pools=pool)}
                subnet = subnet_api.update_subnet(self.context, 1,
                                                  subnet_update)
                self.assertNotEqual(prev_pool, subnet['allocation_pools'])
                self.assertEqual(pool, subnet['allocation_pools'])
                policies = policy_api.get_ip_policies(self.context)
                self.assertEqual(1, len(policies))
                policy = policies[0]
                ip_set = netaddr.IPSet()
                for ip in policy['exclude']:
                    ip_set.add(netaddr.IPNetwork(ip))
                for extent in pool:
                    for ip in netaddr.IPRange(extent['start'], extent['end']):
                        self.assertFalse(ip in ip_set)
                prev_pool = pool
test_subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_allow_allocation_pool_growth(self):
        CONF.set_override('allow_allocation_pool_growth', True, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            subnet = subnet_api.update_subnet(self.context, 1,
                                              subnet_update)
            self.assertNotEqual(start_pools, subnet['allocation_pools'])
            self.assertEqual(new_pool, subnet['allocation_pools'])
            policies = policy_api.get_ip_policies(self.context)
            self.assertEqual(1, len(policies))
            policy = policies[0]
            ip_set = netaddr.IPSet()
            for ip in policy['exclude']:
                ip_set.add(netaddr.IPNetwork(ip))
            for extent in new_pool:
                for ip in netaddr.IPRange(extent['start'], extent['end']):
                    self.assertFalse(ip in ip_set)

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in subnet['allocation_pools']:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)
test_subnets.py 文件源码 项目:quark 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_do_not_allow_allocation_pool_growth(self):
        CONF.set_override('allow_allocation_pool_growth', False, 'QUARK')
        cidr = "192.168.1.0/24"
        ip_network = netaddr.IPNetwork(cidr)
        network = dict(name="public", tenant_id="fake", network_plugin="BASE")
        network = {"network": network}
        pool = [dict(start='192.168.1.15', end='192.168.1.30')]
        subnet = dict(id=1, ip_version=4, next_auto_assign_ip=2,
                      cidr=cidr, first_ip=ip_network.first,
                      last_ip=ip_network.last, ip_policy=None,
                      allocation_pools=pool, tenant_id="fake")
        subnet = {"subnet": subnet}
        with self._stubs(network, subnet) as (net, sub1):
            subnet = subnet_api.get_subnet(self.context, 1)
            start_pools = subnet['allocation_pools']
            new_pool = [dict(start='192.168.1.10', end='192.168.1.50')]

            start_ip_set = netaddr.IPSet()
            for rng in start_pools:
                start_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            new_ip_set = netaddr.IPSet()
            for rng in new_pool:
                new_ip_set.add(netaddr.IPRange(rng['start'], rng['end']))

            self.assertTrue(start_ip_set | new_ip_set != start_ip_set)

            subnet_update = {"subnet": dict(allocation_pools=new_pool)}
            with self.assertRaises(n_exc.BadRequest):
                subnet = subnet_api.update_subnet(self.context, 1,
                                                  subnet_update)
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_ipset_basic_api():
    range1 = IPRange('192.0.2.1', '192.0.2.15')

    ip_list = [
        IPAddress('192.0.2.1'),
        '192.0.2.2/31',
        IPNetwork('192.0.2.4/31'),
        IPAddress('192.0.2.6'),
        IPAddress('192.0.2.7'),
        '192.0.2.8',
        '192.0.2.9',
        IPAddress('192.0.2.10'),
        IPAddress('192.0.2.11'),
        IPNetwork('192.0.2.12/30'),
    ]

    set1 = IPSet(range1.cidrs())

    set2 = IPSet(ip_list)

    assert set2 == IPSet([
        '192.0.2.1/32',
        '192.0.2.2/31',
        '192.0.2.4/30',
        '192.0.2.8/29',
    ])

    assert set1 == set2
    assert set2.pop() in set1
    assert set1 != set2
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ipset_constructor():
    assert IPSet(['192.0.2.0']) == IPSet(['192.0.2.0/32'])
    assert IPSet([IPAddress('192.0.2.0')]) == IPSet(['192.0.2.0/32'])
    assert IPSet([IPNetwork('192.0.2.0')]) == IPSet(['192.0.2.0/32'])
    assert IPSet(IPNetwork('1234::/32')) == IPSet(['1234::/32'])
    assert IPSet([IPNetwork('192.0.2.0/24')]) == IPSet(['192.0.2.0/24'])
    assert IPSet(IPSet(['192.0.2.0/32'])) == IPSet(['192.0.2.0/32'])
    assert IPSet(IPRange("10.0.0.0", "10.0.1.31")) == IPSet(['10.0.0.0/24', '10.0.1.0/27'])
    assert IPSet(IPRange('0.0.0.0', '255.255.255.255')) == IPSet(['0.0.0.0/0'])
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipset_member_insertion_and_deletion():
    s1 = IPSet()
    s1.add('192.0.2.0')
    assert s1 == IPSet(['192.0.2.0/32'])

    s1.remove('192.0.2.0')
    assert s1 == IPSet([])

    s1.add(IPRange("10.0.0.0", "10.0.0.255"))
    assert s1 == IPSet(['10.0.0.0/24'])

    s1.remove(IPRange("10.0.0.128", "10.10.10.10"))
    assert s1 == IPSet(['10.0.0.0/25'])
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipset_membership():
    iprange = IPRange('192.0.1.255', '192.0.2.16')

    assert iprange.cidrs() == [
        IPNetwork('192.0.1.255/32'),
        IPNetwork('192.0.2.0/28'),
        IPNetwork('192.0.2.16/32'),
    ]

    ipset = IPSet(['192.0.2.0/28'])

    assert [(str(ip), ip in ipset) for ip in iprange] == [
        ('192.0.1.255', False),
        ('192.0.2.0', True),
        ('192.0.2.1', True),
        ('192.0.2.2', True),
        ('192.0.2.3', True),
        ('192.0.2.4', True),
        ('192.0.2.5', True),
        ('192.0.2.6', True),
        ('192.0.2.7', True),
        ('192.0.2.8', True),
        ('192.0.2.9', True),
        ('192.0.2.10', True),
        ('192.0.2.11', True),
        ('192.0.2.12', True),
        ('192.0.2.13', True),
        ('192.0.2.14', True),
        ('192.0.2.15', True),
        ('192.0.2.16', False),
    ]
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_ipset_clear():
    ipset = IPSet(['10.0.0.0/16'])
    ipset.update(IPRange('10.1.0.0', '10.1.255.255'))
    assert ipset == IPSet(['10.0.0.0/15'])

    ipset.clear()
    assert ipset == IPSet([])
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_converting_ipsets_to_ipranges():
    assert list(IPSet().iter_ipranges()) == []
    assert list(IPSet([IPAddress('10.0.0.1')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.1')]
    assert list(IPSet([IPAddress('10.0.0.1'), IPAddress('10.0.0.2')]).iter_ipranges()) == [IPRange('10.0.0.1', '10.0.0.2')]
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_len_on_ipset_failure_with_large_ipv6_addresses():
    s1 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint, 6)))
    with pytest.raises(IndexError):
        len(s1)

    s2 = IPSet(IPRange(IPAddress("::0"), IPAddress(_sys_maxint - 1, 6)))
    assert len(s2) == _sys_maxint
test_ip_sets.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ipset_ipv4_and_ipv4_separation():
    assert list(IPSet([IPAddress(1, 4), IPAddress(1, 6)]).iter_ipranges()) == [IPRange('0.0.0.1', '0.0.0.1'), IPRange('::1', '::1')]
test_ip_ranges.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_iprange_boolean_evaluation():
    assert bool(IPRange('0.0.0.0', '255.255.255.255'))
    assert bool(IPRange('0.0.0.0', '0.0.0.0'))
test_ip_ranges.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_iprange_indexing():
    iprange = IPRange('192.0.2.1', '192.0.2.254')

    assert len(iprange) == 254
    assert iprange.first == 3221225985
    assert iprange.last == 3221226238
    assert iprange[0] == IPAddress('192.0.2.1')
    assert iprange[-1] == IPAddress('192.0.2.254')

    with pytest.raises(IndexError):
        iprange[512]
test_ip_ranges.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_iprange_ipv6_unsupported_slicing():
    with pytest.raises(TypeError):
        IPRange('::ffff:192.0.2.1', '::ffff:192.0.2.254')[0:10:2]
test_ip_ranges.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_iprange_membership():
    assert IPRange('192.0.2.5', '192.0.2.10') in IPRange('192.0.2.1', '192.0.2.254')
    assert IPRange('fe80::1', 'fe80::fffe') in IPRange('fe80::', 'fe80::ffff:ffff:ffff:ffff')
    assert IPRange('192.0.2.5', '192.0.2.10') not in IPRange('::', '::255.255.255.255')
test_ip_ranges.py 文件源码 项目:event-driven-security 作者: acantril 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_more_iprange_sorting():
    ipranges = (IPRange('192.0.2.40', '192.0.2.50'), IPRange('192.0.2.20', '192.0.2.30'), IPRange('192.0.2.1', '192.0.2.254'),)

    assert sorted(ipranges) == [IPRange('192.0.2.1', '192.0.2.254'), IPRange('192.0.2.20', '192.0.2.30'), IPRange('192.0.2.40', '192.0.2.50')]

    ipranges = list(ipranges)
    ipranges.append(IPRange('192.0.2.45', '192.0.2.49'))

    assert sorted(ipranges) == [
        IPRange('192.0.2.1', '192.0.2.254'),
        IPRange('192.0.2.20', '192.0.2.30'),
        IPRange('192.0.2.40', '192.0.2.50'),
        IPRange('192.0.2.45', '192.0.2.49'),
    ]


问题


面经


文章

微信
公众号

扫码关注公众号