python类IPNetwork()的实例源码

localif.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self,
                 data   # Data suitable for this class
                 ):
        """Validate the data, then collect a network object for every
        address reported on the local interfaces.

        Raises ValueError if data_is_valid() rejects the data.
        """

        ok, reason = data_is_valid(data)
        if not ok:
            raise ValueError("Invalid data: %s" % reason)

        self.cidrs = []
        for name in netifaces.interfaces():
            by_family = netifaces.ifaddresses(name)

            # IPv4 addresses are parsed exactly as reported.
            for entry in by_family.get(netifaces.AF_INET, []):
                if 'addr' in entry:
                    self.cidrs.append(ipaddr.IPNetwork(entry['addr']))

            # IPv6 addresses can carry a trailing scope such as "%eth0";
            # strip it before parsing.
            for entry in by_family.get(netifaces.AF_INET6, []):
                if 'addr' in entry:
                    bare = entry['addr'].split('%')[0]
                    self.cidrs.append(ipaddr.IPNetwork(bare))
localif.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Return True if the 'requester' hint lies within any network in
        self.cidrs.  Return False if none contains it or if the hints
        have no 'requester' entry.

        """

        try:
            requester = ipaddr.IPNetwork(hints['requester'])
        except KeyError:
            return False

        # TODO: Find out if there's a more hash-like way to do this
        # instead of a linear search.  This would be great if it
        # weren't GPL: https://pypi.python.org/pypi/pytricia

        return any(requester in network for network in self.cidrs)

# A short test program
ipcymrubogon.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self,
                 data   # Data suitable for this class
                 ):
        """Validate the data, then set up the excluded networks, the
        timeout, the failure result and the DNS resolver.

        Raises ValueError if data_is_valid() rejects the data.
        """

        ok, reason = data_is_valid(data)
        if not ok:
            raise ValueError("Invalid data: %s" % reason)

        excluded = []
        for addr in data['exclude']:
            excluded.append(ipaddr.IPNetwork(addr))
        self.exclude = excluded

        # 'timeout' is optional; without it the timeout defaults to 2.
        try:
            self.timeout = pscheduler.timedelta_as_seconds(
                pscheduler.iso8601_as_timedelta(data['timeout']))
        except KeyError:
            self.timeout = 2

        # 'fail-result' is optional as well and defaults to False.
        self.fail_result = data.get('fail-result', False)

        # The same timeout is applied to both resolver limits.
        resolver = dns.resolver.Resolver()
        resolver.timeout = self.timeout
        resolver.lifetime = self.timeout
        self.resolver = resolver
dosinfo.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def Validate(self, value, unused_key=None):
    """Checks that value is a subnet string with a numeric prefix length.

    Args:
      value: The subnet to check.
      unused_key: Ignored.

    Returns:
      value, unchanged.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has something other than digits after its
        single '/'.
    """
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError(
          "subnet must be a string, not '%r'" % type(value))

    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError(
          '%s is not a valid IPv4 or IPv6 subnet' % value)

    pieces = value.split('/')
    if len(pieces) == 2:
      prefix = pieces[1]
      # Only plain digits are accepted after the slash; the error message
      # calls out quad-dotted masks as the case being refused.
      if re.match('^[0-9]+$', prefix) is None:
        raise validation.ValidationError('Prefix length of subnet %s must be an '
                                         'integer (quad-dotted masks are not '
                                         'supported)' % value)

    return value
dosinfo.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def Validate(self, value, unused_key=None):
    """Checks that value is a subnet string with a numeric prefix length.

    Args:
      value: The subnet to check.
      unused_key: Ignored.

    Returns:
      value, unchanged.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has something other than digits after its
        single '/'.
    """
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError(
          "subnet must be a string, not '%r'" % type(value))

    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError(
          '%s is not a valid IPv4 or IPv6 subnet' % value)

    pieces = value.split('/')
    if len(pieces) == 2:
      prefix = pieces[1]
      # Only plain digits are accepted after the slash; the error message
      # calls out quad-dotted masks as the case being refused.
      if re.match('^[0-9]+$', prefix) is None:
        raise validation.ValidationError('Prefix length of subnet %s must be an '
                                         'integer (quad-dotted masks are not '
                                         'supported)' % value)

    return value
tests.py 文件源码 项目:usres_monitor 作者: pierky 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_duplicate(net_str, net2_str=None, dup_found=True):
    """Add a network, add a second one, and check duplicate detection.

    The second network is net2_str, or net_str again when net2_str is
    not given.  dup_found states whether the second add_net() call is
    expected to be refused with an "it was already in the db" error.
    The outcome is reported through test_outcome(), and AssertionError
    is raised when the expectation is not met.
    """
    second_str = net2_str if net2_str else net_str
    label = "{} and {}".format(net_str, second_str)

    usres_monitor.add_net(ipaddr.IPNetwork(net_str))
    try:
        usres_monitor.add_net(ipaddr.IPNetwork(second_str))
    except USRESMonitorException as e:
        # Any other USRESMonitorException falls through and is treated
        # as "no duplicate reported".
        if "it was already in the db" in str(e):
            test_outcome("test_duplicate", label,
                         "OK" if dup_found else "FAIL")
            if not dup_found:
                raise AssertionError("Duplicate found")
            return

    test_outcome("test_duplicate", label,
                 "FAIL" if dup_found else "OK")
    if dup_found:
        raise AssertionError("Duplicate not found")
dosinfo.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def Validate(self, value, unused_key=None):
    """Checks that value is a subnet string with a numeric prefix length.

    Args:
      value: The subnet to check.
      unused_key: Ignored.

    Returns:
      value, unchanged.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has something other than digits after its
        single '/'.
    """
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError(
          "subnet must be a string, not '%r'" % type(value))

    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError(
          '%s is not a valid IPv4 or IPv6 subnet' % value)

    pieces = value.split('/')
    if len(pieces) == 2:
      prefix = pieces[1]
      # Only plain digits are accepted after the slash; the error message
      # calls out quad-dotted masks as the case being refused.
      if re.match('^[0-9]+$', prefix) is None:
        raise validation.ValidationError('Prefix length of subnet %s must be an '
                                         'integer (quad-dotted masks are not '
                                         'supported)' % value)

    return value
test_facts.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def read_testbed_topo(self):
        """Read the CSV file named by self.testbed_filename into
        self.testbed_topo.

        Each row becomes a dict of its columns, stored under the row's
        name (the value of a column whose header contains 'uniq-name' or
        'conf-name').  A non-empty 'ptf_ip' column is split into
        'ptf_ip' and 'ptf_netmask' entries.  Rows that end up without a
        name are not stored.
        """
        with open(self.testbed_filename) as f:
            for row in csv.DictReader(f):
                props = {}
                name = ''
                for key, value in row.items():
                    is_name_key = 'uniq-name' in key or 'conf-name' in key
                    if is_name_key:
                        # A name containing '#' marks a comment line and
                        # is not recorded as the row's name.
                        if '#' not in value:
                            name = value
                    elif 'ptf_ip' in key and value:
                        ptfaddress = ipaddress.IPNetwork(value)
                        props['ptf_ip'] = str(ptfaddress.ip)
                        props['ptf_netmask'] = str(ptfaddress.netmask)
                    else:
                        props[key] = value
                if name:
                    self.testbed_topo[name] = props
dosinfo.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def Validate(self, value, unused_key=None):
    """Checks that value is a subnet string with a numeric prefix length.

    Args:
      value: The subnet to check.
      unused_key: Ignored.

    Returns:
      value, unchanged.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has something other than digits after its
        single '/'.
    """
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError(
          "subnet must be a string, not '%r'" % type(value))

    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError(
          '%s is not a valid IPv4 or IPv6 subnet' % value)

    pieces = value.split('/')
    if len(pieces) == 2:
      prefix = pieces[1]
      # Only plain digits are accepted after the slash; the error message
      # calls out quad-dotted masks as the case being refused.
      if re.match('^[0-9]+$', prefix) is None:
        raise validation.ValidationError('Prefix length of subnet %s must be an '
                                         'integer (quad-dotted masks are not '
                                         'supported)' % value)

    return value
dosinfo.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def Validate(self, value, unused_key=None):
    """Checks that value is a subnet string with a numeric prefix length.

    Args:
      value: The subnet to check.
      unused_key: Ignored.

    Returns:
      value, unchanged.

    Raises:
      validation.MissingAttribute: value is None.
      validation.ValidationError: value is not a string, is rejected by
        ipaddr.IPNetwork, or has something other than digits after its
        single '/'.
    """
    if value is None:
      raise validation.MissingAttribute('subnet must be specified')
    if not isinstance(value, basestring):
      raise validation.ValidationError(
          "subnet must be a string, not '%r'" % type(value))

    try:
      ipaddr.IPNetwork(value)
    except ValueError:
      raise validation.ValidationError(
          '%s is not a valid IPv4 or IPv6 subnet' % value)

    pieces = value.split('/')
    if len(pieces) == 2:
      prefix = pieces[1]
      # Only plain digits are accepted after the slash; the error message
      # calls out quad-dotted masks as the case being refused.
      if re.match('^[0-9]+$', prefix) is None:
        raise validation.ValidationError('Prefix length of subnet %s must be an '
                                         'integer (quad-dotted masks are not '
                                         'supported)' % value)

    return value
http.py 文件源码 项目:Sentry 作者: NetEaseGame 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def is_valid_ip(ip_address, project):
    """
    Return False when ``ip_address`` matches an entry of the project's
    'sentry:blacklisted_ips' option, either literally or by falling
    inside an entry written as a range; return True otherwise,
    including when the option is empty or unset.
    """
    blacklisted = project.get_option('sentry:blacklisted_ips')
    if not blacklisted:
        return True

    candidate = IPNetwork(ip_address)
    for entry in blacklisted:
        # A literal match settles it without any range parsing.
        if ip_address == entry:
            return False

        # Only entries containing '/' are treated as ranges.
        if '/' not in entry:
            continue
        if candidate in IPNetwork(entry):
            return False

    return True

# #845, add for server name filter, by hzwangzhiwei @20160803
minigraph.py 文件源码 项目:sonic-buildimage 作者: Azure 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_device_desc_xml(filename):
    """Parse a device description XML file into a results dict.

    The returned dict has three entries: 'DEVICE_METADATA' (hostname and
    hwsku under 'localhost'), 'LOOPBACK_INTERFACE' (keyed by
    ('lo', loopback prefix)) and 'MGMT_INTERFACE' (keyed by
    ('eth0', management prefix) and carrying a 'gwaddr').
    """
    root = ET.parse(filename).getroot()
    lo_prefix, mgmt_prefix, hostname, hwsku, _ = parse_device(root)

    # The gateway is computed as the management network address plus one.
    mgmt_net = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmt_net.network) + 1)

    return {
        'DEVICE_METADATA': {'localhost': {
            'hostname': hostname,
            'hwsku': hwsku,
        }},
        'LOOPBACK_INTERFACE': {('lo', lo_prefix): {}},
        'MGMT_INTERFACE': {('eth0', mgmt_prefix): {'gwaddr': gwaddr}},
    }
ipcidrlist.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self,
                 data   # Data suitable for this class
                 ):
        """Validate the data and parse each entry of data['cidrs'] into a
        network object kept in self.cidrs.

        Raises ValueError if data_is_valid() rejects the data.
        """

        ok, reason = data_is_valid(data)
        if not ok:
            raise ValueError("Invalid data: %s" % reason)

        self.cidrs = [ipaddr.IPNetwork(cidr) for cidr in data['cidrs']]
ipcidrlist.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Return True if the 'requester' hint lies within any network in
        self.cidrs.  Return False if none contains it or if the hints
        have no 'requester' entry.

        """

        try:
            requester = ipaddr.IPNetwork(hints['requester'])
        except KeyError:
            return False

        # TODO: Find out if there's a more hash-like way to do this
        # instead of a linear search.  This would be great if it
        # weren't GPL: https://pypi.python.org/pypi/pytricia

        return any(requester in network for network in self.cidrs)


# A short test program
EXAConf.py 文件源码 项目:docker-db 作者: EXASOL 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def net_is_valid(self, net):
        """
        Tells whether the given string is accepted by ipaddr.IPNetwork
        as an IP network (v4 or v6); a ValueError from the parser means
        it is not.
        """
        try:
            ipaddr.IPNetwork(net)
        except ValueError:
            return False
        else:
            return True
#}}}

#{{{ IP type
EXAConf.py 文件源码 项目:docker-db 作者: EXASOL 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_network(self, net_type):
        """
        Returns the network (as a "address/prefixlen" string) defined by
        the first node section, after checking that the IP of every
        other node lies inside it. Returns an empty string if the config
        has no node sections.

        Raises an EXAConfError if a node has no entry for the requested
        network type, if a node IP is invalid, or if a node IP is not
        part of the network taken from the first node.

        The caller is expected to have checked that the requested
        network type (private / public) is actually present.
        """

        network = ""
        for section in self.config.sections:
            if not self.is_node(section):
                continue

            node_network = self.config[section].get(net_type)
            if not node_network:
                raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section))

            # the part before the '/' is the node's own IP
            node_ip = node_network.split("/")[0].strip()
            if not self.ip_is_valid(node_ip):
                raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section))

            if network == "":
                # first node : its entry defines the cluster network
                # (normalized to network address / prefix length)
                subnet = ipaddr.IPNetwork(node_network)
                network = "%s/%s" % (subnet.network, subnet.prefixlen)
                continue

            # other nodes : their IP has to be inside the chosen network
            if ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network):
                raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network))

        return network
#}}}

#{{{ Get private network
dos_xml_parser.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _ValidateEntry(self, entry):
    """Returns an error message for entry.subnet, or None if it passes.

    The subnet must be non-empty, accepted by ipaddr.IPNetwork, and, if
    it contains exactly one '/', have only digits after it.
    """
    subnet = entry.subnet
    if not subnet:
      return MISSING_SUBNET

    try:
      ipaddr.IPNetwork(subnet)
    except ValueError:
      return BAD_IPV_SUBNET % subnet

    pieces = subnet.split('/')
    if len(pieces) != 2:
      return None
    if re.match('^[0-9]+$', pieces[1]):
      return None
    return BAD_PREFIX_LENGTH % subnet
dos_xml_parser.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _ValidateEntry(self, entry):
    """Returns an error message for entry.subnet, or None if it passes.

    The subnet must be non-empty, accepted by ipaddr.IPNetwork, and, if
    it contains exactly one '/', have only digits after it.
    """
    subnet = entry.subnet
    if not subnet:
      return MISSING_SUBNET

    try:
      ipaddr.IPNetwork(subnet)
    except ValueError:
      return BAD_IPV_SUBNET % subnet

    pieces = subnet.split('/')
    if len(pieces) != 2:
      return None
    if re.match('^[0-9]+$', pieces[1]):
      return None
    return BAD_PREFIX_LENGTH % subnet
mech_gce.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_private_network(cidr):
        """Return the is_private flag of the network parsed from cidr."""
        network = ipaddr.IPNetwork(cidr)
        return network.is_private
mech_azure.py 文件源码 项目:omni 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def is_private_network(cidr):
        """Return the is_private flag of the network parsed from cidr."""
        network = ipaddr.IPNetwork(cidr)
        return network.is_private
tests.py 文件源码 项目:usres_monitor 作者: pierky 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_min_max(net_str, target_prefix_len, exp_first, exp_last,
                 shoud_fail=False):
    """Check the first and last prefixes get_sre() yields for a network.

    The minimum and maximum values returned by usres_monitor.get_sre()
    for net_str at target_prefix_len are rendered with get_ip_repr() and
    compared (as "address/target_prefix_len") with exp_first and
    exp_last.  When shoud_fail is true a mismatch is the expected result
    and a match is an error; otherwise a mismatch raises AssertionError.
    """
    failed = False
    try:
        net = ipaddr.IPNetwork(net_str)

#        assert net_str.lower() == "{}/{}".format(exp_first, net.prefixlen), \
#               ("String representations of original net and first expected "
#                "prefix don't match.")

        min_int, max_int, _ = usres_monitor.get_sre(net, target_prefix_len)
        first = usres_monitor.get_ip_repr(net.version, min_int)
        last = usres_monitor.get_ip_repr(net.version, max_int)

        got_first = "{}/{}".format(first, target_prefix_len)
        want_first = "{}/{}".format(exp_first.lower(), target_prefix_len)
        assert got_first == want_first, \
            "First expected prefix doesn't match: {} vs {}".format(
                got_first, want_first)

        got_last = "{}/{}".format(last, target_prefix_len)
        want_last = "{}/{}".format(exp_last.lower(), target_prefix_len)
        assert got_last == want_last, \
            "Last expected prefix doesn't match: {} vs {}".format(
                got_last, want_last)

    except AssertionError:
        failed = True
        # Only an expected failure is swallowed.
        if not shoud_fail:
            raise

    # Catches the case where a failure was expected but did not happen.
    assert shoud_fail == failed

    suffix = " (failed as expected)" if shoud_fail else ""
    test_outcome("get_sre",
                 "{} in /{}".format(net_str, target_prefix_len),
                 "OK{}".format(suffix))
__init__.py 文件源码 项目:usres_monitor 作者: pierky 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_net(net):
        """Return net as an ipaddr network object.

        IPv4Network / IPv6Network instances are returned as they are;
        anything else is parsed with ipaddr.IPNetwork.
        """
        network_types = (ipaddr.IPv4Network, ipaddr.IPv6Network)
        if not isinstance(net, network_types):
            net = ipaddr.IPNetwork(net)
        return net
cluster.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def load_cluster_networks(self):
        """Set self.cluster_net and self.public_net from an alive OSD's
        'cluster_network' and 'public_network' config options.

        Each attribute stays None when no alive OSD is available or when
        the corresponding option is missing or empty.
        """
        self.cluster_net = None
        self.public_net = None

        osd = self.get_alive_osd()
        if osd is None:
            return

        cluster_net_str = osd.config.get('cluster_network')
        if not (cluster_net_str is None or cluster_net_str == ""):
            self.cluster_net = IPNetwork(cluster_net_str)

        public_net_str = osd.config.get('public_network')
        if not (public_net_str is None or public_net_str == ""):
            self.public_net = IPNetwork(public_net_str)
dos_xml_parser.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _ValidateEntry(self, entry):
    """Returns an error message for entry.subnet, or None if it passes.

    The subnet must be non-empty, accepted by ipaddr.IPNetwork, and, if
    it contains exactly one '/', have only digits after it.
    """
    subnet = entry.subnet
    if not subnet:
      return MISSING_SUBNET

    try:
      ipaddr.IPNetwork(subnet)
    except ValueError:
      return BAD_IPV_SUBNET % subnet

    pieces = subnet.split('/')
    if len(pieces) != 2:
      return None
    if re.match('^[0-9]+$', pieces[1]):
      return None
    return BAD_PREFIX_LENGTH % subnet
dos_xml_parser.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _ValidateEntry(self, entry):
    """Returns an error message for entry.subnet, or None if it passes.

    The subnet must be non-empty, accepted by ipaddr.IPNetwork, and, if
    it contains exactly one '/', have only digits after it.
    """
    subnet = entry.subnet
    if not subnet:
      return MISSING_SUBNET

    try:
      ipaddr.IPNetwork(subnet)
    except ValueError:
      return BAD_IPV_SUBNET % subnet

    pieces = subnet.split('/')
    if len(pieces) != 2:
      return None
    if re.match('^[0-9]+$', pieces[1]):
      return None
    return BAD_PREFIX_LENGTH % subnet
test_host.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_0020_select_subnet_for_ip(self):
        """Every host address of every subnet must be mapped back to that
        subnet by select_subnet_for_ip()."""
        all_subnets = dormitory.Subnet.q.order_by(dormitory.Subnet.gateway).all()
        for expected in all_subnets:
            hosts = ipaddr.IPNetwork(expected.address).iterhosts()
            for host in hosts:
                self.assertEqual(
                    expected,
                    select_subnet_for_ip(host.compressed, all_subnets))
dormitory.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def netmask(self):
        """Return the netmask of the network in self.address, as a string."""
        return str(ipaddr.IPNetwork(self.address).netmask)
dormitory.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ip_version(self):
        """Return the IP version of the network in self.address."""
        network = ipaddr.IPNetwork(self.address)
        return network.version
host.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _ip_subnet_valid(self, ip, subnet):
        """Return whether ip lies inside the network given by subnet.address."""
        address = ipaddr.IPAddress(ip)
        network = ipaddr.IPNetwork(subnet.address)
        return address in network
sync.py 文件源码 项目:enjoliver 作者: JulienBalestra 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def cni_ipam(host_cidrv4: str, host_gateway: str):
        """
        Build a static "host-local" CNI ipam configuration for a host.

        The range base is the host IP, or, when ConfigSyncSchedules.sub_ips
        is set, the network address plus (last octet of the host IP *
        sub_ips). "rangeStart" is that base plus ConfigSyncSchedules.skip_ips
        and "rangeEnd" is "rangeStart" plus ConfigSyncSchedules.range_nb_ips.

        :param host_cidrv4: host address with prefix, parsed by ipaddr.IPNetwork
        :param host_gateway: value stored as the "gateway" entry
        :return: dict
        """
        subnet = ipaddr.IPNetwork(host_cidrv4)
        host_ip = str(subnet.ip)
        last_octet = int(host_ip.split(".")[-1])

        if ConfigSyncSchedules.sub_ips:
            offset = last_octet * ConfigSyncSchedules.sub_ips
            shifted = ipaddr.IPNetwork(subnet.network).ip + offset
            base_ip = ipaddr.IPNetwork(shifted).ip
        else:
            base_ip = subnet.ip

        range_start = base_ip + ConfigSyncSchedules.skip_ips
        range_end = range_start + ConfigSyncSchedules.range_nb_ips

        routes = [
            {"dst": "%s/32" % EC.perennial_local_host_ip, "gw": host_ip},
            {"dst": "0.0.0.0/0"},
        ]
        return {
            "type": "host-local",
            "subnet": "%s/%s" % (str(subnet.network), subnet.prefixlen),
            "rangeStart": str(range_start),
            "rangeEnd": str(range_end),
            "gateway": host_gateway,
            "routes": routes,
            "dataDir": "/var/lib/cni/networks"
        }


问题


面经


文章

微信
公众号

扫码关注公众号