def equals(self, raw_value):
    """Tell whether this IP address matches ``raw_value``.

    ``raw_value`` is either a match type or a specific IP address.
    Supported match types: 'private', 'public'. For those, the type
    reported by ``self.value.iptype()`` is compared with 'PRIVATE' or
    'PUBLIC' respectively. Any other value is converted with ``self.load``
    and compared to ``self.value`` for equality.

    Returns False whenever ``self.value`` is None.
    """
    if self.value is None:
        return False
    if raw_value == 'private':
        return self.value.iptype() == 'PRIVATE'
    if raw_value == 'public':
        return self.value.iptype() == 'PUBLIC'
    # Not a match type: treat it as a concrete address to compare against.
    other = self.load(raw_value)
    return self.value == other
# Example source code for the Python class IP
def _parse_heal_info_stats(tree):
bricks_dict = {}
for brick in tree.findall("healInfo/bricks/brick"):
brick_name = brick.find("name").text
brick_host = brick_name.split(":")[0]
brick_path = brick_name.split(":")[1]
# If brick host is returned as an IP conver to FQDN
try:
from IPy import IP
from dns import resolver, reversename
IP(brick_host)
addr = reversename.from_address(brick_host)
brick_host = str(resolver.query(addr, "PTR")[0])[:-1]
except ValueError:
pass
no_of_entries = 0
try:
no_of_entries = int(brick.find("numberOfEntries").text)
except ValueError:
no_of_entries = 0
bricks_dict["%s:%s" % (brick_host, brick_path)] = no_of_entries
return bricks_dict
def generate_target(IPs, ports):
    """Yield "ip:port" target strings for scanning HTTP proxies.

    IPs   -- either a "start-end" pair of addresses, expanded through
             int2ips(), or any value IP() accepts, which is iterated
             directly.
    ports -- iterable of ports; integer ports are converted to strings.

    Every address is combined with every port, addresses varying slowest.
    """
    if '-' in IPs:
        bounds = IPs.split('-')
        first, last = bounds[0], bounds[1]
        addresses = int2ips(IP(first).int(), IP(last).int())
    else:
        addresses = IP(IPs)
    for address in addresses:
        for port in ports:
            port_text = str(port) if isinstance(port, int) else port
            yield ':'.join([str(address), port_text])
#----------------------------------------------------------------------
def runTest(self):
    """Print the targets generated for a CIDR block and for a dashed range.

    Each generated "ip:port" string is split on ':', printed as a list, and
    its address part is passed to IPy.IP().
    """
    for spec in ('123.1.2.0/24', '123.1.2.3-123.1.3.5'):
        for target in generate_target(spec, PORTS):
            ip_port = target.split(':')
            print(ip_port)
            try:
                IPy.IP(ip_port[0])
            except ValueError:
                # NOTE(review): an invalid address is silently accepted here,
                # so this test cannot fail on bad output - confirm intent.
                pass
def is_ipv4(ip):
    """Report whether ``ip`` can be parsed by IP().

    Params:
        ip: :str: General IP format.

    Returns:
        :type: bool
        :desc: True when IP(ip) succeeds, False when it raises ValueError.

    NOTE(review): despite the name, nothing here restricts the check to
    IPv4 - whatever IP() accepts is reported as valid. Confirm with callers.
    """
    try:
        IP(ip)
    except ValueError:
        return False
    return True
#----------------------------------------------------------------------
def getips(self, ip):
    """Expand an IP specification into a list of address strings.

    Two forms are handled:
      * a range in the fourth dotted field, e.g. "192.168.0.1-200", which
        expands to every address from the first to the last number,
        both inclusive;
      * anything else is passed to IP() (e.g. "192.168.1.0/24") and every
        address it iterates over is returned as a string.

    If the specification cannot be parsed, an error is printed with
    printRed() and the process exits.
    """
    try:
        octets = ip.split(".")
        last_field = octets[3]
        if "-" in last_field:
            bounds = last_field.split("-")
            startnum = int(bounds[0])
            endnum = int(bounds[1])
            # endnum + 1: the upper bound belongs to the requested range.
            return ["%s.%s.%s.%s" % (octets[0], octets[1], octets[2], i)
                    for i in range(startnum, endnum + 1)]
        return [str(i) for i in IP(ip)]
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only parsing problems mean "bad input"; other errors propagate.
        printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
        exit()
def getips(self, ip):
    """Expand an IP specification into a list of address strings.

    Two forms are handled:
      * a range in the fourth dotted field, e.g. "192.168.0.1-200", which
        expands to every address from the first to the last number,
        both inclusive;
      * anything else is passed to IP() (e.g. "192.168.1.0/24") and every
        address it iterates over is returned as a string.

    If the specification cannot be parsed, an error is printed with
    printRed() and the process exits.
    """
    try:
        octets = ip.split(".")
        last_field = octets[3]
        if "-" in last_field:
            bounds = last_field.split("-")
            startnum = int(bounds[0])
            endnum = int(bounds[1])
            # endnum + 1: the upper bound belongs to the requested range.
            return ["%s.%s.%s.%s" % (octets[0], octets[1], octets[2], i)
                    for i in range(startnum, endnum + 1)]
        return [str(i) for i in IP(ip)]
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only parsing problems mean "bad input"; other errors propagate.
        printRed("[!] not a valid ip given. you should put ip like 192.168.1.0/24, 192.168.0.0/16,192.168.0.1-200")
        exit()
def checkstate(lat):
    """Print a status line for latency ``lat`` and exit with the matching state.

    Checks are made in this order:
      * lat < 0                              -> STATE_UNKNOWN
      * TS_AVGLATW <= 0 or TS_AVGLATC <= 0   -> STATE_OK (no thresholds)
      * TS_AVGLATW <= lat < TS_AVGLATC       -> STATE_WARNING
      * lat >= TS_AVGLATC                    -> STATE_CRITICAL
      * anything else                        -> STATE_OK

    This function never returns; it always ends in sys.exit().
    """
    if lat < 0:
        message = "UNKNOWN: {0}ms, something must have gone wrong or your IP is unpingable".format(lat)
        state = STATE_UNKNOWN
    elif TS_AVGLATW <= 0 or TS_AVGLATC <= 0:
        message = "OK: {0}ms from {1}, no thresholds given or you just want see values first".format(lat, CITY)
        state = STATE_OK
    elif TS_AVGLATW <= lat < TS_AVGLATC:
        message = "WARNING: {0}ms from {1} is more than {2}ms but less than {3}ms for ip address {4}".format(
            lat, CITY, TS_AVGLATW, TS_AVGLATC, WAN_IP)
        state = STATE_WARNING
    elif lat >= TS_AVGLATC:
        message = "CRITICAL: {0}ms from {1} more than {2}ms for ip address {3}".format(
            lat, CITY, TS_AVGLATC, WAN_IP)
        state = STATE_CRITICAL
    else:
        message = "OK: {0}ms from {1}".format(lat, CITY)
        state = STATE_OK
    print(message)
    sys.exit(state)
def authorize_middleware(app, check_env_for_admin=_no_admin):
    """Wrap a WSGI app so that only authorized requests reach it.

    A request is passed on to ``app`` when _check_for_builtin(environ) or
    check_env_for_admin(environ) is true. Any other request is logged and
    answered with a plain-text "403 Forbidden" response.
    """
    def wsgi_app(environ, start_response):
        # deferred.py needs REMOTE_ADDR set to a specific value,
        # so set it here if we're an internal request
        remote_addr_type = IPy.IP(environ['REMOTE_ADDR']).iptype()
        if remote_addr_type == 'PRIVATE' and _check_for_builtin(environ):
            environ['REMOTE_ADDR'] = '0.1.0.2'

        # The builtin check runs again here, after REMOTE_ADDR may have changed.
        authorized = _check_for_builtin(environ) or check_env_for_admin(environ)
        if not authorized:
            logging.warning('Failed to authorize request, environment is: %s', environ)
            start_response('403 Forbidden', [('Content-type', 'text/plain')])
            return ['Forbidden']
        return app(environ, start_response)

    return wsgi_app
def get_location_data_for(ip):
    """Return location data for ``ip``, using the cache when possible.

    Returns an empty dict when ``ip`` is falsy or when IPy classifies it as
    'PRIVATE'. Otherwise the value from _get_cache(ip) is returned if it is
    truthy; if not, the JSON document fetched from freegeoip.net is decoded,
    stored with _save_cache() and returned. Each step is timed through
    timelog.log_time_since().
    """
    # Local import keeps this block self-contained.
    from contextlib import closing

    if not ip:
        return {}
    # Check the common private IP spaces (used by Google for sending requests)
    if IPy.IP(ip).iptype() == 'PRIVATE':
        return {}
    start = time.time()
    data = _get_cache(ip)
    timelog.log_time_since('Getting IP Cache', start)
    if not data:
        #TODO: consider using http://geoiplookup.net/ , which might offer better granularity/resolution
        url = 'http://freegeoip.net/json/%s' % ip
        start = time.time()
        # closing() releases the connection even if read() fails.
        with closing(urllib.urlopen(url)) as response:
            results = response.read()
        timelog.log_time_since('Getting IPData', start)
        data = json.loads(results)
        start = time.time()
        _save_cache(ip, data)
        timelog.log_time_since('Saving IPCache', start)
    return data
def isPublic(self, ipaddr):
    """Decide whether this host should act as the server.

    Returns True when Dreamr.server is already True, when IPy reports
    ``ipaddr`` as "PUBLIC" (printing "BECOME_SERVER"), or when DEBUG_MODE
    is set (printing "BECOME_SERVER_DEBUG"). Returns False otherwise.
    """
    if Dreamr.server == True:
        return True

    # Check if host's interface address is public
    if IPy.IP(ipaddr).iptype() == "PUBLIC":
        print("BECOME_SERVER")
        return True

    # Server Debug
    if DEBUG_MODE:
        print("BECOME_SERVER_DEBUG")
        return True

    return False
# SERVER HANDLER THREAD | Fork off a client thread to handle the socket
# This is a connected client or server!
def filter_active_router_addresses(gwportprefixes):
    """Reduce a GwPortPrefix queryset to one router address per prefix.

    The queryset is ordered by prefix id, then virtual descending, then
    gw_ip, and the first row of each prefix group is kept.

    :param gwportprefixes: A GwPortPrefix QuerySet.
    :returns: A list of GwPortPrefix objects.
    """
    # Getting Django's ORM to generate the SQL needed for this is more or
    # less impossible, so the grouping is done by hand.
    ordered = gwportprefixes.order_by('prefix__id', '-virtual', 'gw_ip')
    picked = []
    for _prefix_id, candidates in groupby(ordered, attrgetter('prefix_id')):
        picked.append(next(candidates))
    return picked
def usage(self, request, *args, **kwargs):
    """Respond with usage figures for the Prefix whose primary key is ``pk``.

    The payload holds the prefix's address count, the number of active
    addresses from prefix_collector, both the active and the allocated
    amounts as a ratio of the address count, and the pk itself.
    """
    pk = kwargs.pop("pk", None)
    prefix = Prefix.objects.get(pk=pk)
    max_addr = IP(prefix.net_address).len()
    active_addr = prefix_collector.collect_active_ip(prefix)

    # calculate allocated ratio: prefixes within this one, net type "scope" excluded
    builder = PrefixQuerysetBuilder().within(prefix.net_address)
    allocated = builder.finalize().exclude(vlan__net_type="scope")
    total_allocated = sum(p.get_prefix_size() for p in allocated)

    usage_ratio = 1.0 * active_addr / max_addr
    allocated_ratio = 1.0 * total_allocated / max_addr
    payload = dict(
        max_addr=max_addr,
        active_addr=active_addr,
        usage=usage_ratio,
        allocated=allocated_ratio,
        pk=pk,
    )
    return Response(payload, status=status.HTTP_200_OK)
def filter_full_prefixes(self):
    """Register a hook that drops /32 (IPv4) and /128 (IPv6) prefixes.

    Such prefixes are often of little or no value to network planning
    operations, so removing them reduces noise.

    The hook is appended to ``self.post_hooks``; when applied to an
    iterable of prefixes it lazily yields only those that pass.

    Returns:
        self, so calls can be chained.
    """
    def _filter_full_prefixes(q):
        for prefix in q:
            ip = IP(prefix.net_address)
            is_v4_subnet = ip.version() == 4 and ip.prefixlen() < 32
            is_v6_subnet = ip.version() == 6 and ip.prefixlen() < 128
            if is_v4_subnet or is_v6_subnet:
                yield prefix

    self.post_hooks.append(_filter_full_prefixes)
    return self
def _get_available_subnets(prefix_or_prefixes, used_prefixes):
    """Find the unused parts of one or more prefixes.

    This is `get_available_subnets` with its dependencies passed in
    explicitly.

    Args:
        prefix_or_prefixes: a single prefix or a list of prefixes
            ("10.0.0.0/8") or IPy.IP objects
        used_prefixes: prefixes that are in use

    Returns:
        A sorted list of what is left of prefix_or_prefixes after the used
        prefixes are discarded, leaving out any entry that is identical to
        one of the original prefixes.
    """
    if isinstance(prefix_or_prefixes, list):
        candidates = prefix_or_prefixes
    else:
        candidates = [prefix_or_prefixes]
    original_names = [str(candidate) for candidate in candidates]

    available = IPSet([IP(candidate) for candidate in candidates])
    in_use = IPSet([IP(used) for used in used_prefixes])
    # remove used prefixes
    available.discard(in_use)

    # filter away original prefixes
    remaining = [ip for ip in available if str(ip) not in original_names]
    remaining.sort()
    return remaining
def process_searchform(form):
    """Get search results based on form data.

    Builds an Identity queryset limited to rows changed within the last
    ``days`` days. For search type 'ip', a single address is matched
    exactly and anything larger is matched with the SQL ``<<`` operator;
    every other search type becomes a case-insensitive "contains" lookup
    on the field of that name. A status other than 'any' is added as an
    exact filter.

    Raises ValueError (from IP()) when the 'ip' search value cannot be parsed.
    """
    extra = {}
    kwargs = {
        'last_changed__gte': datetime.now() - timedelta(
            days=form.cleaned_data['days'])
    }
    if form.cleaned_data['searchtype'] == 'ip':
        ip = IP(form.cleaned_data['searchvalue'])
        if ip.len() == 1:
            kwargs['ip'] = str(ip)
        else:
            # Pass the value as a query parameter instead of formatting it
            # into the SQL text.
            extra['where'] = ["ip << %s"]
            extra['params'] = [str(ip)]
    else:
        key = form.cleaned_data['searchtype'] + '__icontains'
        kwargs[key] = form.cleaned_data['searchvalue']
    if form.cleaned_data['status'] != 'any':
        kwargs['status'] = form.cleaned_data['status']
    return Identity.objects.filter(**kwargs).extra(**extra)
def __init__(self, host):
    """Initialise from another Host, or from a hostname or IP address.

    Given a Host instance, its host, ip and hostname are copied. Otherwise
    ``host`` must be truthy; self.is_ip() then decides whether it is stored
    as the ip (hostname looked up by address, falling back to ``host``) or
    as the hostname (ip looked up by name, falling back to None).

    Raises ValueError when ``host`` is falsy.
    """
    if isinstance(host, Host):
        # Copy constructor: take over the other instance's fields.
        self.host = host.host
        self.ip = host.ip
        self.hostname = host.hostname
        return
    if not host:
        raise ValueError("Host argument must be hostname or IP address")

    self.host = host
    if not self.is_ip():
        self.hostname = host
        self.ip = self.get_host_by_name() or None
        return
    self.ip = host
    self.hostname = self.get_host_by_addr() or host
def get_queryset(self):
    """Return the prefixes selected by the request's GET parameters.

    With 'scope' present, prefixes within that scope are used (with the
    related vlan, ordered by net_address). Otherwise a truthy 'family'
    selects by address family, and with neither all prefixes are used.
    Prefixes holding fewer than MINIMUMPREFIXLENGTH addresses are dropped.
    """
    params = self.request.GET
    if 'scope' in params:
        within_scope = manage.Prefix.objects.within(params.get('scope'))
        queryset = within_scope.select_related('vlan').order_by('net_address')
    elif params.get('family'):
        queryset = manage.Prefix.objects.extra(
            where=['family(netaddr)=%s'],
            params=[params['family']])
    else:
        queryset = manage.Prefix.objects.all()

    # Keep only prefixes that are at least the minimum size
    results = []
    for candidate in queryset:
        if IP(candidate.net_address).len() >= MINIMUMPREFIXLENGTH:
            results.append(candidate)
    return results
def get(request, prefix):
    """Handle a GET request for the usage of ``prefix``.

    Answers 400 when ``prefix`` cannot be parsed by IP() or holds fewer
    than MINIMUMPREFIXLENGTH addresses. Otherwise the usage fetched for the
    matching database prefix, over the time span from get_times(request),
    is serialized and returned.
    """
    try:
        parsed = IP(prefix)
    except ValueError:
        return Response("Bad prefix", status=status.HTTP_400_BAD_REQUEST)

    too_small = parsed.len() < MINIMUMPREFIXLENGTH
    if too_small:
        return Response("Prefix is too small",
                        status=status.HTTP_400_BAD_REQUEST)

    starttime, endtime = get_times(request)
    db_prefix = manage.Prefix.objects.get(net_address=prefix)
    usage = prefix_collector.fetch_usage(db_prefix, starttime, endtime)
    serializer = serializers.PrefixUsageSerializer(usage)
    return Response(serializer.data)
def __init__(self, prefix, active_addresses, starttime=None, endtime=None):
    """Collect usage figures and related URLs for a prefix.

    ``usage`` is the number of active addresses as a percentage of
    ``max_hosts``. When ``max_hosts`` is not positive (prefixes with one or
    two addresses), ``usage`` is set to 0.0 instead of dividing by zero or
    producing a negative percentage.

    ``endtime`` is only kept when ``starttime`` is set.

    :type prefix: manage.Prefix
    :type active_addresses: int
    :type starttime: datetime.datetime
    :type endtime: datetime.datetime
    """
    self.prefix = prefix.net_address
    self.active_addresses = active_addresses
    self.max_addresses = IP(self.prefix).len()
    # Two addresses are subtracted from the total
    # (presumably the network and broadcast addresses).
    self.max_hosts = self.max_addresses - 2
    if self.max_hosts > 0:
        self.usage = self.active_addresses / float(self.max_hosts) * 100
    else:
        self.usage = 0.0
    self.starttime = starttime
    self.net_ident = prefix.vlan.net_ident
    self.vlan_id = prefix.vlan.vlan
    self.endtime = endtime if self.starttime else None
    self.url_machinetracker = reverse(
        'machinetracker-prefixid_search_active', args=[prefix.pk])
    self.url_report = reverse('report-prefix-prefix', args=[prefix.pk])
    self.url_vlan = reverse('vlan-details', args=[prefix.vlan.pk])
def hostname(ip):
    """Reverse-resolve an IP address, remembering successful lookups.

    Results are kept in the module-level _cached_hostname dict; failed
    lookups are not cached.

    :param ip: An IP address string.
    :returns: A hostname string or a False value if the lookup failed.
    """
    addr = unicode(ip)
    try:
        return _cached_hostname[addr]
    except KeyError:
        pass

    try:
        name = gethostbyaddr(addr)[0]
    except herror:
        return False
    _cached_hostname[addr] = name
    return name
def get_prefix_info(addr):
    """Look up the most specific non-scope prefix that contains ``addr``.

    Matching prefixes are ordered by mask length, longest first, and the
    first one is returned.

    :param addr: An IP address string.
    :returns: A Prefix object, or None if nothing matched or the database
              raised an error.
    """
    try:
        candidates = Prefix.objects.select_related().extra(
            select={"mask_size": "masklen(netaddr)"},
            where=["%s << netaddr AND nettype <> 'scope'"],
            order_by=["-mask_size"],
            params=[addr]
        )
        best_match = candidates[0]
    except (IndexError, DatabaseError):
        return None
    return best_match
def normalize_ip_to_string(ipaddr):
    """Normalize an IP address to a sortable string.

    Useful when IP addresses are sent to a browser and sorted as strings
    by JavaScript.

    An IPv4 address becomes '4' followed by its dotted quad with every
    part zero-padded to three characters. Any other address becomes '6'
    followed by its full-size representation. Input that IP() rejects
    with ValueError is returned unchanged.
    """
    try:
        parsed = IP(ipaddr)
    except ValueError:
        return ipaddr

    if parsed.version() != 4:
        return '6%s' % parsed.strFullsize()

    padded = [part.zfill(3) for part in str(parsed).split('.')]
    return '4%s' % '.'.join(padded)
def resolve_ip_and_sysname(name):
    """Look up both IP address and sysname for an IP or host/domain name.

    name - ip or hostname

    If ``name`` is not accepted by IP(), it is resolved with
    gethostbyname() first. The sysname comes from a reverse lookup of the
    address; when that fails with SocketError, the address itself is used.

    Returns:
     - tuple with ip-address and sysname
    """
    try:
        ip_addr = IP(name)
    except ValueError:
        # Not an address: treat it as a name and resolve it.
        resolved = gethostbyname(name)
        ip_addr = IP(resolved)

    address_text = unicode(ip_addr)
    try:
        sysname = gethostbyaddr(address_text)[0]
    except SocketError:
        sysname = unicode(ip_addr)
    return (ip_addr, sysname)
def does_ip_exist(ip_addr, netbox_id=None):
    """Check whether the given IP already exists in the database.

    Parameters:
     * ip_addr   - the IP address to look for.
     * netbox_id - a netbox primary key that may have the given ip_addr
                   without making the function return True.

    Returns:
     - True if a Netbox with this IP exists (and it is not the netbox
       identified by netbox_id).
     - False if not.
    """
    ip_qs = Netbox.objects.filter(ip=unicode(ip_addr))
    if netbox_id:
        # The given netbox is allowed to own this address.
        ip_qs = ip_qs.exclude(id=netbox_id)
    # exists() answers the yes/no question without counting every row.
    return ip_qs.exists()
def group_scopes(scopes):
    """Group scopes into private, IPv4 and IPv6 lists.

    A scope whose address type is 'PRIVATE' goes into the private group,
    whatever its version. Each group is sorted by the integer value of the
    scope's network address.

    :type scopes: list[Prefix]
    :returns: an IpGroup of (private, ipv4, ipv6), or an empty list when
              all three groups are empty.
    """
    def _prefix_as_int(prefix):
        return IP(prefix.net_address).int()

    families = ('private', 'ipv4', 'ipv6')
    groups = defaultdict(list)
    for scope in scopes:
        network = IP(scope.net_address)
        if network.iptype() == 'PRIVATE':
            family = 'private'
        elif network.version() == 4:
            family = 'ipv4'
        elif network.version() == 6:
            family = 'ipv6'
        else:
            continue
        groups[family].append(scope)

    if not any(groups[family] for family in families):
        return []
    ordered = [sorted(groups[family], key=_prefix_as_int)
               for family in families]
    return IpGroup(*ordered)
def find_input_type(ip_or_mac):
    """Classify the input as an IP address, MAC address or swportid.

    Checks are made in this order and the first hit wins:
      "IP"       - is_valid_ip() accepts the input
      "MAC"      - exactly twelve hex digits once colons are removed
      "SWPORTID" - digits only
    Anything else is "UNKNOWN".

    Only the type string is returned.
    """
    text = str(ip_or_mac)
    if is_valid_ip(text, use_socket_lib=True):
        return "IP"
    # Support mac addresses on xx:xx... format
    without_colons = text.replace(':', '')
    if re.match("^[A-Fa-f0-9]{12}$", without_colons):
        return "MAC"
    if re.match(r"^\d+$", text):
        return "SWPORTID"
    return "UNKNOWN"
def check_non_block(ip):
    """Raise InExceptionListError if ``ip`` is on the nonblock list.

    The list is read from NONBLOCKFILE through parse_nonblock_file(). Its
    'ip' entry is checked for an exact match and its 'range' entry for
    networks containing ``ip``. Returns None when nothing matches.
    """
    nonblockdict = parse_nonblock_file(NONBLOCKFILE)
    # According to the original notes the parsed file holds three kinds
    # of entries:
    #   1 - Specific ip-addresses
    #   2 - Ip-ranges (129.241.xxx.xxx/xx)
    #   3 - Ip lists (129.241.xxx.xxx-xxx)
    # NOTE(review): only kinds 1 and 2 are checked here - confirm whether
    # ip lists are handled elsewhere.

    # Specific ip-addresses
    if ip in nonblockdict['ip']:
        LOGGER.info('Computer in nonblock list, skipping')
        raise InExceptionListError

    # Ip-ranges
    if any(ip in IP(ip_range) for ip_range in nonblockdict['range']):
        raise InExceptionListError
def __init__(self, record, local_address=None):
    """Try to identify the remote device and port of a neighbor record.

    Identification is attempted among the devices and ports registered in
    NAV's database. If a neighbor can be identified, the identified
    attribute is set to True, and the netbox and interface attributes
    represent what was found.

    :param record: Some namedtuple instance representing the
                   neighboring record read from the device.
    :param local_address: The management IP address used by the local
                          system. If supplied, will be used to identify
                          and ignore possible self-loops.
    """
    self.record = record

    # Start from a copy of the shared list so it is never modified.
    ignored_ips = list(INVALID_IPS)
    if local_address:
        ignored_ips.append(str(local_address))
    self._invalid_neighbor_ips = ignored_ips

    self.netbox = None
    self.interfaces = None
    self.identified = False
    self.identify()
def _netbox_from_ip(self, ip):
    """Try to find a Netbox in NAV's database from an IP address.

    The netbox's own ip is tried first, then the gateway addresses of its
    interfaces. An address that IP() rejects is logged as a warning, and
    addresses listed in self._invalid_neighbor_ips are ignored.

    :returns: A shadows.Netbox object representing the netbox, or None if no
              corresponding netbox was found.
    """
    try:
        address = six.text_type(IP(ip))
    except ValueError:
        self._logger.warning("Invalid IP (%s) in neighbor record: %r",
                             ip, self.record)
        return None

    assert address
    if address in self._invalid_neighbor_ips:
        return None

    by_own_ip = self._netbox_query(Q(ip=address))
    if by_own_ip:
        return by_own_ip
    return self._netbox_query(Q(interface__gwportprefix__gw_ip=address))