python类IP的实例源码

faaclib.py 文件源码 项目:FCParser 作者: josecamachop 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def equals(self, raw_value):
        """Tell whether the stored IP address matches ``raw_value``.

        ``raw_value`` is either one of the matchtype keywords 'private' or
        'public', in which case the ``iptype()`` of the stored address is
        checked, or any other value, which is converted with ``self.load()``
        and compared for equality with the stored address.

        raw_value -- Specific IP address, OR matchtype of IP.

        Returns False whenever no address is stored (``self.value`` is None).
        """
        if self.value is None:
            return False
        if raw_value == 'private':
            return self.value.iptype() == 'PRIVATE'
        if raw_value == 'public':
            return self.value.iptype() == 'PUBLIC'

        # Anything else is treated as a concrete address to compare against.
        candidate = self.load(raw_value)
        return self.value == candidate
tendrl_gluster_heal_info.py 文件源码 项目:node-agent 作者: Tendrl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _parse_heal_info_stats(tree):
    bricks_dict = {}
    for brick in tree.findall("healInfo/bricks/brick"):
        brick_name = brick.find("name").text
        brick_host = brick_name.split(":")[0]
        brick_path = brick_name.split(":")[1]

        # If brick host is returned as an IP conver to FQDN
        try:
            from IPy import IP
            from dns import resolver, reversename
            IP(brick_host)
            addr = reversename.from_address(brick_host)
            brick_host = str(resolver.query(addr, "PTR")[0])[:-1]
        except ValueError:
            pass

        no_of_entries = 0
        try:
            no_of_entries = int(brick.find("numberOfEntries").text)
        except ValueError:
            no_of_entries = 0
        bricks_dict["%s:%s" % (brick_host, brick_path)] = no_of_entries
    return bricks_dict
parse_target.py 文件源码 项目:pr0xy 作者: VillanCh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def generate_target(IPs, ports):
    """Yield "<ip>:<port>" strings for every address/port combination.

    IPs   -- either "<start>-<end>" (two addresses joined by a dash, which
             are expanded through int2ips) or a single spec handed to IP().
    ports -- iterable of ports; integers are turned into strings, any other
             value is joined as it is.
    """
    if '-' in IPs:
        bounds = IPs.split('-')
        first, last = bounds[0], bounds[1]
        addresses = int2ips(IP(first).int(), IP(last).int())
    else:
        addresses = IP(IPs)

    for address in addresses:
        for port in ports:
            port_text = str(port) if isinstance(port, int) else port
            yield ':'.join([str(address), port_text])

#----------------------------------------------------------------------
parse_target.py 文件源码 项目:pr0xy 作者: VillanCh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def runTest(self):
        """Walk the targets generated for a CIDR block and for a dashed range.

        Each generated "ip:port" string is split on ':' and printed, and the
        address part is passed to IPy.IP(); a ValueError raised by that call
        is ignored.
        """
        for spec in ('123.1.2.0/24', '123.1.2.3-123.1.3.5'):
            for target in generate_target(spec, PORTS):
                ip_port = target.split(':')
                print(ip_port)
                try:
                    IPy.IP(ip_port[0])
                except ValueError:
                    pass
ip_calc_utils.py 文件源码 项目:g3ar 作者: VillanCh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def is_ipv4(ip):
    """Tell whether ``ip`` can be parsed by IP().

    Params:
        ip: :str: General IP format.

    Returns:
        :type: bool
        :desc: True when IP(ip) succeeds, False when it raises ValueError.

    NOTE(review): no address-family check is made here; whatever IP()
    accepts yields True -- confirm this matches what callers expect from
    the name."""
    try:
        IP(ip)
    except ValueError:
        return False
    else:
        return True

#----------------------------------------------------------------------
config.py 文件源码 项目:fenghuangscanner_v3 作者: 0xwindows 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getips(self,ip):
        """Expand an IP specification into a list of address strings.

        ip -- either a dotted quad whose fourth octet is a dash range such
              as "192.168.0.1-200" (both ends are included), or any other
              spec, which is handed to IP() (for example "192.168.1.0/24").

        Returns the list of addresses.  If the specification cannot be
        parsed, an error is printed through printRed() and the program
        exits.
        """
        try:
            octets = ip.split(".")
            last = octets[3]
            if "-" in last:
                bounds = last.split("-")
                startnum = int(bounds[0])
                endnum = int(bounds[1])
                # endnum + 1: the advertised "a.b.c.1-200" form names the
                # last host as well, so the upper bound must be included.
                return ["%s.%s.%s.%s" % (octets[0], octets[1], octets[2], i)
                        for i in range(startnum, endnum + 1)]
            return [str(i) for i in IP(ip)]
        except Exception:
            # Exception rather than a bare except, so that Ctrl-C is not
            # reported as an invalid address.
            printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
            exit()
config.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def getips(self,ip):
        """Expand an IP specification into a list of address strings.

        ip -- either a dotted quad whose fourth octet is a dash range such
              as "192.168.0.1-200" (both ends are included), or any other
              spec, which is handed to IP() (for example "192.168.1.0/24").

        Returns the list of addresses.  If the specification cannot be
        parsed, an error is printed through printRed() and the program
        exits.
        """
        try:
            octets = ip.split(".")
            last = octets[3]
            if "-" in last:
                bounds = last.split("-")
                startnum = int(bounds[0])
                endnum = int(bounds[1])
                # endnum + 1: the advertised "a.b.c.1-200" form names the
                # last host as well, so the upper bound must be included.
                return ["%s.%s.%s.%s" % (octets[0], octets[1], octets[2], i)
                        for i in range(startnum, endnum + 1)]
            return [str(i) for i in IP(ip)]
        except Exception:
            # Exception rather than a bare except, so that Ctrl-C is not
            # reported as an invalid address.
            printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
            exit()
check_wan_latency.py 文件源码 项目:check_wan_latency 作者: computingbee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def checkstate(lat):
    """Print a status line for the measured latency and exit.

    lat -- latency in milliseconds; a negative value is reported as UNKNOWN.

    The module-level thresholds TS_AVGLATW (warning) and TS_AVGLATC
    (critical) select the status; if either is zero or negative the value is
    reported as OK without any comparison.  The function never returns: it
    always ends in sys.exit() with the matching STATE_* code.
    """
    if lat < 0:
        message = "UNKNOWN: {0}ms, something must have gone wrong or your IP is unpingable".format(lat)
        state = STATE_UNKNOWN
    elif TS_AVGLATW <= 0 or TS_AVGLATC <= 0:
        message = "OK: {0}ms from {1}, no thresholds given or you just want see values first".format(lat,CITY)
        state = STATE_OK
    elif TS_AVGLATW <= lat < TS_AVGLATC:
        message = "WARNING: {0}ms from {1} is more than {2}ms but less than {3}ms for ip address {4}".format(lat,CITY,TS_AVGLATW,TS_AVGLATC,WAN_IP)
        state = STATE_WARNING
    elif lat >= TS_AVGLATC:
        message = "CRITICAL: {0}ms from {1} more than {2}ms for ip address {3}".format(lat,CITY,TS_AVGLATC,WAN_IP)
        state = STATE_CRITICAL
    else:
        message = "OK: {0}ms from {1}".format(lat,CITY)
        state = STATE_OK

    print(message)
    sys.exit(state)
login_admin.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def authorize_middleware(app, check_env_for_admin=_no_admin):
    """Wrap a WSGI application so only authorized requests reach it.

    app                 -- the WSGI application to protect.
    check_env_for_admin -- callable taking the WSGI environ and returning a
                           true value when the request may pass.

    Returns a WSGI callable.  A request is forwarded to ``app`` when
    _check_for_builtin() or ``check_env_for_admin`` accepts its environ;
    otherwise a warning is logged and a plain-text 403 response is sent.
    """
    def wsgi_app(environ, start_response):
        # deferred.py needs REMOTE_ADDR set to a specific value,
        # so set it here if we're an internal request
        remote = IPy.IP(environ['REMOTE_ADDR'])
        if remote.iptype() == 'PRIVATE' and _check_for_builtin(environ):
            environ['REMOTE_ADDR'] = '0.1.0.2'

        # The built-in check is repeated on purpose: environ may just have
        # been modified above.
        authorized = _check_for_builtin(environ) or check_env_for_admin(environ)
        if not authorized:
            logging.warning('Failed to authorize request, environment is: %s', environ)
            start_response('403 Forbidden', [('Content-type', 'text/plain')])
            return ['Forbidden']
        return app(environ, start_response)

    return wsgi_app
ip_geolocation.py 文件源码 项目:dancedeets-monorepo 作者: mikelambert 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_location_data_for(ip):
    """Return the geolocation record for ``ip`` as a dict.

    An empty dict is returned for a false ``ip`` and for addresses whose
    IPy type is 'PRIVATE'.  Otherwise the cache is consulted through
    _get_cache(); on a miss the record is fetched from freegeoip.net,
    decoded from JSON, stored with _save_cache() and returned.  Each of the
    three steps is timed through timelog.log_time_since().
    """
    if not ip:
        return {}
    # Check the common private IP spaces (used by Google for sending requests)
    if IPy.IP(ip).iptype() == 'PRIVATE':
        return {}

    started = time.time()
    cached = _get_cache(ip)
    timelog.log_time_since('Getting IP Cache', started)
    if cached:
        return cached

    #TODO: consider using http://geoiplookup.net/ , which might offer better granularity/resolution
    url = 'http://freegeoip.net/json/%s' % ip
    started = time.time()
    raw = urllib.urlopen(url).read()
    timelog.log_time_since('Getting IPData', started)
    fetched = json.loads(raw)

    started = time.time()
    _save_cache(ip, fetched)
    timelog.log_time_since('Saving IPCache', started)
    return fetched
protocol.py 文件源码 项目:dreamr-botnet 作者: YinAndYangSecurityAwareness 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def isPublic(self, ipaddr):
        """Decide whether this node should act as a server.

        Returns True when Dreamr.server is already True, when the IPy type
        of ``ipaddr`` is "PUBLIC", or when DEBUG_MODE is set; the latter two
        cases also print a marker line.  Returns False otherwise.
        """
        if Dreamr.server == True:
            return True

        # Check if host's interface address is public
        if IPy.IP(ipaddr).iptype() == "PUBLIC":
            print("BECOME_SERVER")
            return True

        # Server Debug
        if DEBUG_MODE:
            print("BECOME_SERVER_DEBUG")
            return True

        return False

# SERVER HANDLER THREAD | Fork off a client thread to handle the socket
# This is a connected client or server!
vlan.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def filter_active_router_addresses(gwportprefixes):
    """Reduce a GwPortPrefix queryset to one router address per prefix.

    The queryset is ordered by prefix, then virtual addresses first, then by
    IP address, and the first entry of every prefix group is kept.  This
    means a virtual address wins if the prefix has one, otherwise the
    lowest IP address does.

    :param gwportprefixes: A GwPortPrefix QuerySet.
    :returns: A list of GwPortPrefix objects.

    """
    # It is more or less impossible to get Django's ORM to generate the
    # wonderfully complex SQL needed for this, so we do it by hand.
    ordered = gwportprefixes.order_by('prefix__id', '-virtual', 'gw_ip')
    winners = []
    for _prefix_id, candidates in groupby(ordered, attrgetter('prefix_id')):
        winners.append(next(candidates))
    return winners
api.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def usage(self, request, *args, **kwargs):
        """Return usage figures for the Prefix whose primary key is ``pk``.

        ``pk`` is taken from the keyword arguments.  The response payload
        holds the address count of the prefix, the number of active
        addresses, and the active and allocated ratios relative to that
        address count.  Prefixes whose vlan net type is "scope" are left out
        of the allocated total.
        """
        pk = kwargs.pop("pk", None)
        prefix = Prefix.objects.get(pk=pk)
        capacity = IP(prefix.net_address).len()
        in_use = prefix_collector.collect_active_ip(prefix)

        # calculate allocated ratio
        within_prefix = PrefixQuerysetBuilder().within(prefix.net_address)
        subprefixes = within_prefix.finalize().exclude(vlan__net_type="scope")
        allocated_size = sum(p.get_prefix_size() for p in subprefixes)

        return Response(
            {
                "max_addr": capacity,
                "active_addr": in_use,
                "usage": 1.0 * in_use / capacity,
                "allocated": 1.0 * allocated_size / capacity,
                "pk": pk
            },
            status=status.HTTP_200_OK)
util.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 67 收藏 0 点赞 0 评论 0
def filter_full_prefixes(self):
        """Register a post hook that drops /32 (IPv4) and /128 (IPv6)
        prefixes, which are of little or no value to most network planning
        operations.

        The hook is a generator function appended to ``self.post_hooks``; it
        yields only prefixes shorter than the full length of their address
        family.

        Returns:
            self, so that calls can be chained

        """
        full_length = {4: 32, 6: 128}

        def _filter_full_prefixes(q):
            for prefix in q:
                ip = IP(prefix.net_address)
                limit = full_length.get(ip.version())
                if limit is not None and ip.prefixlen() < limit:
                    yield prefix

        self.post_hooks.append(_filter_full_prefixes)
        return self
util.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_available_subnets(prefix_or_prefixes, used_prefixes):
    """Compute the unused parts of one or more prefixes, given the prefixes
    that are in use.  This is `get_available_subnets` with explicit
    dependency injection.

    Args:
        prefix_or_prefixes: a single or a list of prefixes ("10.0.0.0/8") or
                          IPy.IP objects
        used_prefixes: prefixes that are in use

    Returns:
           A sorted list of the entries left in the IPSet of the given
           prefixes once the used ones are discarded, without any entry
           whose string form equals one of the given prefixes

    """
    if isinstance(prefix_or_prefixes, list):
        requested = prefix_or_prefixes
    else:
        requested = [prefix_or_prefixes]
    requested_names = [str(prefix) for prefix in requested]

    available = IPSet([IP(prefix) for prefix in requested])
    # remove used prefixes
    available.discard(IPSet([IP(used) for used in used_prefixes]))

    # filter away original prefixes
    remaining = [net for net in available if str(net) not in requested_names]
    return sorted(remaining)
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def process_searchform(form):
    """Build the Identity queryset matching a validated search form.

    The form's cleaned data supplies 'days' (how far back 'last_changed' may
    lie), 'searchtype', 'searchvalue' and 'status'.  An 'ip' search matches
    a single address exactly and a network by containment; every other
    search type is a case-insensitive substring match on that field.  A
    status other than 'any' is matched exactly.
    """
    data = form.cleaned_data
    extra = {}
    kwargs = {
        'last_changed__gte': datetime.now() - timedelta(days=data['days'])
    }

    if data['searchtype'] != 'ip':
        kwargs[data['searchtype'] + '__icontains'] = data['searchvalue']
    else:
        ip = IP(data['searchvalue'])
        if ip.len() == 1:
            kwargs['ip'] = str(ip)
        else:
            extra['where'] = ["ip << '%s'" % str(ip)]

    if data['status'] != 'any':
        kwargs['status'] = data['status']

    return Identity.objects.filter(**kwargs).extra(**extra)
__init__.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host):
        """Set ``host``, ``ip`` and ``hostname`` from a Host or a string.

        host -- another Host, whose three attributes are copied, or a
                hostname / IP address string.

        For a string, ``self.is_ip()`` decides which of ``ip`` and
        ``hostname`` it is; the other attribute is looked up, falling back
        to the given string (for the hostname) or to None (for the ip).

        Raises ValueError when ``host`` is false.
        """
        if isinstance(host, Host):
            for attr in ('host', 'ip', 'hostname'):
                setattr(self, attr, getattr(host, attr))
            return
        if not host:
            raise ValueError("Host argument must be hostname or IP address")

        self.host = host
        if not self.is_ip():
            self.hostname = host
            self.ip = self.get_host_by_name() or None
        else:
            self.ip = host
            self.hostname = self.get_host_by_addr() or host
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_queryset(self):
        """Select prefixes by scope or IP family and drop the small ones.

        A 'scope' request parameter limits the result to prefixes within
        that scope, ordered by network address; otherwise a non-empty
        'family' parameter filters on the address family; otherwise all
        prefixes are used.  Returns a list holding only the prefixes whose
        address count is at least MINIMUMPREFIXLENGTH.
        """
        params = self.request.GET
        if 'scope' in params:
            scoped = manage.Prefix.objects.within(params.get('scope'))
            queryset = scoped.select_related('vlan').order_by('net_address')
        elif params.get('family'):
            queryset = manage.Prefix.objects.extra(
                where=['family(netaddr)=%s'],
                params=[params['family']])
        else:
            queryset = manage.Prefix.objects.all()

        # Filter prefixes that is smaller than minimum prefix length
        results = []
        for candidate in queryset:
            if IP(candidate.net_address).len() >= MINIMUMPREFIXLENGTH:
                results.append(candidate)
        return results
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(request, prefix):
        """Handle a GET request for the usage of ``prefix``.

        Responds with HTTP 400 when ``prefix`` cannot be parsed by IP() or
        when its address count is below MINIMUMPREFIXLENGTH.  Otherwise the
        usage fetched for the matching database prefix, over the time
        window taken from the request, is serialized and returned.
        """
        try:
            parsed = IP(prefix)
        except ValueError:
            return Response("Bad prefix", status=status.HTTP_400_BAD_REQUEST)

        too_small = parsed.len() < MINIMUMPREFIXLENGTH
        if too_small:
            return Response("Prefix is too small",
                            status=status.HTTP_400_BAD_REQUEST)

        starttime, endtime = get_times(request)
        db_prefix = manage.Prefix.objects.get(net_address=prefix)
        usage = prefix_collector.fetch_usage(db_prefix, starttime, endtime)
        return Response(serializers.PrefixUsageSerializer(usage).data)
prefix_collector.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, prefix, active_addresses, starttime=None, endtime=None):
        """Collect the usage figures and related URLs of a prefix.

        ``usage`` is the percentage of ``max_hosts`` that is active.
        ``endtime`` is only kept when a ``starttime`` is given.

        :type prefix: manage.Prefix
        :type active_addresses: int
        :type starttime: datetime.datetime
        :type endtime: datetime.datetime
        """
        self.prefix = prefix.net_address
        self.active_addresses = active_addresses
        self.max_addresses = IP(self.prefix).len()
        # Two addresses are set aside on ordinary networks.  Prefixes with
        # only one or two addresses have nothing to set aside: subtracting
        # there would give a zero or negative host count and break the
        # percentage below, so all of their addresses count as hosts.
        if self.max_addresses > 2:
            self.max_hosts = self.max_addresses - 2
        else:
            self.max_hosts = self.max_addresses
        self.usage = self.active_addresses / float(self.max_hosts) * 100
        self.starttime = starttime
        self.net_ident = prefix.vlan.net_ident
        self.vlan_id = prefix.vlan.vlan
        self.endtime = endtime if self.starttime else None
        self.url_machinetracker = reverse(
            'machinetracker-prefixid_search_active', args=[prefix.pk])
        self.url_report = reverse('report-prefix-prefix', args=[prefix.pk])
        self.url_vlan = reverse('vlan-details', args=[prefix.vlan.pk])
utils.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def hostname(ip):
    """
    Reverse-resolve an IP address, remembering successful answers in the
    module-level ``_cached_hostname`` dict.  Failed lookups are not
    remembered.

    :param ip: And IP address string.
    :returns: A hostname string or a False value if the lookup failed.

    """
    key = unicode(ip)
    if key not in _cached_hostname:
        try:
            answer = gethostbyaddr(key)
        except herror:
            return False
        _cached_hostname[key] = answer[0]
    return _cached_hostname[key]
utils.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_prefix_info(addr):
    """Find the most specific non-scope prefix in NAVdb containing ``addr``.

    Candidates are ordered by descending mask length and the first one is
    returned.

    :param addr: An IP address string.
    :returns: A Prefix object or None if no prefixes matched (or the
              database raised an error).

    """
    lookup = {
        'select': {"mask_size": "masklen(netaddr)"},
        'where': ["%s << netaddr AND nettype <> 'scope'"],
        'order_by': ["-mask_size"],
        'params': [addr],
    }
    try:
        candidates = Prefix.objects.select_related().extra(**lookup)
        return candidates[0]
    except (IndexError, DatabaseError):
        return None
utils.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def normalize_ip_to_string(ipaddr):
    """Turn an IP address into a string that sorts correctly as text.

    Useful when IP addresses are sent to a browser and sorted there as
    plain strings by JavaScript.

    An IPv4 address becomes '4' followed by the dotted quad with every part
    zero-padded to three characters.  Any other address becomes '6'
    followed by its full-size string form.  Input that IP() rejects with a
    ValueError is returned unchanged.

    """
    try:
        parsed = IP(ipaddr)
    except ValueError:
        return ipaddr

    if parsed.version() != 4:
        return '6%s' % parsed.strFullsize()

    padded = [part.zfill(3) for part in str(parsed).split('.')]
    return '4%s' % '.'.join(padded)
edit.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def resolve_ip_and_sysname(name):
    """Look up both the IP address and the sysname for ``name``.

    name - ip or hostname; if IP() rejects it, it is resolved with
           gethostbyname() first.

    Returns:
     - tuple with ip-address (an IP object) and sysname.  The sysname is the
       reverse-lookup name of the address, or the address itself as text
       when the reverse lookup raises SocketError.
    """
    try:
        ip_addr = IP(name)
    except ValueError:
        ip_addr = IP(gethostbyname(name))

    address_text = unicode(ip_addr)
    try:
        sysname = gethostbyaddr(address_text)[0]
    except SocketError:
        sysname = address_text
    return (ip_addr, sysname)
edit.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def does_ip_exist(ip_addr, netbox_id=None):
    """Checks if the given IP already exist in database.

    Parameters:
     * ip_addr   - the IP address to look for.
     * netbox_id - a netbox primary key that can have the given ip_addr, and
                   the function will still return False.

    Returns:
     - True if the IP already exists in the database (and the netbox with the
       IP is not the same as the given netbox_id).
     - False if not.
    """
    if netbox_id:
        ip_qs = Netbox.objects.filter(Q(ip=unicode(ip_addr)), ~Q(id=netbox_id))
    else:
        ip_qs = Netbox.objects.filter(ip=unicode(ip_addr))
    # exists() only asks the database whether at least one row matches,
    # instead of counting every match.
    return ip_qs.exists()
views.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def group_scopes(scopes):
    """Split scopes into private, IPv4 and IPv6 groups.

    A scope whose address type is 'PRIVATE' goes into the private group
    regardless of version.  Each group is sorted by the integer value of
    the scope's network address.

    :type scopes: list[Prefix]
    :returns: an IpGroup of the three sorted lists (private, ipv4, ipv6),
              or an empty list when all three groups are empty.
    """
    order = ('private', 'ipv4', 'ipv6')
    buckets = defaultdict(list)

    for scope in scopes:
        net = IP(scope.net_address)
        if net.iptype() == 'PRIVATE':
            label = 'private'
        elif net.version() == 4:
            label = 'ipv4'
        elif net.version() == 6:
            label = 'ipv6'
        else:
            continue
        buckets[label].append(scope)

    if not any([buckets[label] for label in order]):
        return []

    def _prefix_as_int(prefix):
        return IP(prefix.net_address).int()

    return IpGroup(*[sorted(buckets[label], key=_prefix_as_int)
                     for label in order])
arnold.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def find_input_type(ip_or_mac):
    """Classify the input as an ip-address, a mac-address or an swportid.

    The input is converted to a string first.  Returns "IP" when
    is_valid_ip() accepts it, "MAC" when it consists of twelve hex digits
    once any colons are removed, "SWPORTID" when it is all digits, and
    "UNKNOWN" otherwise.  Only the type string is returned."""
    text = str(ip_or_mac)

    if is_valid_ip(text, use_socket_lib=True):
        return "IP"

    # Support mac-adresses on xx:xx... format
    if re.match("^[A-Fa-f0-9]{12}$", text.replace(':', '')):
        return "MAC"

    if re.match(r"^\d+$", text):
        return "SWPORTID"

    return "UNKNOWN"
arnold.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_non_block(ip):
    """Raise InExceptionListError if ``ip`` is covered by the nonblock list.

    The list is parsed from NONBLOCKFILE on every call.  Two of its
    sections are consulted here: the specific ip-addresses under 'ip' and
    the ip-ranges (129.241.xxx.xxx/xx) under 'range'.  A match on a specific
    address is also logged.  Returns None when nothing matches.
    """
    nonblockdict = parse_nonblock_file(NONBLOCKFILE)

    # Specific ip-addresses
    if ip in nonblockdict['ip']:
        LOGGER.info('Computer in nonblock list, skipping')
        raise InExceptionListError

    # Ip-ranges
    if any(ip in IP(ip_range) for ip_range in nonblockdict['range']):
        raise InExceptionListError
neighbor.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, record, local_address=None):
        """Store a neighbor record and immediately try to identify it.

        The set-up ends with a call to ``self.identify()``.  Before that
        call, ``netbox`` and ``interfaces`` are None and ``identified`` is
        False.

        :param record: Some namedtuple instance representing the
                       neighboring record read from the device.
        :param local_address: The management IP address used by the local
                              system. If supplied, it is added to the list
                              of invalid neighbor IPs, next to the entries
                              of INVALID_IPS.

        """
        self.record = record

        invalid = list(INVALID_IPS)
        if local_address:
            invalid.append(str(local_address))
        self._invalid_neighbor_ips = invalid

        self.interfaces = None
        self.netbox = None
        self.identified = False

        self.identify()
neighbor.py 文件源码 项目:nav 作者: UNINETT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _netbox_from_ip(self, ip):
        """Look up a Netbox in NAV's database by IP address.

        The address is normalized through IP() first.  An address that
        cannot be parsed is logged as a warning, and an address listed in
        ``self._invalid_neighbor_ips`` is skipped.  Otherwise the netbox is
        searched by its own ip and then, if that finds nothing, by the
        gateway port prefix addresses of its interfaces.

        :returns: A shadows.Netbox object representing the netbox, or None if no
                  corresponding netbox was found.

        """
        try:
            address = six.text_type(IP(ip))
        except ValueError:
            self._logger.warning("Invalid IP (%s) in neighbor record: %r",
                                 ip, self.record)
            return

        assert address
        if address in self._invalid_neighbor_ips:
            return

        by_own_ip = self._netbox_query(Q(ip=address))
        if by_own_ip:
            return by_own_ip
        return self._netbox_query(Q(interface__gwportprefix__gw_ip=address))


问题


面经


文章

微信
公众号

扫码关注公众号