python类valid_ipv4()的实例源码

data_utils.py 文件源码 项目:myautotest 作者: auuppp 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_ipv6_addr_by_EUI64(cidr, mac):
    """Generate a IPv6 addr by EUI-64 with CIDR and MAC

    :param str cidr: a IPv6 CIDR
    :param str mac: a MAC address
    :return: an IPv6 Address
    :rtype: netaddr.IPAddress
    """
    # Check if the prefix is IPv4 address
    is_ipv4 = netaddr.valid_ipv4(cidr)
    if is_ipv4:
        msg = "Unable to generate IP address by EUI64 for IPv4 prefix"
        raise TypeError(msg)
    try:
        eui64 = int(netaddr.EUI(mac).eui64())
        prefix = netaddr.IPNetwork(cidr)
        return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
    except (ValueError, netaddr.AddrFormatError):
        raise TypeError('Bad prefix or mac format for generating IPv6 '
                        'address by EUI-64: %(prefix)s, %(mac)s:'
                        % {'prefix': cidr, 'mac': mac})
    except TypeError:
        raise TypeError('Bad prefix type for generate IPv6 address by '
                        'EUI-64: %s' % cidr)
mrtlib.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def serialize(self):
        # fixup
        if (netaddr.valid_ipv4(self.peer_ip)
                and netaddr.valid_ipv4(self.local_ip)):
            self.afi = self.AFI_IPv4
        elif (netaddr.valid_ipv6(self.peer_ip)
              and netaddr.valid_ipv6(self.local_ip)):
            self.afi = self.AFI_IPv6
        else:
            raise ValueError(
                'peer_ip and local_ip must be the same address family: '
                'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))

        buf = struct.pack(self._HEADER_FMT,
                          self.peer_as, self.local_as,
                          self.if_index, self.afi)

        buf += ip.text_to_bin(self.peer_ip)
        buf += ip.text_to_bin(self.local_ip)

        buf += struct.pack(self._STATES_FMT,
                           self.old_state, self.new_state)

        return buf
mrtlib.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def serialize(self):
        # fixup
        if (netaddr.valid_ipv4(self.peer_ip)
                and netaddr.valid_ipv4(self.local_ip)):
            self.afi = self.AFI_IPv4
        elif (netaddr.valid_ipv6(self.peer_ip)
              and netaddr.valid_ipv6(self.local_ip)):
            self.afi = self.AFI_IPv6
        else:
            raise ValueError(
                'peer_ip and local_ip must be the same address family: '
                'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))

        buf = struct.pack(self._HEADER_FMT,
                          self.peer_as, self.local_as,
                          self.if_index, self.afi)

        buf += ip.text_to_bin(self.peer_ip)
        buf += ip.text_to_bin(self.local_ip)

        buf += self.bgp_message.serialize()

        return buf
bgp.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, afi, safi, next_hop, nlri,
                 flags=0, type_=None, length=None):
        super(BGPPathAttributeMpReachNLRI, self).__init__(
            flags=flags, type_=type_, length=length)
        self.afi = afi
        self.safi = safi
        if not isinstance(next_hop, (list, tuple)):
            next_hop = [next_hop]
        for n in next_hop:
            if not netaddr.valid_ipv4(n) and not netaddr.valid_ipv6(n):
                raise ValueError('Invalid address for next_hop: %s' % n)
        # Note: For the backward compatibility, stores the first next_hop
        # address and all next_hop addresses separately.
        if next_hop:
            self._next_hop = next_hop[0]
        else:
            self._next_hop = None
        self._next_hop_list = next_hop
        self.nlri = nlri
        addr_cls = _get_addr_class(afi, safi)
        for i in nlri:
            if not isinstance(i, addr_cls):
                raise ValueError('Invalid NRLI class for afi=%d and safi=%d'
                                 % (self.afi, self.safi))
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, lp_status, te_metric, max_bw, max_reserved_bw,
                 unreserved_bw, admin_group, remote_as, remote_ip,
                 average_delay, min_delay, max_delay, delay_var, pkt_loss,
                 residual_bw, average_bw, utilized_bw):
        super(InterfaceLinkParams, self).__init__()
        self.lp_status = lp_status
        self.te_metric = te_metric
        self.max_bw = max_bw
        self.max_reserved_bw = max_reserved_bw
        assert isinstance(unreserved_bw, (list, tuple))
        assert len(unreserved_bw) == MAX_CLASS_TYPE
        self.unreserved_bw = unreserved_bw
        self.admin_group = admin_group
        self.remote_as = remote_as
        assert netaddr.valid_ipv4(remote_ip)
        self.remote_ip = remote_ip
        self.average_delay = average_delay
        self.min_delay = min_delay
        self.max_delay = max_delay
        self.delay_var = delay_var
        self.pkt_loss = pkt_loss
        self.residual_bw = residual_bw
        self.average_bw = average_bw
        self.utilized_bw = utilized_bw
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serialize(self):
        if ip.valid_ipv4(self.prefix):
            self.family = socket.AF_INET  # fixup
            prefix_addr, prefix_num = self.prefix.split('/')
            body_bin = struct.pack(
                self._IPV4_BODY_FMT,
                addrconv.ipv4.text_to_bin(prefix_addr),
                int(prefix_num))
        elif ip.valid_ipv6(self.prefix):
            self.family = socket.AF_INET6  # fixup
            prefix_addr, prefix_num = self.prefix.split('/')
            body_bin = struct.pack(
                self._IPV6_BODY_FMT,
                addrconv.ipv6.text_to_bin(prefix_addr),
                int(prefix_num))
        else:
            raise ValueError('Invalid prefix: %s' % self.prefix)

        return struct.pack(self._FAMILY_FMT, self.family) + body_bin
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def serialize(self):
        # fixup
        if ip.valid_ipv4(self.prefix):
            self.family = socket.AF_INET
        elif ip.valid_ipv6(self.prefix):
            self.family = socket.AF_INET6
        else:
            raise ValueError('Invalid prefix: %s' % self.prefix)

        buf = struct.pack(self._FAMILY_FMT, self.family)

        buf += _serialize_ip_prefix(self.prefix)

        buf += struct.pack(self._METRIC_FMT, self.metric)

        return buf + _serialize_nexthops(self.nexthops)
types.py 文件源码 项目:gluon 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate(value):
        if isinstance(value, six.string_types) and netaddr.valid_ipv4(value):
            return value
        raise ValueError(_("Expected IPv4 string, got '%s'") % value)
sflib.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validIP(self, address):
        return netaddr.valid_ipv4(address)

    # Clean DNS results to be a simple list
sflib.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def matches(self, value, includeParents=False, includeChildren=True):
        value = value.lower()

        if value is None or value == "":
            return False

        if netaddr.valid_ipv4(value):
            # 1.1
            if value in self.getAddresses():
                return True
            # 1.2
            if self.targetType == "NETBLOCK_OWNER":
                if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
                    return True
            if self.targetType == "IP_ADDRESS":
                if netaddr.IPAddress(value) in \
                        netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
                    return True
        else:
            for name in self.getNames():
                # 2.1
                if value == name:
                    return True
                # 2.2            
                if includeParents and name.endswith("." + value):
                    return True
                # 2.3
                if includeChildren and value.endswith("." + name):
                    return True

        return None


# Class for SpiderFoot Events
sflib.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def validIP(self, address):
        return netaddr.valid_ipv4(address)

    # Clean DNS results to be a simple list
sflib.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def matches(self, value, includeParents=False, includeChildren=True):
        value = value.lower()

        if value is None or value == "":
            return False

        if netaddr.valid_ipv4(value):
            # 1.1
            if value in self.getAddresses():
                return True
            # 1.2
            if self.targetType == "NETBLOCK_OWNER":
                if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
                    return True
            if self.targetType == "IP_ADDRESS":
                if netaddr.IPAddress(value) in \
                        netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
                    return True
        else:
            for name in self.getNames():
                # 2.1
                if value == name:
                    return True
                # 2.2            
                if includeParents and name.endswith("." + value):
                    return True
                # 2.3
                if includeChildren and value.endswith("." + name):
                    return True

        return None


# Class for SpiderFoot Events
netutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def is_valid_ipv4(address):
    """Verify that address represents a valid IPv4 address.

    :param address: Value to verify
    :type address: string
    :returns: bool

    .. versionadded:: 1.1
    """
    try:
        return netaddr.valid_ipv4(address)
    except netaddr.AddrFormatError:
        return False
virtual_env.py 文件源码 项目:taf 作者: taf3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_server_port_map(self, server, ip_addr=None):
        ports = self.handle._list_ports(device_id=server['id'], fixed_ip=ip_addr)  # pylint: disable=protected-access

        port_map = [
            (p['id'], fxip['ip_address'])
            for p in ports for fxip in p['fixed_ips']
            if netaddr.valid_ipv4(fxip['ip_address']) and fxip['ip_address'] == ip_addr['addr']
        ]

        return port_map
cidr-merge.py 文件源码 项目:cidr-tools 作者: silverwind 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def netlist(nets):
  v4nets = []
  v6nets = []
  for net in nets:
    ipNetwork = netaddr.IPNetwork(net)
    parts = str(ipNetwork).split("/")
    ip = parts[0]
    mask = parts[1]
    if netaddr.valid_ipv4(ip) and int(mask) <= 32:
      v4nets.append(ipNetwork)
    elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
      v6nets.append(ipNetwork)
  return v4nets, v6nets
cidr-exclude.py 文件源码 项目:cidr-tools 作者: silverwind 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ipset(nets):
  v4nets = netaddr.IPSet()
  v6nets = netaddr.IPSet()
  for net in nets:
    ipNetwork = netaddr.IPNetwork(net)
    parts = str(ipNetwork).split("/")
    ip = parts[0]
    mask = parts[1]
    if netaddr.valid_ipv4(ip) and int(mask) <= 32:
      v4nets.add(ipNetwork)
    elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
      v6nets.add(ipNetwork)
  return v4nets, v6nets
cidr-expand.py 文件源码 项目:cidr-tools 作者: silverwind 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def netlist(nets):
  v4nets = []
  v6nets = []
  for net in nets:
    ipNetwork = netaddr.IPNetwork(net)
    parts = str(ipNetwork).split("/")
    ip = parts[0]
    mask = parts[1]
    if netaddr.valid_ipv4(ip) and int(mask) <= 32:
      v4nets.append(ipNetwork)
    elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
      v6nets.append(ipNetwork)
  return v4nets, v6nets
neighbors.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def valid_ip_address(addr):
    if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr):
        return False
    return True
bgpspeaker.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def prefix_add(self, prefix, next_hop=None, route_dist=None):
        """ This method adds a new prefix to be advertized.

        ``prefix`` must be the string representation of an IP network
        (e.g., 10.1.1.0/24).

        ``next_hop`` specifies the next hop address for this
        prefix. This parameter is necessary for only VPNv4 and VPNv6
        address families.

        ``route_dist`` specifies a route distinguisher value. This
        parameter is necessary for only VPNv4 and VPNv6 address
        families.

        """
        func_name = 'network.add'
        networks = {}
        networks[PREFIX] = prefix
        if next_hop:
            networks[NEXT_HOP] = next_hop
        if route_dist:
            func_name = 'prefix.add_local'
            networks[ROUTE_DISTINGUISHER] = route_dist

            rf, p = self._check_rf_and_normalize(prefix)
            networks[ROUTE_FAMILY] = rf
            networks[PREFIX] = p

            if rf == vrfs.VRF_RF_IPV6 and netaddr.valid_ipv4(next_hop):
                # convert the next_hop to IPv4-Mapped IPv6 Address
                networks[NEXT_HOP] = \
                    str(netaddr.IPAddress(next_hop).ipv6())

        return call(func_name, **networks)
base.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
                     bind_address=None, password=None):
        """Creates a TCP connection to given peer address.

        Tries to create a socket for `timeout` number of seconds. If
        successful, uses the socket instance to start `client_factory`.
        The socket is bound to `bind_address` if specified.
        """
        LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1])
        if netaddr.valid_ipv4(peer_addr[0]):
            family = socket.AF_INET
        else:
            family = socket.AF_INET6
        with Timeout(time_out, socket.error):
            sock = socket.socket(family)
            if bind_address:
                sock.bind(bind_address)
            if password:
                sockopt.set_tcp_md5sig(sock, peer_addr[0], password)
            sock.connect(peer_addr)
            # socket.error exception is rasied in cese of timeout and
            # the following code is executed only when the connection
            # is established.

        # Connection name for pro-active connection is made up of
        # local end address + remote end address
        local = self.get_localname(sock)[0]
        remote = self.get_remotename(sock)[0]
        conn_name = ('L: ' + local + ', R: ' + remote)
        self._asso_socket_map[conn_name] = sock
        # If connection is established, we call connection handler
        # in a new thread.
        self._spawn(conn_name, conn_handler, sock)
        return sock


#
# Sink
#
table_manager.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def add_to_global_table(self, prefix, nexthop=None,
                            is_withdraw=False):
        src_ver_num = 1
        peer = None
        # set mandatory path attributes
        origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP)
        aspath = BGPPathAttributeAsPath([[]])

        pathattrs = OrderedDict()
        pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin
        pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath

        net = netaddr.IPNetwork(prefix)
        ip = str(net.ip)
        masklen = net.prefixlen
        if netaddr.valid_ipv4(ip):
            _nlri = IPAddrPrefix(masklen, ip)
            if nexthop is None:
                nexthop = '0.0.0.0'
            p = Ipv4Path
        else:
            _nlri = IP6AddrPrefix(masklen, ip)
            if nexthop is None:
                nexthop = '::'
            p = Ipv6Path

        new_path = p(peer, _nlri, src_ver_num,
                     pattrs=pathattrs, nexthop=nexthop,
                     is_withdraw=is_withdraw)

        # add to global ipv4 table and propagates to neighbors
        self.learn_path(new_path)
core.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _set_password(self, address, password):
        if netaddr.valid_ipv4(address):
            family = socket.AF_INET
        else:
            family = socket.AF_INET6

        for sock in self.listen_sockets.values():
            if sock.family == family:
                sockopt.set_tcp_md5sig(sock, address, password)
utils.py 文件源码 项目:neutron-dynamic-routing 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def validate_ip_addr(ip_addr):
    if netaddr.valid_ipv4(ip_addr):
        return lib_consts.IP_VERSION_4
    elif netaddr.valid_ipv6(ip_addr):
        return lib_consts.IP_VERSION_6
    else:
        raise bgp_driver_exc.InvalidParamType(param=ip_addr,
                                              param_type='ip-address')
ipinformation.py 文件源码 项目:ipinformation 作者: neu5ron 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def is_ip(self, ip_addr=None):
        """
        Return true if valid IP address return false if invalid IP address
        :param ip_addr: optional IP to pass. Takes from root class if not specified
        >>> from ipinformation import IPInformation
        >>> print IPInformation(ip_address='8.8.8.8').is_ip()
            True
        >>> print IPInformation(ip_address='NotAnIP').is_ip()
            False
        """
        if not ip_addr:
            ip_addr = self.ip_address

        valid = True

        if netaddr.valid_ipv4( ip_addr ): #IPv4 Address
            if not re.match( valid_ip_regex, ip_addr ):
                valid = False

        elif netaddr.valid_ipv6( ip_addr ):
            pass

        else:
            # print '"%s" is not a valid IP Address.' %ip_addr
            valid = False

        return valid
common.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validate_ip_addr(addr, version=None):
    """
    Validates that an IP address is valid. Returns true if valid, false if
    not. Version can be "4", "6", None for "IPv4", "IPv6", or "either"
    respectively.
    """
    if version == 4:
        return netaddr.valid_ipv4(addr)
    elif version == 6:
        return netaddr.valid_ipv6(addr)
    else:
        return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
common.py 文件源码 项目:felix 作者: axbaretto 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def validate_ip_addr(addr, version=None):
    """
    Validates that an IP address is valid. Returns true if valid, false if
    not. Version can be "4", "6", None for "IPv4", "IPv6", or "either"
    respectively.
    """
    if version == 4:
        return netaddr.valid_ipv4(addr)
    elif version == 6:
        return netaddr.valid_ipv6(addr)
    else:
        return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
sflib.py 文件源码 项目:spiderfoot 作者: ParrotSec 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def validIP(self, address):
        return netaddr.valid_ipv4(address)

    # Clean DNS results to be a simple list
sflib.py 文件源码 项目:spiderfoot 作者: ParrotSec 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def matches(self, value, includeParents=False, includeChildren=True):
        value = value.lower()

        if value is None or value == "":
            return False

        if netaddr.valid_ipv4(value):
            # 1.1
            if value in self.getAddresses():
                return True
            # 1.2
            if self.targetType == "NETBLOCK_OWNER":
                if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
                    return True
            if self.targetType == "IP_ADDRESS":
                if netaddr.IPAddress(value) in \
                        netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
                    return True
        else:
            for name in self.getNames():
                # 2.1
                if value == name:
                    return True
                # 2.2            
                if includeParents and name.endswith("." + value):
                    return True
                # 2.3
                if includeChildren and value.endswith("." + name):
                    return True

        return None


# Class for SpiderFoot Events
ip.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def valid_ipv4(addr, flags=0):
    """
    Wrapper function of "netaddr.valid_ipv4()".

    The function extends "netaddr.valid_ipv4()" to enable to validate
    IPv4 network address in "xxx.xxx.xxx.xxx/xx" format.

    :param addr: IP address to be validated.
    :param flags: See the "netaddr.valid_ipv4()" docs for details.
    :return: True is valid. False otherwise.
    """
    return _valid_ip(netaddr.valid_ipv4, 32, addr, flags)
bgp.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def next_hop(self, addr):
        if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr):
            raise ValueError('Invalid address for next_hop: %s' % addr)
        self._next_hop = addr
        self.next_hop_list[0] = addr


问题


面经


文章

微信
公众号

扫码关注公众号