python类valid_ipv4()的实例源码

zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _serialize_ip_prefix(prefix):
    """Serialize a textual IP prefix into its binary representation.

    :param prefix: Prefix string in "address/length" form.
    :return: Whatever ``serialize()`` returns on the matching prefix
        object (``bgp.IPAddrPrefix`` for IPv4, ``IPv6Prefix`` for IPv6).
    :raises ValueError: If `prefix` is accepted by neither
        ``ip.valid_ipv4`` nor ``ip.valid_ipv6``.
    """
    # Pick the prefix class first; the construction is the same for both
    # address families.
    if ip.valid_ipv4(prefix):
        prefix_cls = bgp.IPAddrPrefix
    elif ip.valid_ipv6(prefix):
        prefix_cls = IPv6Prefix
    else:
        raise ValueError('Invalid prefix: %s' % prefix)

    addr_text, length_text = prefix.split('/')
    return prefix_cls(int(length_text), addr_text).serialize()
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, ifindex, ifc_flags, family, prefix, dest):
        """Store the fields of an interface address message body.

        :param ifindex: Stored as ``self.ifindex``.
        :param ifc_flags: Stored as ``self.ifc_flags``.
        :param family: Stored as ``self.family``.
        :param prefix: Either a prefix value, or an ``IPv4Prefix`` /
            ``IPv6Prefix`` instance whose ``prefix`` attribute is used.
        :param dest: Destination address; asserted to be a valid IPv4 or
            IPv6 address.
        """
        super(_ZebraInterfaceAddress, self).__init__()
        self.ifindex = ifindex
        self.ifc_flags = ifc_flags
        self.family = family
        # Prefix objects are reduced to the value of their ``prefix``
        # attribute; anything else is stored as given.
        self.prefix = (
            prefix.prefix
            if isinstance(prefix, (IPv4Prefix, IPv6Prefix))
            else prefix)
        assert any(is_valid(dest)
                   for is_valid in (netaddr.valid_ipv4, netaddr.valid_ipv6))
        self.dest = dest
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serialize(self):
        """Pack this message body into its binary form.

        The address family is derived from ``self.prefix`` and written
        back to ``self.family`` before the header is packed.

        :return: Packed header followed by the packed address body.
        :raises ValueError: If ``self.prefix`` is neither a valid IPv4 nor
            a valid IPv6 prefix.
        """
        if ip.valid_ipv4(self.prefix):
            self.family = socket.AF_INET  # fixup
            body_fmt = self._IPV4_BODY_FMT
            converter = addrconv.ipv4
        elif ip.valid_ipv6(self.prefix):
            self.family = socket.AF_INET6  # fixup
            body_fmt = self._IPV6_BODY_FMT
            converter = addrconv.ipv6
        else:
            raise ValueError(
                'Invalid address family for prefix=%s and dest=%s'
                % (self.prefix, self.dest))

        # Both families share the same layout: prefix address, prefix
        # length, destination address.
        addr_text, length_text = self.prefix.split('/')
        body_bin = struct.pack(
            body_fmt,
            converter.text_to_bin(addr_text),
            int(length_text),
            converter.text_to_bin(self.dest))

        header_bin = struct.pack(
            self._HEADER_FMT, self.ifindex, self.ifc_flags, self.family)

        return header_bin + body_bin
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, prefix, metric=None, nexthops=None):
        """Store the fields of an IP import lookup message body.

        :param prefix: Asserted to be a valid IPv4 or IPv6 address.
        :param metric: Stored as ``self.metric``.
        :param nexthops: Optional list of ``_NextHop`` instances; a falsy
            value is replaced by an empty list.
        """
        super(_ZebraIPImportLookup, self).__init__()
        assert any(is_valid(prefix)
                   for is_valid in (netaddr.valid_ipv4, netaddr.valid_ipv6))
        self.prefix = prefix
        self.metric = metric
        hop_list = nexthops if nexthops else []
        assert all(isinstance(hop, _NextHop) for hop in hop_list)
        self.nexthops = hop_list
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, addr, distance, metric, nexthops=None):
        """Store the fields of an IP nexthop lookup (MRIB) message body.

        :param addr: Asserted to be a valid IPv4 or IPv6 address.
        :param distance: Stored as ``self.distance``.
        :param metric: Stored as ``self.metric``.
        :param nexthops: Optional list of ``_NextHop`` instances; a falsy
            value is replaced by an empty list.
        """
        super(_ZebraIPNexthopLookupMRib, self).__init__()
        assert any(is_valid(addr)
                   for is_valid in (netaddr.valid_ipv4, netaddr.valid_ipv6))
        self.addr = addr
        self.distance = distance
        self.metric = metric
        hop_list = nexthops if nexthops else []
        assert all(isinstance(hop, _NextHop) for hop in hop_list)
        self.nexthops = hop_list
neighbors.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def valid_ip_address(addr):
    """Return True if `addr` is a valid IPv4 or IPv6 address, else False.

    Validity is decided by ``netaddr.valid_ipv4`` and
    ``netaddr.valid_ipv6``.
    """
    if netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr):
        return True
    return False
validation.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_valid_ipv4(ipv4):
    """Tell whether `ipv4` is accepted by ``netaddr.valid_ipv4``.

    The value is expected to be an IPv4 address string in dot-decimal
    notation, for example ``10.0.0.1`` or ``192.168.0.1``.  A
    colon-separated string such as ``192:168:0:1`` is not an IPv4 address.

    NOTE(review): how abbreviated forms (fewer than four octets) are
    treated is decided by netaddr itself -- confirm against the netaddr
    version in use.
    """
    verdict = netaddr.valid_ipv4(ipv4)
    return verdict
bgp.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_rt_extended_community(value, subtype=2):
    """
    Build a BGP Route Target ("subtype=2") or Route Origin ("subtype=3")
    Extended Community from its "<global admin>:<local admin>" text form.

    The global administrator part selects the community class:

    - a decimal number up to 0xffff: two-octet AS specific,
    - a decimal number up to 0xffffffff: four-octet AS specific,
    - a valid IPv4 address: IPv4 address specific.

    :param value: String of Route Target or Route Origin value.
    :param subtype: Subtype of Extended Community.
    :return: An instance of Route Target or Route Origin Community.
    :raises ValueError: If `value` does not have exactly two
        colon-separated parts, the local administrator is not an integer,
        or the global administrator matches none of the forms above.
    """
    admin_text, local_text = value.split(':')
    local_admin = int(local_text)
    # ``None`` means "not a plain decimal number"; such a value can only
    # be accepted as an IPv4 address below.
    as_number = int(admin_text) if admin_text.isdigit() else None

    if as_number is not None and as_number <= 0xffff:
        return BGPTwoOctetAsSpecificExtendedCommunity(
            subtype=subtype,
            as_number=as_number,
            local_administrator=local_admin)

    if as_number is not None and as_number <= 0xffffffff:
        return BGPFourOctetAsSpecificExtendedCommunity(
            subtype=subtype,
            as_number=as_number,
            local_administrator=local_admin)

    if netaddr.valid_ipv4(admin_text):
        return BGPIPv4AddressSpecificExtendedCommunity(
            subtype=subtype,
            ipv4_address=admin_text,
            local_administrator=local_admin)

    raise ValueError(
        'Invalid Route Target or Route Origin value: %s' % value)
base.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
                     bind_address=None, password=None):
        """Open a TCP connection to `peer_addr` and hand it to `conn_handler`.

        The socket family is AF_INET when ``peer_addr[0]`` is a valid IPv4
        address and AF_INET6 otherwise.  Socket creation, the optional
        bind to `bind_address`, the optional TCP MD5 signature setup with
        `password`, and the connect all run inside
        ``Timeout(time_out, socket.error)``.

        On success the socket is recorded in ``self._asso_socket_map``
        under a name built from the local and remote addresses, and
        `conn_handler` is spawned with the socket.

        :return: The connected socket.
        """
        peer_host = peer_addr[0]
        LOG.debug('Connect TCP called for %s:%s', peer_host, peer_addr[1])
        family = (socket.AF_INET
                  if netaddr.valid_ipv4(peer_host)
                  else socket.AF_INET6)
        with Timeout(time_out, socket.error):
            sock = socket.socket(family)
            if bind_address:
                sock.bind(bind_address)
            if password:
                sockopt.set_tcp_md5sig(sock, peer_host, password)
            sock.connect(peer_addr)
            # socket.error exception is raised in case of timeout, so
            # everything after this block runs only once the connection
            # is established.

        # The name of a pro-active connection combines the local end
        # address and the remote end address.
        local_name = self.get_localname(sock)[0]
        remote_name = self.get_remotename(sock)[0]
        conn_name = 'L: ' + local_name + ', R: ' + remote_name
        self._asso_socket_map[conn_name] = sock
        # Run the connection handler for the established connection via
        # self._spawn.
        self._spawn(conn_name, conn_handler, sock)
        return sock


#
# Sink
#
table_manager.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_global_table(self, prefix, next_hop=None, is_withdraw=False):
        """Add or withdraw a BGP route for `prefix` in the Global table.

        :param prefix: Network prefix, parsed with ``netaddr.IPNetwork``.
        :param next_hop: Next hop of the route.  When omitted, '0.0.0.0'
            is used for IPv4 prefixes and '::' for IPv6 prefixes.
        :param is_withdraw: False (the default) adds the route; True
            removes it.
        """
        src_ver_num = 1
        peer = None

        # Mandatory path attributes, kept in insertion order.
        pathattrs = OrderedDict()
        pathattrs[BGP_ATTR_TYPE_ORIGIN] = BGPPathAttributeOrigin(
            BGP_ATTR_ORIGIN_IGP)
        pathattrs[BGP_ATTR_TYPE_AS_PATH] = BGPPathAttributeAsPath([[]])

        network = netaddr.IPNetwork(prefix)
        addr = str(network.ip)

        # Choose the per-address-family NLRI class, path class and default
        # next hop in one place.
        if netaddr.valid_ipv4(addr):
            nlri_cls, path_cls, default_hop = IPAddrPrefix, Ipv4Path, '0.0.0.0'
        else:
            nlri_cls, path_cls, default_hop = IP6AddrPrefix, Ipv6Path, '::'

        nlri = nlri_cls(network.prefixlen, addr)
        if next_hop is None:
            next_hop = default_hop

        new_path = path_cls(peer, nlri, src_ver_num,
                            pattrs=pathattrs, nexthop=next_hop,
                            is_withdraw=is_withdraw)

        # add to global table and propagates to neighbors
        self.learn_path(new_path)
core.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _set_password(self, address, password):
        """Set the TCP MD5 signature for `address` on matching listen sockets.

        Only sockets in ``self.listen_sockets`` whose family matches the
        address family of `address` are touched (AF_INET when `address`
        is a valid IPv4 address, AF_INET6 otherwise).
        """
        family = (socket.AF_INET
                  if netaddr.valid_ipv4(address)
                  else socket.AF_INET6)

        for sock in self.listen_sockets.values():
            if sock.family != family:
                continue
            sockopt.set_tcp_md5sig(sock, address, password)
zclient.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create_connection(address):
    """
    Connect to a server given either an IP address or a Unix Domain
    socket path.

    If the host part of *address* (a 2-tuple ``(host, port)``) is a valid
    IPv4/v6 address, *address* is passed to socket.create_connection().
    Otherwise, if *host* is an existing filesystem path, a Unix Domain
    stream socket is connected to it.

    :param address: 2-tuple ``(host, port)``; *host* is an IP address or
        a path to a Unix Domain socket.
    :return: Socket instance.
    :raises ValueError: If *host* is neither a valid IP address nor an
        existing path.
    """
    host, _port = address

    if netaddr.valid_ipv4(host) or netaddr.valid_ipv6(host):
        return socket.create_connection(address)

    if not os.path.exists(host):
        raise ValueError('Invalid IP address or Unix Socket: %s' % host)

    unix_sock = None
    try:
        unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        unix_sock.connect(host)
    except socket.error as err:
        # Do not leave a half-opened socket behind when connect fails.
        if unix_sock is not None:
            unix_sock.close()
        raise err
    return unix_sock
ipinformation.py 文件源码 项目:ipinformation 作者: neu5ron 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def general_info(self):
        """
        Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address

        The result is a dict ``{'general': {...}}`` with the keys ``bits``,
        ``type``, ``updated`` and ``version``.  All four values are None
        when ``self.ISIP`` is false, or when the address is accepted by
        neither ``netaddr.valid_ipv4`` nor ``netaddr.valid_ipv6``.

        IPv6 is not implemented yet: for a valid IPv6 address the method
        prints 'Is IPv6' and returns False instead of a dict.

        >>> from ipinformation import IPInformation
        >>> from pprint import pprint
        >>> pprint( IPInformation(ip_address='8.8.8.8').general_info() )
        {'general': {'bits': '00001000000010000000100000001000',
                     'type': 'public',
                     'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512),
                     'version': '4'}}
        >>> pprint( IPInformation(ip_address='127.0.0.1').general_info() )
        {'general': {'bits': '01111111000000000000000000000001',
                     'type': 'loopback',
                     'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149),
                     'version': '4'}}
        """
        data = { 'general': { 'bits': None, 'type': None, 'updated': None, 'version': None} }

        if not self.ISIP:
            return data

        if netaddr.valid_ipv4( self.ip_address ): #IPv4 Address
            # Parse the address once and reuse it for bits and type checks.
            ip_addr = netaddr.IPAddress( self.ip_address )
            data['general'].update({'version': '4'})
            # Drop the octet separators so the bits can be searched by subnet.
            data['general'].update({'bits': ip_addr.bits().replace( '.', '' )})

            # NOTE(review): the checks are order-dependent -- the first
            # predicate that matches wins, so anything is_private()
            # accepts is reported as 'private' even if a later predicate
            # would match too.
            if ip_addr.is_private():
                ip_type = 'private'
            elif ip_addr.is_multicast():
                ip_type = 'multicast'
            elif ip_addr.is_loopback():
                ip_type = 'loopback'
            elif ip_addr.is_netmask():
                ip_type = 'netmask'
            elif ip_addr.is_reserved():
                ip_type = 'reserved'
            elif ip_addr.is_link_local():
                ip_type = 'link_local'
            elif ip_addr.is_unicast():
                ip_type = 'public'
            else: #Unknown Type
                ip_type = 'unknown'
                logging_file.error( '"{0}" is an unknown IP Address.'.format(self.ip_address) )
        elif netaddr.valid_ipv6( self.ip_address ): #IPv6 Address#TODO:Finish IPv6
            # Function-call form prints the same text on Python 2 and is
            # valid syntax on Python 3.
            print('Is IPv6')
            return False
        else:
            # Neither validator accepted the address: return the empty
            # result rather than fail on an unbound ``ip_type`` below.
            return data

        data['general'].update( { 'type': ip_type } )
        data['general'].update( { 'updated': datetime.utcnow() } )
        return data
bgp_route.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    """Ansible module entry point: gather BGP route facts through vtysh.

    Module parameters (all optional):

    - ``neighbor``: BGP neighbor address (IPv4 or IPv6).
    - ``direction``: ``'adv'`` or ``'rec'``; only ``'adv'`` is supported
      together with ``neighbor``.
    - ``prefix``: prefix to look up; takes precedence over ``neighbor``.

    The matching ``show ip bgp`` / ``show ipv6 bgp`` command is run inside
    the ``bgp`` docker container, its output is parsed by ``BgpRoutes``
    and the result is returned as ``ansible_facts``.  Every failure is
    reported through ``module.fail_json``.
    """
    module = AnsibleModule(
            argument_spec=dict(
                neighbor=dict(required=False, default=None),
                direction=dict(required=False, choices=['adv', 'rec']),
                prefix=dict(required=False, default=None)
                ),
            supports_check_mode=False
            )
    m_args = module.params
    neighbor = m_args['neighbor']
    direction = m_args['direction']
    prefix = m_args['prefix']
    # A prefix is treated as IPv4 when it starts with a dotted quad.  The
    # previous pattern only accepted a first octet beginning with 1 or 2
    # and needed an extra digit after the last octet, so prefixes such as
    # 8.8.8.0/24 or 10.0.0.1 were sent to the IPv6 command.
    regex_ipv4 = re.compile(r'(?:[0-9]{1,3}\.){3}[0-9]{1,3}')
    if neighbor is None and direction is None and prefix is None:
        module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet")
        return
    if neighbor and ((not netaddr.valid_ipv4(neighbor)) and (not netaddr.valid_ipv6(neighbor))):
        err_message = "Invalid neighbor address %s ??" % neighbor
        module.fail_json(msg=err_message)
        return
    if (neighbor and not direction) or (neighbor and 'adv' not in direction.lower()):
        err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction)
        module.fail_json(msg=err_message)
        return
    try:
        bgproute = BgpRoutes(neighbor, direction, prefix)

        if prefix:
            if regex_ipv4.match(prefix):
                command = "docker exec -i bgp vtysh -c 'show ip bgp " + str(prefix) + "'"
            else:
                command = "docker exec -i bgp vtysh -c 'show ipv6 bgp " + str(prefix) +  "'"
            rc, out, err = module.run_command(command)
            if rc != 0:
                # Report the real return code; the original referenced an
                # undefined name here, which masked the command failure.
                err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rc, out, err)
                module.fail_json(msg=err_message)
                return
            bgproute.parse_bgp_route_prefix(out)

        elif neighbor:
            if netaddr.valid_ipv4(neighbor):
                command = "docker exec -i bgp vtysh -c 'show ip bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
            else:
                command = "docker exec -i bgp vtysh -c 'show ipv6 bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
            rc, out, err = module.run_command(command)
            if rc != 0:
                err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rc, out, err)
                module.fail_json(msg=err_message)
                return
            bgproute.parse_bgp_route_adv(out)

        results = bgproute.get_facts()
        module.exit_json(ansible_facts=results)
    except Exception as e:
        fail_msg = "cannot correctly parse BGP Routing facts!\n"
        fail_msg += str(e)
        module.fail_json(msg=fail_msg)
    return
helpers.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def check_host(host, allow_localhost=config.ALLOW_CONNECT_LOCALHOST):
    """Validate that `host` resolves to an allowed IPv4 address.

    `host` is resolved with ``socket.gethostbyname``.  The resulting
    address must be a valid IPv4 address and must not belong to any of
    the special-purpose subnets listed below.  Loopback (127.0.0.0/8) is
    rejected as well unless `allow_localhost` is true.

    :raises MistError: If `host` cannot be encoded or resolved, does not
        resolve to a valid IPv4 address, or falls in a forbidden subnet.
    """
    try:
        ipaddr = socket.gethostbyname(host)
    except UnicodeEncodeError:
        raise MistError('Please provide a valid DNS name')
    except socket.gaierror:
        raise MistError("Not a valid IP address or resolvable DNS name: '%s'."
                        % host)

    # Subject of the error messages; mentions the resolved address only
    # when it differs from what the caller passed in.
    if host == ipaddr:
        subject = "Host '%s'" % host
    else:
        subject = "Host '%s' resolves to '%s' which" % (host, ipaddr)

    if not netaddr.valid_ipv4(ipaddr):
        raise MistError(subject + " is not a valid IPv4 address.")

    # Subnet -> reason shown to the user when the address falls inside it.
    forbidden_subnets = {
        '0.0.0.0/8': "used for broadcast messages to the current network",
        '100.64.0.0/10': ("used for communications between a service provider "
                          "and its subscribers when using a "
                          "Carrier-grade NAT"),
        '169.254.0.0/16': ("used for link-local addresses between two hosts "
                           "on a single link when no IP address is otherwise "
                           "specified"),
        '192.0.0.0/24': ("used for the IANA IPv4 Special Purpose Address "
                         "Registry"),
        '192.0.2.0/24': ("assigned as 'TEST-NET' for use solely in "
                         "documentation and example source code"),
        '192.88.99.0/24': "used by 6to4 anycast relays",
        '198.18.0.0/15': ("used for testing of inter-network communications "
                          "between two separate subnets"),
        '198.51.100.0/24': ("assigned as 'TEST-NET-2' for use solely in "
                            "documentation and example source code"),
        '203.0.113.0/24': ("assigned as 'TEST-NET-3' for use solely in "
                           "documentation and example source code"),
        '224.0.0.0/4': "reserved for multicast assignments",
        '240.0.0.0/4': "reserved for future use",
        '255.255.255.255/32': ("reserved for the 'limited broadcast' "
                               "destination address"),
    }

    if not allow_localhost:
        forbidden_subnets['127.0.0.0/8'] = ("used for loopback addresses "
                                            "to the local host")

    matched = netaddr.smallest_matching_cidr(ipaddr, list(forbidden_subnets))
    if matched:
        reason = forbidden_subnets[str(matched)]
        raise MistError("%s is not allowed. It belongs to '%s' "
                        "which is %s." % (subject, matched, reason))
zebra.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def __init__(self, route_type, flags, message, safi=None, prefix=None,
                 nexthops=None, ifindexes=None,
                 distance=None, metric=None, mtu=None, tag=None,
                 from_zebra=False):
        """Store the fields of an IP route message body.

        :param route_type: Stored as ``self.route_type``.
        :param flags: Stored as ``self.flags``.
        :param message: Stored as ``self.message``.
        :param safi: SAFI value; ignored (stored as None) when
            `from_zebra` is true, otherwise defaults to
            ``packet_safi.UNICAST``.
        :param prefix: Required.  Either a prefix value, or an
            ``IPv4Prefix`` / ``IPv6Prefix`` instance whose ``prefix``
            attribute is used.
        :param nexthops: List of IP address strings when `from_zebra` is
            true, otherwise a list of ``_NextHop`` instances.
        :param ifindexes: List of integers; only kept when `from_zebra`
            is true.
        :param distance: Stored as ``self.distance``.
        :param metric: Stored as ``self.metric``.
        :param mtu: Stored as ``self.mtu``.
        :param tag: Stored as ``self.tag``.
        :param from_zebra: Whether this message was sent from Zebra.
        """
        super(_ZebraIPRoute, self).__init__()
        self.route_type = route_type
        self.flags = flags
        self.message = message

        # SAFI is only carried by messages sent to Zebra.
        self.safi = None if from_zebra else (safi or packet_safi.UNICAST)

        assert prefix is not None
        self.prefix = (
            prefix.prefix
            if isinstance(prefix, (IPv4Prefix, IPv6Prefix))
            else prefix)

        # Messages from Zebra carry nexthops as IP address strings;
        # messages to Zebra carry _NextHop subclasses.
        hop_list = nexthops or []
        if from_zebra:
            assert all(
                netaddr.valid_ipv4(hop) or netaddr.valid_ipv6(hop)
                for hop in hop_list)
        else:
            assert all(isinstance(hop, _NextHop) for hop in hop_list)
        self.nexthops = hop_list

        # Interface indexes are only present in messages sent from Zebra.
        if from_zebra:
            index_list = ifindexes or []
            assert all(
                isinstance(index, six.integer_types) for index in index_list)
            self.ifindexes = index_list
        else:
            self.ifindexes = None

        self.distance = distance
        self.metric = metric
        self.mtu = mtu
        self.tag = tag

        # Whether this message was sent from Zebra or not.
        self.from_zebra = from_zebra
zclient.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _send_ip_route_impl(
            self, prefix, nexthops=None,
            safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL,
            distance=None, metric=None, mtu=None, tag=None,
            is_withdraw=False):
        """Build and send a Zebra IP route add/delete message.

        :param prefix: IPv4 or IPv6 prefix of the route.
        :param nexthops: Optional list of nexthop IP address strings.
            None (the default) is treated as an empty list.
        :param safi: SAFI passed to the message body.
        :param flags: Flags passed to the message body.
        :param distance: Passed to the message body.
        :param metric: Passed to the message body.
        :param mtu: Passed to the message body.
        :param tag: Passed to the message body.
        :param is_withdraw: True sends a route delete message, False
            (the default) a route add message.
        :return: The ``zebra.ZebraMessage`` that was sent.
        :raises ValueError: If `prefix` or one of `nexthops` is neither a
            valid IPv4 nor a valid IPv6 value.
        """
        if ip.valid_ipv4(prefix):
            if is_withdraw:
                msg_cls = zebra.ZebraIPv4RouteDelete
            else:
                msg_cls = zebra.ZebraIPv4RouteAdd
        elif ip.valid_ipv6(prefix):
            if is_withdraw:
                msg_cls = zebra.ZebraIPv6RouteDelete
            else:
                msg_cls = zebra.ZebraIPv6RouteAdd
        else:
            raise ValueError('Invalid prefix: %s' % prefix)

        nexthop_list = []
        # ``nexthops`` defaults to None; iterate an empty list in that case
        # instead of failing with "'NoneType' object is not iterable".
        for nexthop in nexthops or []:
            if netaddr.valid_ipv4(nexthop):
                nexthop_list.append(zebra.NextHopIPv4(addr=nexthop))
            elif netaddr.valid_ipv6(nexthop):
                nexthop_list.append(zebra.NextHopIPv6(addr=nexthop))
            else:
                raise ValueError('Invalid nexthop: %s' % nexthop)

        msg = zebra.ZebraMessage(
            version=self.zserv_ver,
            body=msg_cls(
                route_type=self.route_type,
                flags=flags,
                message=0,
                safi=safi,
                prefix=prefix,
                nexthops=nexthop_list,
                distance=distance,
                metric=metric,
                mtu=mtu,
                tag=tag))
        self.send_msg(msg)

        return msg


问题


面经


文章

微信
公众号

扫码关注公众号