python类valid_ipv6()的实例源码

zclient.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_connection(address):
    """
    Wrapper for socket.create_connection() function.

    If *address* (a 2-tuple ``(host, port)``) contains a valid IPv4/v6
    address, passes *address* to socket.create_connection().
    If *host* is valid path to Unix Domain socket, tries to connect to
    the server listening on the given socket.

    :param address: IP address or path to Unix Domain socket.
    :return: Socket instance.
    """
    host, _port = address

    if (netaddr.valid_ipv4(host)
            or netaddr.valid_ipv6(host)):
        return socket.create_connection(address)
    elif os.path.exists(host):
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(host)
        except socket.error as e:
            if sock is not None:
                sock.close()
            raise e
        return sock
    else:
        raise ValueError('Invalid IP address or Unix Socket: %s' % host)
utils.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def parse_server_string(server_str):
    """Parses the given server_string and returns a tuple of host and port.
    If it's not a combination of host part and port, the port element
    is an empty string. If the input is invalid expression, return a tuple of
    two empty strings.
    """
    try:
        # First of all, exclude pure IPv6 address (w/o port).
        if netaddr.valid_ipv6(server_str):
            return (server_str, '')

        # Next, check if this is IPv6 address with a port number combination.
        if server_str.find("]:") != -1:
            (address, port) = server_str.replace('[', '', 1).split(']:')
            return (address, port)

        # Third, check if this is a combination of an address and a port
        if server_str.find(':') == -1:
            return (server_str, '')

        # This must be a combination of an address and a port
        (address, port) = server_str.split(':')
        return (address, port)

    except (ValueError, netaddr.AddrFormatError):
        LOG.error(_LE('Invalid server_string: %s'), server_str)
        return ('', '')
ipinformation.py 文件源码 项目:ipinformation 作者: neu5ron 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def general_info(self):
        """
        Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address
        >>> from ipinformation import IPInformation
        >>> from pprint import pprint
        >>> pprint( IPInformation(ip_address='8.8.8.8').general_info() )
        {'general': {'bits': '00001000000010000000100000001000',
                     'type': 'public',
                     'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512),
                     'version': '4'}}
        >>> pprint( IPInformation(ip_address='127.0.0.1').general_info() )
        {'general': {'bits': '01111111000000000000000000000001',
                     'type': 'loopback',
                     'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149),
                     'version': '4'}}
        """
        data = { 'general': { 'bits': None, 'type': None, 'updated': None, 'version': None} }

        if not self.ISIP:
            # print '"%s" is not a valid IP Address.' %self.ip_address
            # logging_file.error( '"{0}" is not a valid IP Address.'.format(self.ip_address) )
            return data


        if netaddr.valid_ipv4( self.ip_address ): #IPv4 Address
            ip_version = '4'
            data['general'].update({'version':ip_version})
            ip_bits = netaddr.IPAddress( self.ip_address ).bits().replace( '.', '' ) #Set the IP bits for searching by subnet
            data['general'].update({'bits':ip_bits})
            ip_addr = netaddr.IPAddress(self.ip_address)

            if ip_addr.is_private():
                ip_type = 'private'
            elif ip_addr.is_multicast():
                ip_type = 'multicast'
            elif ip_addr.is_loopback():
                ip_type = 'loopback'
            elif ip_addr.is_netmask():
                ip_type = 'netmask'
            elif ip_addr.is_reserved():
                ip_type = 'reserved'
            elif ip_addr.is_link_local():
                ip_type = 'link_local'
            elif ip_addr.is_unicast():
                ip_type = 'public'
            else: #Unknown Type
                ip_type = 'unknown'
                logging_file.error( '"{0}" is an unknown IP Address.'.format(self.ip_address) )
        elif netaddr.valid_ipv6( self.ip_address ): #IPv6 Address#TODO:Finish IPv6
            ip_version = '6'
            print 'Is IPv6'
            return False

        data['general'].update( { 'type': ip_type } )
        data['general'].update( { 'updated': datetime.utcnow() } )
        return data
bgp_route.py 文件源码 项目:sonic-mgmt 作者: Azure 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    module = AnsibleModule(
            argument_spec=dict(
                neighbor=dict(required=False, default=None),
                direction=dict(required=False, choices=['adv', 'rec']),
                prefix=dict(required=False, default=None)
                ),
            supports_check_mode=False
            )
    m_args = module.params
    neighbor = m_args['neighbor']
    direction = m_args['direction']
    prefix = m_args['prefix']
    regex_ip = re.compile('[0-9a-fA-F.:]+')
    regex_iprange = re.compile('[0-9a-fA-F.:]+\/\d+')
    regex_ipv4 = re.compile('[12][0-9]{0,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/?\d+?')
    if neighbor == None and direction == None and prefix == None:
        module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet")
        return
    if neighbor and ((not netaddr.valid_ipv4(neighbor)) and (not netaddr.valid_ipv6(neighbor))):
        err_message = "Invalid neighbor address %s ??" % neighbor
        module.fail_json(msg=err_message)
        return
    if (neighbor and not direction) or (neighbor and 'adv' not in direction.lower()):
        err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction)
        module.fail_json(msg=err_message)
        return
    try:
        bgproute = BgpRoutes(neighbor, direction, prefix)

        if prefix:
            if regex_ipv4.match(prefix):
                command = "docker exec -i bgp vtysh -c 'show ip bgp " + str(prefix) + "'"
            else:
                command = "docker exec -i bgp vtysh -c 'show ipv6 bgp " + str(prefix) +  "'"
            rc, out, err = module.run_command(command)
            if rc != 0:
                err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err)
                module.fail_json(msg=err_message)
                return
            bgproute.parse_bgp_route_prefix(out)

        elif neighbor:
            if netaddr.valid_ipv4(neighbor):
                command = "docker exec -i bgp vtysh -c 'show ip bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
            else:
                command = "docker exec -i bgp vtysh -c 'show ipv6 bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
            rc, out, err = module.run_command(command)
            if rc !=  0:
                err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err)
                module.fail_json(msg=err_message)
                return
            bgproute.parse_bgp_route_adv(out)

        results = bgproute.get_facts()
        module.exit_json(ansible_facts=results)
    except Exception as e:
        fail_msg = "cannot correctly parse BGP Routing facts!\n"
        fail_msg += str(e)
        module.fail_json(msg=fail_msg)
    return
zclient.py 文件源码 项目:deb-ryu 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _send_ip_route_impl(
            self, prefix, nexthops=None,
            safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL,
            distance=None, metric=None, mtu=None, tag=None,
            is_withdraw=False):
        if ip.valid_ipv4(prefix):
            if is_withdraw:
                msg_cls = zebra.ZebraIPv4RouteDelete
            else:
                msg_cls = zebra.ZebraIPv4RouteAdd
        elif ip.valid_ipv6(prefix):
            if is_withdraw:
                msg_cls = zebra.ZebraIPv6RouteDelete
            else:
                msg_cls = zebra.ZebraIPv6RouteAdd
        else:
            raise ValueError('Invalid prefix: %s' % prefix)

        nexthop_list = []
        for nexthop in nexthops:
            if netaddr.valid_ipv4(nexthop):
                nexthop_list.append(zebra.NextHopIPv4(addr=nexthop))
            elif netaddr.valid_ipv6(nexthop):
                nexthop_list.append(zebra.NextHopIPv6(addr=nexthop))
            else:
                raise ValueError('Invalid nexthop: %s' % nexthop)

        msg = zebra.ZebraMessage(
            version=self.zserv_ver,
            body=msg_cls(
                route_type=self.route_type,
                flags=flags,
                message=0,
                safi=safi,
                prefix=prefix,
                nexthops=nexthop_list,
                distance=distance,
                metric=metric,
                mtu=mtu,
                tag=tag))
        self.send_msg(msg)

        return msg


问题


面经


文章

微信
公众号

扫码关注公众号