python类ip_network()的实例源码

validate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def public_network(self):
        """
        All nodes must have the same public network.  The public network
        must be valid.
        """
        for node in self.data.keys():
            public_network = self.data[node].get("public_network", "")
            net_list = Util.parse_list_from_string(public_network)

            log.debug("public_network: {} {}".format(node, net_list))
            for network in net_list:
                try:
                    ipaddress.ip_network(u'{}'.format(network))
                except ValueError as err:
                    msg = "{} on {} is not valid: {}".format(network, node, err)
                    self.errors.setdefault('public_network', []).append(msg)

        self._set_pass_status('public_network')
validate.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cluster_network(self):
        """
        All storage nodes must have the same cluster network.  The cluster
        network must be valid.
        """
        for node in self.data.keys():
            if ('roles' in self.data[node] and
                'storage' in self.data[node]['roles']):

                cluster_network = self.data[node].get("cluster_network", "")
                net_list = Util.parse_list_from_string(cluster_network)
                log.debug("cluster_network: {} {}".format(node, net_list))
                for network in net_list:
                    try:
                        ipaddress.ip_network(u'{}'.format(network))
                    except ValueError as err:
                        msg = "{} on {} is not valid: {}".format(network, node, err)
                        self.errors.setdefault('cluster_network', []).append(msg)

        self._set_pass_status('cluster_network')
validators.py 文件源码 项目:thorn 作者: robinhood 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def block_cidr_network(*blocked_networks):
    # type: (*str) -> Callable
    """Block recipient URLs from a list of CIDR networks.

    Example:
        >>> block_cidr_network('192.168.0.0/24', '132.34.23.0/24')
    """
    _blocked_networks = [ip_network(text_type(x)) for x in blocked_networks]

    def validate_cidr(recipient_url):
        # type: (str) -> None
        recipient_addr = _url_ip_address(recipient_url)
        for blocked_network in _blocked_networks:
            if recipient_addr in blocked_network:
                raise SecurityError(
                    'IP address of recipient {0}={1} is in network {2}'.format(
                        recipient_url, recipient_addr, blocked_network,
                    ))
    validate_cidr._args = blocked_networks
    validate_cidr._validator = 'block_cidr_network'
    return validate_cidr
test_ipaddress.py 文件源码 项目:psycopg2-for-aws-lambda 作者: iwitaly 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_cidr_cast(self):
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        cur.execute("select null::cidr")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
interfaces.py 文件源码 项目:smc-python 作者: gabstopper 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def set_unset(self, interface_id, attribute, address=None):
        """
        Set attribute to True and unset the same attribute for all other
        interfaces. This is used for interface options that can only be
        set on one engine interface.
        """
        for interface in self:
            for sub_interface in interface.sub_interfaces():
                # Skip VLAN only interfaces (no addresses)
                if not isinstance(sub_interface, PhysicalVlanInterface):
                    if getattr(sub_interface, attribute) is not None:
                        if sub_interface.nicid == str(interface_id):
                            if address is not None: # Find IP on Node Interface
                                if ipaddress.ip_address(bytes_to_unicode(address)) in \
                                    ipaddress.ip_network(sub_interface.network_value):
                                    setattr(sub_interface, attribute, True)
                                else:
                                    setattr(sub_interface, attribute, False)
                            else:
                                setattr(sub_interface, attribute, True)
                        else: #unset
                            setattr(sub_interface, attribute, False)
discovery.py 文件源码 项目:SupercomputerInABriefcase 作者: SupercomputerInABriefcase 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def process_arguments(args: List[str]) -> ConnectionData:
    assert isinstance(args, list)
    for a in args:
        assert isinstance(a, str)
    if len(args) == 0:
        #rv = ConnectionData(ip_net='192.168.3.0/24', iface='eth0')
        rv = ConnectionData(ip_net='192.168.3.0/24', iface='wlan0')
    elif len(args) == 1:
        rv = ConnectionData(ip_net=args[0], iface='eth0')
    elif len(args) == 2:
        rv = ConnectionData(ip_net=args[0], iface=args[1])
    else:
        print('Incorrect number of arguments.')
        sys.exit(1)
    try:
        ipaddress.ip_network(rv.ip_net)
    except ValueError:
        print("First parameter doesn't appear to be an IP network specification.")
        sys.exit(1)
    return rv
config.py 文件源码 项目:do-portal 作者: certeu 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _validate_cidr_format(cidr):
    """Validate CIDR IP range
    :param str cidr:
    :return:
    :rtype: bool
    """
    try:
        ipaddress.ip_network(cidr, strict=False)
    except (ValueError, ipaddress.AddressValueError,
            ipaddress.NetmaskValueError):
        return False
    if '/' not in cidr:
        return False
    if re.search('\s', cidr):
        return False
    return True
server.py 文件源码 项目:piqueserver 作者: piqueserver 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def add_ban(self, ip, reason, duration, name=None):
        """
        Ban an ip with an optional reason and duration in minutes. If duration
        is None, ban is permanent.
        """
        network = ip_network(text_type(ip), strict=False)
        for connection in list(self.connections.values()):
            if ip_address(connection.address[0]) in network:
                name = connection.name
                connection.kick(silent=True)
        if duration:
            duration = reactor.seconds() + duration * 60
        else:
            duration = None
        self.bans[ip] = (name or '(unknown)', reason, duration)
        self.save_bans()
pixied.py 文件源码 项目:pixie 作者: algorithm-ninja 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_config(self, config):
        with open(config) as c:
            cfg = json.load(c)
        m = hashlib.sha224()
        m.update(bytes(cfg['subnet'], 'utf-8'))
        m.update(bytes(cfg['swap_size']))
        m.update(bytes(cfg['root_size']))
        m.update(bytes(cfg['extra_args'], 'utf-8'))
        # TODO: check sizes
        for f in cfg['hashes']:
            with open(f, 'rb') as fl:
                for line in fl:
                    m.update(line)
        cfg['sha224'] = m.hexdigest()
        cfg['subnet'] = ipaddress.ip_network(cfg['subnet'])
        cfg['image_method'] = IMAGE_METHOD
        return cfg
optimizeManifest.py 文件源码 项目:solace-messaging-cf-dev 作者: SolaceLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getTestNetworkIps(manifest):
    testSubnets = getTestSubnets(manifest)
    testSubnet = testSubnets[0]
    otherSubnets = testSubnets[1:]

    testIpSubnet = ipaddress.ip_network(testSubnet["range"])
    otherIpSubnets = [ipaddress.ip_network(s["range"]) for s in otherSubnets]
    subnetGateway = testSubnet["gateway"]

    overlapOtherIpSubnets = [s for s in otherIpSubnets if testIpSubnet.overlaps(s)]
    for otherIpSubnet in overlapOtherIpSubnets:
        testIpSubnet = testIpSubnet.address_exclude(otherIpSubnet)

    testHosts = [h.exploded for h in testIpSubnet.hosts()]
    testHosts.remove(subnetGateway)
    return testHosts
models.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _ip_network_hosts(obj, page=None):
    """
    Returns a portion of IP addresses based on page parameter.
    :returns: IP addresses
    :rtype: iterator (AC-3531)
    Previous rtype was a generator
    """
    pages = obj.pages()
    page = _page(page, pages)
    net_ip = obj.network_address + (page - 1) * 2 ** obj.page_bits
    net_pl = obj.max_prefixlen - obj.page_bits if pages > 1 else obj.prefixlen
    network = ipaddress.ip_network(u'{0}/{1}'.format(net_ip, net_pl))
    # This method used to return _BaseNetwork.hosts() generator,
    # however the default implementation would skip network and
    # broadcast addresses, which made the final IP pool short of
    # two IP addresses.  As of AC-3531 fix, now it returns
    # the _BaseNetwork.__iter__() instead, which does not skip any addresses.
    return iter(network)
models.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def to_dict(self, include=None, exclude=None, page=None):
        free_hosts_and_busy = self.free_hosts_and_busy(page=page)
        pages = ip_network(self.network).pages()
        page = _page(page, pages)
        data = dict(
            id=self.network,
            network=self.network,
            ipv6=self.ipv6,
            free_hosts=self.free_hosts(page=page),
            blocked_list=list(self.get_blocked_set()),
            node=None if self.node is None else self.node.hostname,
            allocation=free_hosts_and_busy,
            page=page,
            pages=pages,
        )
        return data
node_network_plugin.py 文件源码 项目:kuberdock-platform 作者: cloudlinux 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init():
    config = read_config_ini('/run/flannel/subnet.env')
    config_network = config['flannel_network']
    config_subnet = config['flannel_subnet']
    _update_ipset('kuberdock_flannel', [config_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset('kuberdock_cluster',
                  ['kuberdock_flannel', 'kuberdock_nodes'],
                  set_type='list:set')
    network = ipaddress.ip_network(unicode(config_network))
    subnet = ipaddress.ip_network(unicode(config_subnet), strict=False)
    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            pod_ip = ipaddress.ip_address(pod)
            if pod_ip not in network or pod_ip in subnet:
                etcd.delete_user(user, pod)
    update_ipset()
filter.py 文件源码 项目:sentry-health 作者: getsentry 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _ip_is_system_blacklisted(self, addr):
        for net in self.env.get_config('apiserver.whitelisted_ips'):
            try:
                net = ip_network(net, strict=False)
            except ValueError:
                continue
            if addr in net:
                return False
        for net in self.env.get_config('apiserver.blacklisted_ips'):
            try:
                net = ip_network(net, strict=False)
            except ValueError:
                continue
            if addr in net:
                return True
        return False
test_ipaddress.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_cidr_cast(self):
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        cur.execute("select null::cidr")
        self.assertTrue(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
test_ipaddress.py 文件源码 项目:nmbs-realtime-feed 作者: datamindedbe 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_cidr_cast(self):
        import ipaddress as ip
        cur = self.conn.cursor()
        psycopg2.extras.register_ipaddress(cur)

        cur.execute("select null::cidr")
        self.assert_(cur.fetchone()[0] is None)

        cur.execute("select '127.0.0.0/24'::cidr")
        obj = cur.fetchone()[0]
        self.assert_(isinstance(obj, ip.IPv4Network), repr(obj))
        self.assertEquals(obj, ip.ip_network('127.0.0.0/24'))

        cur.execute("select '::ffff:102:300/128'::cidr")
        obj = cur.fetchone()[0]
        self.assert_(isinstance(obj, ip.IPv6Network), repr(obj))
        self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
protocol.py 文件源码 项目:pyikev2 作者: alejandro-perez 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_acquire(self, xfrm_acquire, xfrm_tmpl):
        small_tsi = TrafficSelector.from_network(ip_network(xfrm_acquire.sel.saddr.to_ipaddr()),
                                                 xfrm_acquire.sel.sport, xfrm_acquire.sel.proto)
        small_tsr = TrafficSelector.from_network(ip_network(xfrm_acquire.sel.daddr.to_ipaddr()),
                                                 xfrm_acquire.sel.dport, xfrm_acquire.sel.proto)
        acquire = Acquire(small_tsi, small_tsr, xfrm_acquire.policy.index)

        request = None
        if self.state == IkeSa.State.INITIAL:
            request = self.generate_ike_sa_init_request(acquire)
        elif self.state == IkeSa.State.ESTABLISHED:
            request = self.generate_create_child_sa_request(acquire)
        else:
            logging.warning('Cannot process acquire while waiting for a response.')

        # TODO: Use a request queue for simplicity and to avoid problems with
        # state machine
        if request:
            request_data = request.to_bytes()
            self.log_message(request, self.peer_addr, request_data, send=True)
            return request_data
        else:
            return None
xfrm.py 文件源码 项目:pyikev2 作者: alejandro-perez 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def create_policies(self, my_addr, peer_addr, ike_conf):
        for ipsec_conf in ike_conf['protect']:
            if ipsec_conf['mode'] == Mode.TUNNEL:
                src_selector = ipsec_conf['my_subnet']
                dst_selector = ipsec_conf['peer_subnet']
            else:
                src_selector = ip_network(my_addr)
                dst_selector = ip_network(peer_addr)

            # generate an index for outbound policies
            index = SystemRandom().randint(0, 10000) << 2 | XFRM_POLICY_OUT
            ipsec_conf['index'] = index

            self._create_policy(src_selector, dst_selector, ipsec_conf['my_port'],
                                ipsec_conf['peer_port'], ipsec_conf['ip_proto'], XFRM_POLICY_OUT,
                                ipsec_conf['ipsec_proto'], ipsec_conf['mode'], my_addr, peer_addr,
                                index=index)
            self._create_policy(dst_selector, src_selector, ipsec_conf['peer_port'],
                                ipsec_conf['my_port'], ipsec_conf['ip_proto'], XFRM_POLICY_IN,
                                ipsec_conf['ipsec_proto'], ipsec_conf['mode'], peer_addr, my_addr)
            self._create_policy(dst_selector, src_selector,
                                ipsec_conf['peer_port'], ipsec_conf['my_port'],
                                ipsec_conf['ip_proto'], XFRM_POLICY_FWD, ipsec_conf['ipsec_proto'],
                                ipsec_conf['mode'], peer_addr, my_addr)
subnet.py 文件源码 项目:esdc-ce 作者: erigones 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def web_data_admin(self):
        """Return dict used in admin/DC web templates"""
        return {
            'name': self.name,
            'alias': self.alias,
            'access': self.access,
            'owner': self.owner.username,
            'desc': self.desc,
            'ip_network': str(self.ip_network),
            'network': self.network,
            'netmask': self.netmask,
            'gateway': self.gateway,
            'nic_tag': self.nic_tag,
            'nic_tag_type': self.nic_tag_type,
            'vlan_id': self.vlan_id,
            'vxlan_id': self.vxlan_id,
            'mtu': self.mtu,
            'resolvers': self.get_resolvers(),
            'dns_domain': self.dns_domain,
            'ptr_domain': self.ptr_domain,
            'dc_bound': self.dc_bound_bool,
            'dhcp_passthrough': self.dhcp_passthrough,
        }
test_ipaddress.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_incompatible_versions(self):
        # These should always raise TypeError
        v4addr = ipaddress.ip_address('1.1.1.1')
        v4net = ipaddress.ip_network('1.1.1.1')
        v6addr = ipaddress.ip_address('::1')
        v6net = ipaddress.ip_address('::1')

        self.assertRaises(TypeError, v4addr.__lt__, v6addr)
        self.assertRaises(TypeError, v4addr.__gt__, v6addr)
        self.assertRaises(TypeError, v4net.__lt__, v6net)
        self.assertRaises(TypeError, v4net.__gt__, v6net)

        self.assertRaises(TypeError, v6addr.__lt__, v4addr)
        self.assertRaises(TypeError, v6addr.__gt__, v4addr)
        self.assertRaises(TypeError, v6net.__lt__, v4net)
        self.assertRaises(TypeError, v6net.__gt__, v4net)
test_ipaddress.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testIpFromInt(self):
        self.assertEqual(self.ipv4_interface._ip,
                         ipaddress.IPv4Interface(16909060)._ip)

        ipv4 = ipaddress.ip_network('1.2.3.4')
        ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
        self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
        self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))

        v6_int = 42540616829182469433547762482097946625
        self.assertEqual(self.ipv6_interface._ip,
                         ipaddress.IPv6Interface(v6_int)._ip)

        self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
                         4)
        self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
                         6)
test_ipaddress.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testHash(self):
        self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
                         hash(ipaddress.ip_interface('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
                         hash(ipaddress.ip_network('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
                         hash(ipaddress.ip_address('10.1.1.0')))
        # i70
        self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
                         hash(ipaddress.ip_address(
                    int(ipaddress.ip_address('1.2.3.4')._ip))))
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {}
        dummy[self.ipv4_address] = None
        dummy[self.ipv6_address] = None
        dummy[ip1] = None
        dummy[ip2] = None
        self.assertTrue(self.ipv4_address in dummy)
        self.assertTrue(ip2 in dummy)
test_typeconversion.py 文件源码 项目:time2go 作者: twitchyliquid64 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testInetRoundtrip(self):
        try:
            import ipaddress

            v = ipaddress.ip_network('192.168.0.0/28')
            self.cursor.execute("select %s as f1", (v,))
            retval = self.cursor.fetchall()
            self.assertEqual(retval[0][0], v)

            v = ipaddress.ip_address('192.168.0.1')
            self.cursor.execute("select %s as f1", (v,))
            retval = self.cursor.fetchall()
            self.assertEqual(retval[0][0], v)

        except ImportError:
            for v in ('192.168.100.128/25', '192.168.0.1'):
                self.cursor.execute(
                    "select cast(cast(%s as varchar) as inet) as f1", (v,))
                retval = self.cursor.fetchall()
                self.assertEqual(retval[0][0], v)
net.py 文件源码 项目:netgrph 作者: yantisj 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def find_cidr(ip):
    """Finds most specific CIDR in Networks"""

    # Check for non-ip, try DNS
    if not re.search(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip):
        try:
            ip = socket.gethostbyname(ip)
        except socket.gaierror:
            raise Exception("Hostname Lookup Failure on: " + ip)

    # Always start with default route
    mostSpecific = "0.0.0.0/0"

    # All networks
    networks = nglib.py2neo_ses.cypher.execute('MATCH (n:Network) RETURN n.cidr as cidr')

    if len(networks) > 1:
        for r in networks.records:
            if ipaddress.ip_address(ip) in ipaddress.ip_network(r.cidr):
                if nglib.verbose > 1:
                    print("find_cidr", ip + " in " + r.cidr)
                mostSpecific = compare_cidr(mostSpecific, r.cidr)

    return mostSpecific
test_ipaddress.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_incompatible_versions(self):
        # These should always raise TypeError
        v4addr = ipaddress.ip_address('1.1.1.1')
        v4net = ipaddress.ip_network('1.1.1.1')
        v6addr = ipaddress.ip_address('::1')
        v6net = ipaddress.ip_network('::1')

        self.assertRaises(TypeError, v4addr.__lt__, v6addr)
        self.assertRaises(TypeError, v4addr.__gt__, v6addr)
        self.assertRaises(TypeError, v4net.__lt__, v6net)
        self.assertRaises(TypeError, v4net.__gt__, v6net)

        self.assertRaises(TypeError, v6addr.__lt__, v4addr)
        self.assertRaises(TypeError, v6addr.__gt__, v4addr)
        self.assertRaises(TypeError, v6net.__lt__, v4net)
        self.assertRaises(TypeError, v6net.__gt__, v4net)
test_ipaddress.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def testIpFromInt(self):
        self.assertEqual(self.ipv4_interface._ip,
                         ipaddress.IPv4Interface(16909060)._ip)

        ipv4 = ipaddress.ip_network('1.2.3.4')
        ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
        self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
        self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))

        v6_int = 42540616829182469433547762482097946625
        self.assertEqual(self.ipv6_interface._ip,
                         ipaddress.IPv6Interface(v6_int)._ip)

        self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
                         4)
        self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
                         6)
test_ipaddress.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testHash(self):
        self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
                         hash(ipaddress.ip_interface('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
                         hash(ipaddress.ip_network('10.1.1.0/24')))
        self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
                         hash(ipaddress.ip_address('10.1.1.0')))
        # i70
        self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
                         hash(ipaddress.ip_address(
                    int(ipaddress.ip_address('1.2.3.4')._ip))))
        ip1 = ipaddress.ip_address('10.1.1.0')
        ip2 = ipaddress.ip_address('1::')
        dummy = {}
        dummy[self.ipv4_address] = None
        dummy[self.ipv6_address] = None
        dummy[ip1] = None
        dummy[ip2] = None
        self.assertIn(self.ipv4_address, dummy)
        self.assertIn(ip2, dummy)
routinglib.py 文件源码 项目:routing 作者: opennetworkinglab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, onosIps, num=1, numBgpSpeakers=1, asNum=65000, externalOnos=True,
                 peerIntfConfig=None, withFpm=False):
        super(SdnAutonomousSystem, self).__init__(asNum, numBgpSpeakers)
        self.onosIps = onosIps
        self.num = num
        self.numBgpSpeakers = numBgpSpeakers
        self.peerIntfConfig = peerIntfConfig
        self.withFpm = withFpm
        self.externalOnos= externalOnos
        self.internalPeeringSubnet = ip_network(u'1.1.1.0/24')

        for router in self.routers.values():
            # Add iBGP sessions to ONOS nodes
            for onosIp in onosIps:
                router.neighbors.append({'address':onosIp, 'as':asNum, 'port':2000})

            # Add iBGP sessions to other BGP speakers
            for i, router2 in self.routers.items():
                if router == router2:
                    continue
                cpIpBase = self.num*10
                ip = AutonomousSystem.getIthAddress(self.internalPeeringSubnet, cpIpBase+i)
                router.neighbors.append({'address':ip.ip, 'as':asNum})
routinglib.py 文件源码 项目:routing 作者: opennetworkinglab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def generateRoutes(baseRange, numRoutes, subnetSize=None):
    baseNetwork = ip_network(baseRange)

    # We need to get at least 2 addresses out of each subnet, so the biggest
    # prefix length we can have is /30
    maxPrefixLength = baseNetwork.max_prefixlen - 2

    if subnetSize is not None:
        return list(baseNetwork.subnets(new_prefix=subnetSize))

    trySubnetSize = baseNetwork.prefixlen + 1
    while trySubnetSize <= maxPrefixLength and \
            len(list(baseNetwork.subnets(new_prefix=trySubnetSize))) < numRoutes:
        trySubnetSize += 1

    if trySubnetSize > maxPrefixLength:
        raise Exception("Can't get enough routes from input parameters")

    return list(baseNetwork.subnets(new_prefix=trySubnetSize))[:numRoutes]
fib.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, file_path):
        self._ipv4_lpm_dict = LpmDict()
        for ip in EXCLUDE_IPV4_PREFIXES:
            self._ipv4_lpm_dict[ip] = self.NextHop()

        self._ipv6_lpm_dict = LpmDict(ipv4=False)
        for ip in EXCLUDE_IPV6_PREFIXES:
            self._ipv6_lpm_dict[ip] = self.NextHop()

        # filter out empty lines and lines starting with '#'
        pattern = re.compile("^#.*$|^[ \t]*$")

        with open(file_path, 'r') as f:
            for line in f.readlines():
                if pattern.match(line): continue
                entry = line.split(' ', 1)
                prefix = ip_network(unicode(entry[0]))
                next_hop = self.NextHop(entry[1])
                if prefix.version is 4:
                    self._ipv4_lpm_dict[str(prefix)] = next_hop
                elif prefix.version is 6:
                    self._ipv6_lpm_dict[str(prefix)] = next_hop


问题


面经


文章

微信
公众号

扫码关注公众号