def isIPAddress(ip, compressed=True):
    """Validate an arbitrary value as an IP address.

    :type ip: basestring or int
    :param ip: The candidate IP address.
    :param boolean compressed: If True, hand back the compressed string
        form of the address. Otherwise, hand back the
        :class:`ipaddr.IPAddress` instance itself.
    :rtype: A :class:`ipaddr.IPAddress`, or a string, or False
    :returns: The address (as a string or an object, depending on
        **compressed**) when it parses and ``isValidIP`` accepts it.
        Otherwise, ``False``.
    """
    try:
        address = ipaddr.IPAddress(ip)
    except ValueError:
        # Could not be parsed as an address at all.
        return False

    if not isValidIP(address):
        return False
    return address.compressed if compressed else address
# Example source code for the Python class IPAddress()
def isIPv(version, ip):
    """Check if **ip** is a certain **version** (IPv4 or IPv6).

    .. warning:: Do *not* put any calls to the logging module in this
        function, or else an infinite recursion will occur when the call is
        made, due to the log :class:`~logging.Filter` instances in
        :mod:`~bridgedb.safelog` using this function to validate matches
        from the regular expression for IP addresses.

    :param integer version: The IPv[4|6] version to check; must be either
        ``4`` or ``6``. NOTE(review): the handling of any other value is
        decided inside :func:`ipaddr.IPAddress` and is not enforced here;
        confirm before relying on it.
    :param ip: The IP address to check. May be any type which
        :class:`ipaddr.IPAddress` will accept.
    :rtype: boolean
    :returns: ``True`` if :class:`ipaddr.IPAddress` accepts **ip** for the
        requested **version**; ``False`` if it raises any exception.
    """
    try:
        ipaddr.IPAddress(ip, version=version)
    except Exception:
        # Deliberately broad: any failure to parse means "not that version".
        # ``ipaddr.AddressValueError`` is an ``Exception`` too, so listing
        # it separately added nothing.
        return False
    return True
def parse_device_desc_xml(filename):
    """Build a configuration dictionary from a device description XML file.

    The file is parsed with ``ET.parse`` and its root element is passed to
    ``parse_device``, whose five return values are unpacked as the loopback
    prefix, management prefix, hostname, hardware SKU and a fifth value that
    is not used here.

    :param filename: Source handed to ``ET.parse``.
    :returns: A dict with the keys ``DEVICE_METADATA``,
        ``LOOPBACK_INTERFACE`` and ``MGMT_INTERFACE``. The management
        interface entry carries a ``gwaddr`` address object.
    """
    root = ET.parse(filename).getroot()
    lo_prefix, mgmt_prefix, hostname, hwsku, _device_type = parse_device(root)

    # The gateway is taken to be the address directly after the management
    # network's base address.
    mgmt_network = ipaddress.IPNetwork(mgmt_prefix)
    gwaddr = ipaddress.IPAddress(int(mgmt_network.network) + 1)

    return {
        'DEVICE_METADATA': {'localhost': {
            'hostname': hostname,
            'hwsku': hwsku,
        }},
        'LOOPBACK_INTERFACE': {('lo', lo_prefix): {}},
        'MGMT_INTERFACE': {('eth0', mgmt_prefix): {'gwaddr': gwaddr}},
    }
def ip_is_valid(self, ip):
    """
    Tells whether the given string can be parsed as an IP address (v4 or v6).
    """
    try:
        ipaddr.IPAddress(ip)
    except ValueError:
        return False
    else:
        return True
#}}}
#{{{ Check if network is valid
def get_network(self, net_type):
    """
    Returns a network (as a string) that includes the private/public IPs of all nodes in the config.

    The network is derived from the first node section; every later node must have its IP inside it.
    An empty string is returned if the config contains no node section.

    Raises an EXAConfError if the requested network type is missing in a node section, if an
    invalid IP is found, or if the IP of at least one node is not part of the network defined
    by the first node section.

    This function assumes that all nodes have an entry for the requested network type. The calling
    function has to check if the network type is actually present (private / public).
    """
    network = ""
    cluster_net = None
    for section in self.config.sections:
        if not self.is_node(section):
            continue
        node_network = self.config[section].get(net_type)
        # None and "" both mean "not configured"
        if not node_network:
            raise EXAConfError("Network type '%s' is missing in section '%s'!" % (net_type, section))
        node_ip = node_network.split("/")[0].strip()
        # check if the extracted IP is valid
        if not self.ip_is_valid(node_ip):
            raise EXAConfError("IP %s in section '%s' is invalid!" % (node_ip, section))
        if cluster_net is None:
            # first node : its entry defines the network (normalized to a 'real' network address)
            subnet = ipaddr.IPNetwork(node_network)
            network = "%s/%s" % (str(subnet.network), str(subnet.prefixlen))
            # parse the normalized network once instead of once per remaining node
            cluster_net = ipaddr.IPNetwork(network)
        elif ipaddr.IPAddress(node_ip) not in cluster_net:
            # other nodes : their IP has to be part of the chosen net
            raise EXAConfError("IP %s is not part of network %s!" % (node_ip, network))
    return network
#}}}
#{{{ Get private network
def get_ip_repr(ip_ver, net_int):
    """Turn an integer into an ``ipaddr`` address object.

    When ``ip_ver`` is 4 the integer is used unchanged; for any other
    value it is shifted left by 64 bits before conversion.
    """
    if ip_ver == 4:
        addr_int = net_int
    else:
        addr_int = net_int << 64
    return ipaddr.IPAddress(addr_int)
def analy_req(address):
    """Translate a queried name into the record value to answer with.

    The configured main domain, plus one more character, is cut off the end
    of *address*. What remains is interpreted according to the configured
    ``encoding``:

    * ``'int'`` - parsed as an integer and rendered as an IP address string.
    * ``'hex'`` - hex-decoded; the decoded text is used only if it is a
      valid IPv4 address, or a valid IPv6 address while the configured
      ``type`` is ``'AAAA'``. Anything else keeps the undecoded remainder.
    * ``'en'`` - converted by ``numToEnToNum``.
    * any other encoding - if the configured payload is not the string
      ``'None'`` and does not contain the main domain, the record becomes
      ``payload + mainDomain``.

    :param address: The full name that was queried.
    :returns: The record value. If processing fails with an ``Exception``
        the error is printed and the stripped remainder is returned.
    """
    mainDomain = conf_read('maindomain')
    address = address[:-len(mainDomain) - 1]
    payload = conf_read('payload')
    encoding = conf_read('encoding')
    record = address
    try:
        if encoding == 'int':
            record = str(ipaddr.IPAddress(int(address)))
        elif encoding == 'hex':
            try:
                address = address.decode('hex')
                version = ipaddr.IPAddress(address).version
                if version == 4 or (version == 6 and conf_read('type') == 'AAAA'):
                    record = address
            except Exception:
                # Best effort: input that does not decode to an IP address
                # keeps the undecoded remainder as the record.
                pass
        elif encoding == 'en':
            record = numToEnToNum(address)
        elif payload != 'None' and payload.find(mainDomain) == -1:
            record = payload + mainDomain
    except Exception as e:
        print('[!] Subdomain Invalid {}'.format(e))
    # Returning here rather than from a ``finally`` clause lets
    # KeyboardInterrupt / SystemExit propagate instead of being discarded.
    return record
def ipListBuild(address):
    """Interactively convert one IP, or write a host-name list for a subnet.

    Option ``1`` (the default) prints ``numToEnToNum(address)`` and exits.
    Option ``2`` asks for a prefix length, an encoding and a root domain,
    enumerates the hosts of ``address/<prefix length>`` and appends one
    ``<encoded host>.<root domain>`` line per host to a timestamped text
    file. Supported encodings are ``''`` (plain dotted IPv4), ``'en'``,
    ``'int'`` and ``'hex'``; any other value produces an empty file.

    The process is always terminated with ``exit()`` at the end.

    :param address: IPv4 address (option 1) or network base address
        (option 2).
    :raises ValueError: if the prefix length is not an integer or the
        resulting network is rejected by ``ipaddr.IPv4Network``.
    """
    print('1. Single IP Covert For En\n2. Build IP List')
    opt_req = raw_input("[+] [1 By Default/2]") or '1'
    if opt_req == '1':
        print(numToEnToNum(address))
        exit()
    conf_main = conf_read('maindomain')[:-1]
    seg_len = raw_input("[+] Please Input Segment Length [24 By Default]") or 24
    encode_req = raw_input("[+] Please Input Encoding ['ipv4' By Default]")
    mainDomain = raw_input("[+] Please Input Server Root Address [{} By Default]".format(conf_main)) or conf_main
    # Call the constructor directly: the previous eval() of a string built
    # from user-supplied input allowed arbitrary code execution.
    segment = ipaddr.IPv4Network('{}/{}'.format(address, int(seg_len))).iterhosts()
    save_file = "{}_{}_{}.txt".format(
        time.strftime("%Y%m%d%X", time.localtime()).replace(':', ''),
        mainDomain.replace('.', '_'),
        (encode_req if encode_req else 'ipv4'))
    results = []
    try:
        if encode_req == '':
            results += ["{}.{}".format(str(i), mainDomain) for i in segment]
        elif encode_req == 'en':
            results += ["{}.{}".format(numToEnToNum(str(i)), mainDomain) for i in segment]
        elif encode_req == 'int':
            results += ["{}.{}".format(int(ipaddr.IPAddress(str(i))), mainDomain) for i in segment]
        elif encode_req == 'hex':
            results += ["{}.{}".format(str(i).encode('hex'), mainDomain) for i in segment]
        # The context manager closes the file even if a write fails.
        with open(save_file, 'a') as f:
            f.writelines(line + '\n' for line in results)
        print('[+] Stored in the {}'.format(save_file))
    except Exception as e:
        print(e)
    exit()
def __init__(self, max_length=50, *args, **kwargs):
    """Initialise the type with a ``Unicode`` implementation of `max_length`.

    Raises ``ImproperlyConfigured`` when ``ip_address`` is falsy.
    """
    if not ip_address:
        message = (
            "'ipaddr' package is required to use 'IPAddressType' "
            "in python 2"
        )
        raise ImproperlyConfigured(message)

    super(IPAddressType, self).__init__(*args, **kwargs)
    self.impl = types.Unicode(max_length)
def process_result_value(self, value, dialect):
    """Convert `value` with ``ip_address``; any falsy value becomes ``None``."""
    if not value:
        return None
    return ip_address(value)
def _coerce(self, value):
    """Coerce `value` with ``ip_address``; any falsy value becomes ``None``."""
    if not value:
        return None
    return ip_address(value)
def _ip_subnet_valid(self, ip, subnet):
    """Return whether `ip` lies inside the network given by ``subnet.address``."""
    address = ipaddr.IPAddress(ip)
    network = ipaddr.IPNetwork(subnet.address)
    return address in network
def select_subnet_for_ip(ip, subnets):
    """Return the first subnet whose IPv4 network contains `ip`.

    Each subnet's network is read from its ``address`` attribute.
    ``None`` is returned when no subnet matches.
    """
    matching = (
        candidate for candidate in subnets
        if ipaddr.IPAddress(ip) in ipaddr.IPv4Network(candidate.address)
    )
    return next(matching, None)
def __init__(self, server_address, handler, *args, **kwargs):
    """Create the HTTP server, choosing the socket family from the address.

    The mandatory ``session`` keyword argument is removed from `kwargs` and
    stored on the instance. ``address_family`` is set to ``AF_INET`` or
    ``AF_INET6`` according to the IP version of the host part of
    `server_address`, before the base class initialiser runs.
    """
    self.session = kwargs.pop("session")
    host = server_address[0:2][0] if False else None  # placeholder removed below
    (host, _) = server_address

    family = None
    ip_version = ipaddr.IPAddress(host).version
    if ip_version == 4:
        family = socket.AF_INET
    elif ip_version == 6:
        family = socket.AF_INET6
    if family is not None:
        self.address_family = family

    BaseHTTPServer.HTTPServer.__init__(
        self, server_address, handler, *args, **kwargs)
def buildSocket(self, addr):
    """Create a TCP socket matching the IP version of `addr`.

    The socket is stored in the module-level global ``s`` and returned.
    If the version is neither 4 nor 6, no socket is created and whatever
    ``s`` currently holds is returned.
    """
    global s
    family = None
    ip_version = ipaddr.IPAddress(addr).version  # learn if we're IPv4 or IPv6
    if ip_version == 4:
        family = socket.AF_INET
    elif ip_version == 6:
        family = socket.AF_INET6
    if family is not None:
        s = socket.socket(family, socket.SOCK_STREAM)
    return s
def contains(self, ip_address):
    """Return True if `ip_address` is inside any network of its own IP version.

    IPv4 addresses are checked against ``self.ipv4_networks`` and IPv6
    addresses against ``self.ipv6_networks``.
    """
    address = ipaddr.IPAddress(ip_address)
    if isinstance(address, ipaddr.IPv4Address):
        candidates = self.ipv4_networks
    elif isinstance(address, ipaddr.IPv6Address):
        candidates = self.ipv6_networks
    else:
        raise RuntimeError("Should never happen")
    return any(candidate.Contains(address) for candidate in candidates)
def getAddresses():
    """Collect the addresses of the local interfaces as ``IPAddress`` objects.

    Every interface reported by scapy's ``get_if_list`` is queried with
    ``get_if_addr``; interfaces whose lookup fails are skipped. Duplicate
    addresses are collapsed and ``'0.0.0.0'`` is left out.

    :returns: A list of ``ipaddr.IPAddress`` instances, in no particular
        order.
    """
    from scapy.all import get_if_addr, get_if_list
    from ipaddr import IPAddress

    addresses = set()
    for iface in get_if_list():
        try:
            addresses.add(get_if_addr(iface))
        except Exception:
            # Best effort: skip interfaces that cannot be queried, but no
            # longer swallow KeyboardInterrupt / SystemExit.
            continue
    addresses.discard('0.0.0.0')
    return [IPAddress(addr) for addr in addresses]
def __init__(self, ip):
    """Parse `ip` with whichever IP library is selected by ``ip_library``.

    Sets ``self.obj`` (the parsed address object), ``self.ip`` (its string
    form; the compressed form when ``ipaddress`` is used) and
    ``self.version``.
    """
    if ip_library != "ipaddr":
        self.obj = ipaddress.ip_address(ip)
        self.ip = str(self.obj.compressed)
    else:
        self.obj = ipaddr.IPAddress(ip)
        self.ip = str(self.obj)
    self.version = self.obj.version
def __init__(self, vid, conf=None):
    """Set up the object from an id and an optional configuration dict.

    Missing keys are filled in on `conf` itself through ``setdefault``, so a
    dict passed in by the caller is modified in place. Configured
    controller IPs are parsed into network objects, and configured routes
    are sorted into ``ipv4_routes`` / ``ipv6_routes`` keyed by destination
    network with the gateway address as value.
    """
    conf = {} if conf is None else conf

    self.vid = vid
    self.tagged = []
    self.untagged = []
    self.name = conf.setdefault('name', str(vid))
    self.description = conf.setdefault('description', self.name)

    self.controller_ips = conf.setdefault('controller_ips', [])
    if self.controller_ips:
        parsed_ips = []
        for controller_ip in self.controller_ips:
            parsed_ips.append(ipaddr.IPNetwork(controller_ip))
        self.controller_ips = parsed_ips

    self.unicast_flood = conf.setdefault('unicast_flood', True)

    self.routes = conf.setdefault('routes', {})
    self.ipv4_routes = {}
    self.ipv6_routes = {}
    if self.routes:
        # Each configured item wraps the actual route under the 'route' key.
        self.routes = [item['route'] for item in self.routes]
        for route in self.routes:
            gateway = ipaddr.IPAddress(route['ip_gw'])
            destination = ipaddr.IPNetwork(route['ip_dst'])
            assert(gateway.version == destination.version)
            if gateway.version == 4:
                table = self.ipv4_routes
            else:
                table = self.ipv6_routes
            table[destination] = gateway

    self.arp_cache = {}
    self.nd_cache = {}
    self.max_hosts = conf.setdefault('max_hosts', None)
    self.host_cache = {}
def evaluate(self,
             hints  # Information used for doing identification
             ):
    """Evaluate this identifier against a set of hints.

    Returns True if an identification is made. Returns False when the hints
    carry no ``'requester'`` or the requester is not identified, and
    ``self.fail_result`` when the lookup cannot be completed.
    """
    try:
        requester = hints['requester']
    except KeyError:
        return False

    labels = dns.reversename.from_address(requester)[0:-3]
    if len(labels) == 4:
        zone = '.v4.fullbogons.cymru.com'
    else:
        zone = '.v6.fullbogons.cymru.com'
    lookup_name = '.'.join(labels) + zone

    # The query for this is always for an 'A' record, even for
    # IPv6 hosts.
    try:
        answer = self.resolver.query(lookup_name, 'A')[0]
    except dns.resolver.NXDOMAIN:
        return False
    except (dns.exception.Timeout,
            dns.resolver.NoAnswer,
            dns.resolver.NoNameservers):
        return self.fail_result

    # The query will return 127.0.0.2 if it's in the bogon list.
    # See http://www.team-cymru.org/bogon-reference-dns.html.
    if str(answer) != '127.0.0.2':
        return False

    # At this point, we have a bogon. It only counts if none of the
    # configured exclusions contains it.
    address = ipaddr.IPAddress(requester)
    return not any(address in excluded for excluded in self.exclude)
# A short test program
def evaluate(self,
             hints  # Information used for doing identification
             ):
    """Evaluate this identifier against a set of hints.

    Returns True if an identification is made: the requester's reverse
    name must resolve, must resolve forwards to the requester again, and
    must be accepted by ``self.matcher``. Returns False otherwise.
    """
    try:
        requester = hints['requester']
    except KeyError:
        return False

    address = ipaddr.IPAddress(requester)
    reverse_name = dns.reversename.from_address(requester)

    lookup_errors = (dns.resolver.NXDOMAIN,
                     dns.exception.Timeout,
                     dns.resolver.NoAnswer,
                     dns.resolver.NoNameservers)

    # Resolve to a FQDN
    try:
        fqdn = str(self.resolver.query(reverse_name, 'PTR')[0])
    except lookup_errors:
        return False

    # Resolve the FQDN back to an IP and see if they match. This
    # prevents someone in control over their reverse resolution
    # from claiming they're someone they're not.
    # TODO: Check against _all_ returned IPs
    if address.version == 4:
        record_type = 'A'
    else:
        record_type = 'AAAA'
    try:
        forwards = self.resolver.query(fqdn, record_type)
    except lookup_errors:
        return False

    forward_ips = [str(forward) for forward in forwards]
    if requester not in forward_ips:
        return False

    # Try to match with and without the dot at the end.
    candidates = (fqdn, fqdn.rstrip('.'))
    return any(self.matcher.matches(candidate) for candidate in candidates)
# A short test program
def _pass_hostinfo(self, entry):
    """Update the per-host state record from one scan entry.

    The record for ``entry['host']`` is fetched from ``self._state`` (or
    created with empty defaults) and stored back at the end. From the
    entry and from whatever ``self._hostinfo_locator(entry)`` returns it
    fills in:

    * ``hostname`` - defaults to the host key; replaced by the kernel
      hostname or computer name if the plugin output contains one.
    * ``ipaddress`` - the host key itself when it is a valid IP address,
      otherwise the ``'host-ip'`` value of the located host info (if any).
    * ``os`` - the ``'operating-system'`` value of the located host info.
    * ``credentialed_checks`` - set to True when the output reports
      ``Credentialed checks : yes``.
    """
    host = entry['host']
    if host in self._state:
        s = self._state[host]
    else:
        s = {
            'vulnerabilities': [],
            'exempt_vulnerabilities': [],
            'ports': set(),
            'hostname': None,
            'ipaddress': None,
            'os': None,
            'credentialed_checks': False
        }

    # if the hostname has not been set yet, just default it to the key/target
    # value
    if s['hostname'] is None:
        s['hostname'] = host

    thishostinfo = self._hostinfo_locator(entry)

    # attempt to determine the ip address; if our target is an ip just use that,
    # otherwise try to locate the ip address using the supplementary host info
    try:
        ipaddr.IPAddress(host)
    except ValueError:
        if thishostinfo is not None:
            s['ipaddress'] = thishostinfo['host-ip']
    else:
        s['ipaddress'] = host

    if thishostinfo is not None and 'operating-system' in thishostinfo:
        s['os'] = thishostinfo['operating-system']

    # attempt to extract kernel hostname
    output = entry['output']
    if 'output of "uname -a" is' in output:
        unamestr = output.replace('\n', ' ')
        m = re.search(r'output of "uname -a" is : Linux (\S+) ', unamestr)
        if m is not None:
            s['hostname'] = m.group(1)
    elif '= Computer name' in output:
        cnamestr = output.replace('\n', ' ')
        m = re.search(r'(\S+)\s+= Computer name', cnamestr)
        if m is not None:
            s['hostname'] = m.group(1)

    # flip credentialed checks if we find plugin output indicating the scan
    # included successfully used credentials
    if 'Credentialed checks : yes' in output:
        s['credentialed_checks'] = True

    self._state[host] = s
def load_hosts(self):
"""Create a ``Host`` for every name in ``self.storage.hosts[2]`` and fill it from stored data.

Each host is registered in ``self.hosts`` under ``host.name`` and receives
hardware info, memory/swap figures, a load average, the adapters matching
the public / cluster networks, the adapters listed in ``self.jstorage``,
per-adapter perf stats and the uptime.
"""
for host_name in self.storage.hosts[2]:
stor_node = self.storage.get("hosts/" + host_name, expected_format=None)
host = Host(host_name)
self.hosts[host.name] = host
# Hardware info is optional: missing lshw data, or any error raised by
# get_hw_info, leaves hw_info as None.
lshw_xml = stor_node.get('lshw', expected_format='xml')
if lshw_xml is None:
host.hw_info = None
else:
try:
host.hw_info = get_hw_info(lshw_xml)
except:
host.hw_info = None
info = self.parse_meminfo(stor_node.get('meminfo'))
host.mem_total = info['MemTotal']
host.mem_free = info['MemFree']
host.swap_total = info['SwapTotal']
host.swap_free = info['SwapFree']
# load_5m is the second whitespace-separated field of the stored loadavg
# text, or None when no loadavg is stored.
loadavg = stor_node.get('loadavg')
host.load_5m = None if loadavg is None else float(loadavg.strip().split()[1])
# Pattern capturing adapter name, dotted IPv4 address and prefix size from a
# line of the stored 'ipa' text (presumably `ip a`-style output - TODO confirm).
ipa = self.storage.get('hosts/%s/ipa' % host.name)
ip_rr_s = r"\d+:\s+(?P<adapter>.*?)\s+inet\s+(?P<ip>\d+\.\d+\.\d+\.\d+)/(?P<size>\d+)"
# `info` is reused here: it now maps adapter name -> list of (address, size).
info = collections.defaultdict(lambda: [])
for line in ipa.split("\n"):
match = re.match(ip_rr_s, line)
if match is not None:
info[match.group('adapter')].append(
(IPAddress(match.group('ip')), int(match.group('size'))))
# Remember the adapter whose address falls inside the configured public /
# cluster network; a later match overwrites an earlier one.
for adapter, ips_with_sizes in info.items():
for ip, sz in ips_with_sizes:
if self.public_net is not None and ip in self.public_net:
host.public_net = NetworkAdapter(adapter, ip)
if self.cluster_net is not None and ip in self.cluster_net:
host.cluster_net = NetworkAdapter(adapter, ip)
# Adapters described in jstorage: 'dev' is passed to NetworkAdapter and used
# as the key in host.net_adapters; the remaining keys are copied onto the
# adapter object as attributes.
interfaces = getattr(self.jstorage.hosts, host_name).interfaces
for name, adapter_dct in interfaces.items():
adapter_dct = adapter_dct.copy()
dev = adapter_dct.pop('dev')
adapter = NetworkAdapter(dev, None)
adapter.__dict__.update(adapter_dct)
host.net_adapters[dev] = adapter
# Attach the stats from get_node_net_stats to every known adapter that has a name.
net_stats = self.get_node_net_stats(host.name)
perf_adapters = [host.cluster_net, host.public_net] + list(host.net_adapters.values())
for net in perf_adapters:
if net is not None and net.name is not None:
net.perf_stats = net_stats.get(net.name)
# Uptime is the first whitespace-separated field of the stored 'uptime' text.
host.uptime = float(stor_node.get('uptime').split()[0])