python类IPAddress()的实例源码

util.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def isIPAddress(ip, compressed=True):
    """Check if an arbitrary string is an IP address, and that it's valid.

    :type ip: basestring or int
    :param ip: The IP address to check.
    :param boolean compressed: If True, return a string representing the
        compressed form of the address. Otherwise, return an
        :class:`ipaddr.IPAddress` instance.
    :rtype: A :class:`ipaddr.IPAddress`, or a string, or False
    :returns: The IP, as a string or a class, if it passed the
        checks. Otherwise, returns False.
    """
    try:
        ip = ipaddr.IPAddress(ip)
    except ValueError:
        return False
    else:
        if isValidIP(ip):
            if compressed:
                return ip.compressed
            else:
                return ip
    return False
util.py 文件源码 项目:farfetchd 作者: isislovecruft 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def isIPv(version, ip):
    """Check if **ip** is a certain **version** (IPv4 or IPv6).

    .. warning: Do *not* put any calls to the logging module in this function,
        or else an infinite recursion will occur when the call is made, due
        the the log :class:`~logging.Filter`s in :mod:`~bridgedb.safelog`
        using this function to validate matches from the regular expression
        for IP addresses.

    :param integer version: The IPv[4|6] version to check; must be either
        ``4`` or ``6``. Any other value will be silently changed to ``4``.
    :param ip: The IP address to check. May be an any type which
        :class:`ipaddr.IPAddress` will accept.
    :rtype: boolean
    :returns: ``True``, if the address is an IPv4 address.
    """
    try:
        ipaddr.IPAddress(ip, version=version)
    except (ipaddr.AddressValueError, Exception):
        return False
    else:
        return True
    return False
minigraph.py 文件源码 项目:sonic-buildimage 作者: Azure 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def parse_device_desc_xml(filename):
    root = ET.parse(filename).getroot()
    (lo_prefix, mgmt_prefix, hostname, hwsku, d_type) = parse_device(root)

    results = {}
    results['DEVICE_METADATA'] = {'localhost': {
        'hostname': hostname,
        'hwsku': hwsku,
        }}

    results['LOOPBACK_INTERFACE'] = {('lo', lo_prefix): {}}

    mgmt_intf = {}
    mgmtipn = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmtipn.network) + 1)
    results['MGMT_INTERFACE'] = {('eth0', mgmt_prefix): {'gwaddr': gwaddr}}

    return results
EXAConf.py 文件源码 项目:docker-db 作者: EXASOL 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ip_is_valid(self, ip):
        """
        Returns true if the given string is a valid IP address (v4 or v6).
        """
        try:
            ipaddr.IPAddress(ip)
            return True
        except ValueError:
            return False
#}}}

#{{{ Check if network is valid
EXAConf.py 文件源码 项目:docker-db 作者: EXASOL 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_network(self, net_type):
        """ 
        Returns a network (as a string) that includes the private/public IPs of all nodes in the config.
        Raises an EXAConfError if an invalid IP is found or the IP of at least one node is not part
        of the network defined by the first node section.

        This function assumes that all nodes have an entry for the requested network type. The calling 
        function has to check if the network type is actually present (private / public).
        """

        network = "" 
        for section in self.config.sections:
            if self.is_node(section):
                node_sec = self.config[section]
                node_network = node_sec.get(net_type)
                if not node_network or node_network == "":
                    raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section))
                node_ip = node_network.split("/")[0].strip()
                # check if the extracted IP is valid
                if not self.ip_is_valid(node_ip):
                    raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section))

                # first node : choose the private net as the cluster network (and make it a 'real' network)
                if network == "":
                    subnet = ipaddr.IPNetwork(node_network)
                    network = "%s/%s" % (str(subnet.network), str(subnet.prefixlen))
                # other nodes : check if their IP is part of the chosen net
                elif ipaddr.IPAddress(node_ip) not in ipaddr.IPNetwork(network):
                    raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network))

        return network
#}}}

#{{{ Get private network
__init__.py 文件源码 项目:usres_monitor 作者: pierky 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_ip_repr(ip_ver, net_int):
        return ipaddr.IPAddress(net_int if ip_ver == 4 else net_int << 64)
common.py 文件源码 项目:dnsAutoRebinding 作者: Tr3jer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def analy_req(address):
    mainDomain = conf_read('maindomain')
    address = address[:-len(mainDomain) - 1]
    payload = conf_read('payload')
    encoding = conf_read('encoding')
    record = address

    try:

        if encoding == 'int':
            record = ipaddr.IPAddress(int(address)).__str__()
        elif encoding == 'hex':
            try:
                address = address.decode('hex')

                if ipaddr.IPAddress(address).version == 4:
                    record = address
                elif conf_read('type') == 'AAAA' and ipaddr.IPAddress(address).version == 6:
                    record = address
                else:
                    pass
            except:
                pass
        # elif False not in map(lambda x:x in map(lambda x:chr(x),range(97,108)),list(address)):
        elif encoding == 'en':
            record = numToEnToNum(address)
        elif payload != 'None' and payload.find(mainDomain) == -1:
            # record = payload + "www.google.com"
            record = payload + mainDomain

    except Exception,e:
        print '[!] Subdomain Invalid {}'.format(e)
    finally:
        return record
common.py 文件源码 项目:dnsAutoRebinding 作者: Tr3jer 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def ipListBuild(address):
    print '1. Single IP Covert For En\n2. Build IP List'
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print numToEnToNum(address)
        exit()

    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]")
    mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main
    segment = eval("ipaddr.IPv4Network('{}/{}').iterhosts()".format(address, int(seg_len)))
    save_file = "{}_{}_{}.txt".format(time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''), mainDomain.replace('.','_'),(encode_req if encode_req else 'ipv4'))
    results = []

    try:

        if encode_req == '': results += ["{}.{}".format(str(i),mainDomain) for i in list(segment)]
        elif encode_req == 'en':
            results += ["{}.{}".format(numToEnToNum(str(i)),mainDomain) for i in list(segment)]
        elif encode_req == 'int':
            results += ["{}.{}".format(int(ipaddr.IPAddress(str(i))),mainDomain) for i in list(segment)]
        elif encode_req == 'hex':
            results += ["{}.{}".format(str(i).encode('hex'),mainDomain) for i in list(segment)]
        else:
            pass

        f = open(save_file,'a')
        [f.write(i+'\n') for i in results]
        f.close()
        print '[+] Stored in the {}'.format(save_file)
    except Exception,e:
        print e
        exit()
ip_address.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, max_length=50, *args, **kwargs):
        if not ip_address:
            raise ImproperlyConfigured(
                "'ipaddr' package is required to use 'IPAddressType' "
                "in python 2"
            )

        super(IPAddressType, self).__init__(*args, **kwargs)
        self.impl = types.Unicode(max_length)
ip_address.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def process_result_value(self, value, dialect):
        return ip_address(value) if value else None
ip_address.py 文件源码 项目:deb-python-sqlalchemy-utils 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _coerce(self, value):
        return ip_address(value) if value else None
host.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _ip_subnet_valid(self, ip, subnet):
        return ipaddr.IPAddress(ip) in ipaddr.IPNetwork(subnet.address)
host.py 文件源码 项目:pycroft 作者: agdsn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def select_subnet_for_ip(ip, subnets):
    for subnet in subnets:
        if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(subnet.address):
            return subnet
http.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, server_address, handler, *args, **kwargs):
        self.session = kwargs.pop("session")
        (address, _) = server_address
        version = ipaddr.IPAddress(address).version
        if version == 4:
            self.address_family = socket.AF_INET
        elif version == 6:
            self.address_family = socket.AF_INET6

        BaseHTTPServer.HTTPServer.__init__(
            self, server_address, handler, *args, **kwargs)
tls_handshake.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def buildSocket(self, addr):
        global s
        ip = ipaddr.IPAddress(addr) ## learn if we're IPv4 or IPv6
        if ip.version == 4:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif ip.version == 6:
            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        return s
whatsapp.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def contains(self, ip_address):
        ip = ipaddr.IPAddress(ip_address)
        if isinstance(ip, ipaddr.IPv4Address):
            networks = self.ipv4_networks
        elif isinstance(ip, ipaddr.IPv6Address):
            networks = self.ipv6_networks
        else:
            raise RuntimeError("Should never happen")
        for network in networks:
            if network.Contains(ip):
                return True
        return False
net.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getAddresses():
    from scapy.all import get_if_addr, get_if_list
    from ipaddr import IPAddress

    addresses = set()
    for i in get_if_list():
        try:
            addresses.add(get_if_addr(i))
        except:
            pass
    if '0.0.0.0' in addresses:
        addresses.remove('0.0.0.0')
    return [IPAddress(addr) for addr in addresses]
ipaddresses.py 文件源码 项目:arouteserver 作者: pierky 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, ip):
        if ip_library == "ipaddr":
            self.obj = ipaddr.IPAddress(ip)
            self.ip = str(self.obj)
        else:
            self.obj = ipaddress.ip_address(ip)
            self.ip = str(self.obj.compressed)

        self.version = self.obj.version
vlan.py 文件源码 项目:noc 作者: onfsdn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, vid, conf=None):
        if conf is None:
            conf = {}
        self.vid = vid
        self.tagged = []
        self.untagged = []
        self.name = conf.setdefault('name', str(vid))
        self.description = conf.setdefault('description', self.name)
        self.controller_ips = conf.setdefault('controller_ips', [])
        if self.controller_ips:
            self.controller_ips = [
                ipaddr.IPNetwork(ip) for ip in self.controller_ips]
        self.unicast_flood = conf.setdefault('unicast_flood', True)
        self.routes = conf.setdefault('routes', {})
        self.ipv4_routes = {}
        self.ipv6_routes = {}
        if self.routes:
            self.routes = [route['route'] for route in self.routes]
            for route in self.routes:
                ip_gw = ipaddr.IPAddress(route['ip_gw'])
                ip_dst = ipaddr.IPNetwork(route['ip_dst'])
                assert(ip_gw.version == ip_dst.version)
                if ip_gw.version == 4:
                    self.ipv4_routes[ip_dst] = ip_gw
                else:
                    self.ipv6_routes[ip_dst] = ip_gw
        self.arp_cache = {}
        self.nd_cache = {}
        self.max_hosts = conf.setdefault('max_hosts', None)
        self.host_cache = {}
ipcymrubogon.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Given a set of hints, evaluate this identifier and return True if
        an identification is made.

        """

        try:
            ip = hints['requester']
        except KeyError:
            return False

        octets = dns.reversename.from_address(ip)[0:-3]
        host = '.'.join(octets)
        host += '.v4.fullbogons.cymru.com' if len(octets) == 4 \
                else '.v6.fullbogons.cymru.com'

        # The query for this is always for an 'A' record, even for
        # IPv6 hosts.

        try:
            resolved = self.resolver.query(host, 'A')[0]
        except dns.resolver.NXDOMAIN:
            return False
        except (dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return self.fail_result

        # The query will return 127.0.0.2 if it's in the bogon list.
        # See http://www.team-cymru.org/bogon-reference-dns.html.

        if str(resolved) != '127.0.0.2':
            return False

        # At this point, we have a bogon.  Filter out exclusions.

        net_ip = ipaddr.IPAddress(ip)
        for exclude in self.exclude:
            if net_ip in exclude:
                return False

        # Not excluded; must be a legit bogon.

        return True


# A short test program
ipreversedns.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def evaluate(self,
                 hints  # Information used for doing identification
                 ):

        """Given a set of hints, evaluate this identifier and return True if
        an identification is made.

        """

        try:
            ip = hints['requester']
        except KeyError:
            return False

        addr = ipaddr.IPAddress(ip)

        ip_reverse = dns.reversename.from_address(ip)

        # Resolve to a FQDN

        try:
            reverse = str(self.resolver.query(ip_reverse, 'PTR')[0])
        except (dns.resolver.NXDOMAIN,
                dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return False

        # Resolve the FQDN back to an IP and see if they match.  This
        # prevents someone in control over their reverse resolution
        # from claiming they're someone they're not.

        # TODO: Check against _all_ returned IPs

        record = 'A' if addr.version == 4 else 'AAAA'
        try:
            forwards = self.resolver.query(reverse, record)
        except (dns.resolver.NXDOMAIN,
                dns.exception.Timeout,
                dns.resolver.NoAnswer,
                dns.resolver.NoNameservers):
            return False

        if ip not in [ str(f) for f in forwards ]:
            return False

        # Try to match with and without the dot at the end.

        for reverse_candidate in [ reverse, reverse.rstrip('.') ]:
            if self.matcher.matches(reverse_candidate):
                return True


        # No match, no dice.
        return False



# A short test program
scanapi.py 文件源码 项目:scanapi 作者: mozilla 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _pass_hostinfo(self, entry):
        s = None
        if entry['host'] not in self._state:
            s = {
                    'vulnerabilities':        [],
                    'exempt_vulnerabilities': [],
                    'ports':                  set(),
                    'hostname':               None,
                    'ipaddress':              None,
                    'os':                     None,
                    'credentialed_checks':    False
                    }
        else:
            s = self._state[entry['host']]

        # if the hostname has not been set yet, just default it to the key/target
        # value
        if s['hostname'] == None:
            s['hostname'] = entry['host']

        thishostinfo = self._hostinfo_locator(entry)

        # attempt to determine the ip address; if our target is an ip just use that,
        # otherwise try to locate the ip address using the supplementary host info
        try:
            ipaddr.IPAddress(entry['host'])
            s['ipaddress'] = entry['host']
        except:
            if thishostinfo != None:
                s['ipaddress'] = thishostinfo['host-ip']

        if thishostinfo != None and 'operating-system' in thishostinfo:
            s['os'] = thishostinfo['operating-system']

        # attempt to extract kernel hostname 
        if 'output of \"uname -a\" is' in entry['output']:
            unamestr = entry['output'].replace('\n', ' ')
            m = re.search('output of "uname -a" is : Linux (\S+) ', unamestr)
            if m != None:
                s['hostname'] = m.group(1)
        elif '= Computer name' in entry['output']:
            cnamestr = entry['output'].replace('\n', ' ')
            m = re.search('(\S+)\s+= Computer name', cnamestr)
            if m != None:
                s['hostname'] = m.group(1)

        # flip credentialed checks if we find plugin output indicating the scan
        # included successfully used credentials
        if 'Credentialed checks : yes' in entry['output']:
            s['credentialed_checks'] = True

        self._state[entry['host']] = s
cluster.py 文件源码 项目:ceph-lcm 作者: Mirantis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def load_hosts(self):
        for host_name in self.storage.hosts[2]:
            stor_node = self.storage.get("hosts/" + host_name, expected_format=None)

            host = Host(host_name)
            self.hosts[host.name] = host

            lshw_xml = stor_node.get('lshw', expected_format='xml')

            if lshw_xml is None:
                host.hw_info = None
            else:
                try:
                    host.hw_info = get_hw_info(lshw_xml)
                except:
                    host.hw_info = None

            info = self.parse_meminfo(stor_node.get('meminfo'))
            host.mem_total = info['MemTotal']
            host.mem_free = info['MemFree']
            host.swap_total = info['SwapTotal']
            host.swap_free = info['SwapFree']
            loadavg = stor_node.get('loadavg')

            host.load_5m = None if loadavg is None else float(loadavg.strip().split()[1])

            ipa = self.storage.get('hosts/%s/ipa' % host.name)
            ip_rr_s = r"\d+:\s+(?P<adapter>.*?)\s+inet\s+(?P<ip>\d+\.\d+\.\d+\.\d+)/(?P<size>\d+)"

            info = collections.defaultdict(lambda: [])
            for line in ipa.split("\n"):
                match = re.match(ip_rr_s, line)
                if match is not None:
                    info[match.group('adapter')].append(
                        (IPAddress(match.group('ip')), int(match.group('size'))))

            for adapter, ips_with_sizes in info.items():
                for ip, sz in ips_with_sizes:
                    if self.public_net is not None and ip in self.public_net:
                        host.public_net = NetworkAdapter(adapter, ip)

                    if self.cluster_net is not None and ip in self.cluster_net:
                        host.cluster_net = NetworkAdapter(adapter, ip)

            interfaces = getattr(self.jstorage.hosts, host_name).interfaces
            for name, adapter_dct in interfaces.items():
                adapter_dct = adapter_dct.copy()

                dev = adapter_dct.pop('dev')
                adapter = NetworkAdapter(dev, None)
                adapter.__dict__.update(adapter_dct)
                host.net_adapters[dev] = adapter

            net_stats = self.get_node_net_stats(host.name)
            perf_adapters = [host.cluster_net, host.public_net] + list(host.net_adapters.values())

            for net in perf_adapters:
                if net is not None and net.name is not None:
                    net.perf_stats = net_stats.get(net.name)

            host.uptime = float(stor_node.get('uptime').split()[0])


问题


面经


文章

微信
公众号

扫码关注公众号