def _load_existing_mappings(self):
    """Fetch this box's ARP records from the db and index them by (ip, mac).

    The body yields the deferred produced by ``db.run_in_thread`` and hands
    its result back through ``defer.returnValue``.

    Returns:
      A deferred whose result is a dictionary: { (ip, mac): arpid }

    """
    self._logger.debug("Loading open arp records from database")
    # Only records of this netbox whose end_time is >= datetime.max are
    # selected, and only the id/ip/mac columns are fetched.
    queryset = manage.Arp.objects.filter(
        netbox__id=self.netbox.id,
        end_time__gte=datetime.max,
    ).values('id', 'ip', 'mac')
    records = yield db.run_in_thread(
        storage.shadowify_queryset_and_commit, queryset)
    self._logger.debug("Loaded %d open records from arp", len(records))

    mappings = {}
    for record in records:
        mappings[(IP(record['ip']), record['mac'])] = record['id']
    defer.returnValue(mappings)
# Example source code using the Python IP class
def get_ipv4_multicast_groups_per_port(self):
    """
    Returns IGMP snooping information from ports.

    Every retrieved row index is split into a vlan (element 0), four
    group address octets (elements 1-4) and an ifindex (element 5).

    :returns: A Deferred whose result is a list of MulticastStat tuples

    """
    column = "hpIgmpStatsPortAccess2"
    deferred = self.retrieve_columns([column])
    deferred = deferred.addCallback(self.translate_result)
    deferred = deferred.addCallback(reduce_index)
    ports = yield deferred

    stats = []
    for index, columns in iteritems(ports):
        vlan = index[0]
        octets = index[1:5]
        ifindex = index[5]
        access = columns[column]
        group = IP('.'.join(str(octet) for octet in octets))
        stats.append(MulticastStat(group, ifindex, vlan, access))
    defer.returnValue(stats)
def oid_to_ipv4(oid):
    """Converts a sequence of 4 numbers to an IPv4 object in the fastest
    known way.

    :param oid: Any list or tuple of 4 integers, each fitting in an
                unsigned byte.
    :returns: The result of ``IP(addr, ipversion=4)``, where ``addr`` is the
              four octets read as one big-endian unsigned integer.
    :raises ValueError: If `oid` does not contain exactly 4 elements, or if
                        an element does not fit in an unsigned byte.

    """
    if len(oid) != 4:
        raise ValueError("IPv4 address must be 4 octets, not %d" % len(oid))
    try:
        octets = array.array("B", oid)
    except OverflowError as error:
        raise ValueError(error)
    # array.tostring() was removed in Python 3.9; use tobytes() where it
    # exists and only fall back to tostring() on interpreters lacking it.
    to_bytes = getattr(octets, "tobytes", None) or octets.tostring
    addr, = unpack("!I", to_bytes())
    return IP(addr, ipversion=4)
#############################################################################
# Various OID consumer functions which can be fed to the consume() function #
#############################################################################
# pylint: disable=invalid-name
def __init__(self, host, community="public", version="1", port=161,
             retries=3, timeout=1):
    """Makes a new Snmp-object and opens its session handle.

    :param host: hostname or IP address
    :param community: community (password), defaults to "public"
    :param version: SNMP version; "2" is stored as "2c", and only "1" and
                    "2c" are accepted
    :param port: udp port number, defaults to "161"
    :param retries: stored on the instance as given
    :param timeout: stored on the instance as given
    :raises UnsupportedSnmpVersionError: for any other version value

    """
    self.host = host
    self.community = str(community)

    requested = str(version)
    self.version = '2c' if requested == '2' else requested
    if self.version not in ('1', '2c'):
        raise UnsupportedSnmpVersionError(self.version)

    self.port = int(port)
    self.retries, self.timeout = retries, timeout

    # The session is built from the command line tuple and opened at once.
    self.handle = _MySnmpSession(self._build_cmdline())
    self.handle.open()
def _build_cmdline(self):
    """Assemble the tuple of command line style arguments for the session.

    A host that ``IP()`` parses as an IPv6 address is written as
    ``udp6:[host]``; every other host, including one ``IP()`` rejects with
    ValueError, is used unchanged.

    :returns: A tuple of strings: ``-v<version>``, ``-c``, community,
              ``-r``, retries, ``-t``, timeout and ``host:port``.

    """
    try:
        address = IP(self.host)
    except ValueError:
        address = None

    if address is not None and address.version() == 6:
        host = 'udp6:[%s]' % self.host
    else:
        host = self.host

    options = [
        '-v' + self.version,
        '-c', self.community,
        '-r', str(self.retries),
        '-t', str(self.timeout),
    ]
    options.append('%s:%s' % (host, self.port))
    return tuple(options)
def is_valid_cidr(cidr):
    """Verifies that a string is a valid IPv4 or IPv6 CIDR specification.

    Only string values that contain a ``/`` and do not consist purely of
    digits are handed to the IPy library for parsing.

    :returns: True if IPy accepts the value. False if the value is not a
              string, is all digits, lacks a ``/``, or makes IPy raise
              ValueError or TypeError.

    """
    if not isinstance(cidr, six.string_types):
        return False
    if cidr.isdigit() or '/' not in cidr:
        return False
    try:
        parsed = IPy.IP(cidr)
    except (ValueError, TypeError):
        return False
    return parsed is not None
def is_ipv4(ip):
    """Report whether ``IP()`` accepts the given value.

    Params:
        ip: :str: General IP format.
    Returns:
        :type: bool
        :desc: True when ``IP(ip)`` can be constructed, False when it
               raises ValueError.

    NOTE(review): despite the name, no IP version check is made here;
    confirm whether IPv6 input is meant to be accepted.
    """
    try:
        IP(ip)
    except ValueError:
        return False
    else:
        return True
#----------------------------------------------------------------------
def VerifyIp(ip):
    """Classify an address as 'Public' or 'Private'.

    :param ip: value handed to ``IP()``.
    :returns: 'Public' when ``IP(ip).iptype()`` equals "PUBLIC", 'Private'
              for every other type, and None when the address cannot be
              parsed or classified.
    """
    try:
        ip_type = IP(ip).iptype()
    except Exception:
        # Best effort: anything that cannot be classified yields None.
        # Unlike a ``return`` inside ``finally``, this lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
    # Compare by value; an identity test against a string literal only
    # works by accident of string interning.
    return 'Public' if ip_type == "PUBLIC" else 'Private'
def __str__(self):
    """Describe the destination IP requirement as a sentence.

    The wording depends on whether every entry of ``self.dst_ip`` has a
    prefix length other than 32 or 128, and on whether there is more than
    one entry; ``self._str_list()`` supplies the listed value(s).
    """
    several = len(self.dst_ip) > 1
    only_subnets = all(
        ip.prefixlen() not in [32, 128] for ip in self.dst_ip
    )

    if only_subnets:
        tpl = "Destination IP must fall into {}"
    elif several:
        tpl = "Destination IP must be in {}"
    else:
        tpl = "Destination IP must be {}"
    return tpl.format(self._str_list())
def __init__(self, cfg):
    """Read and validate the "address" entries of this record criterion.

    Each configured address is parsed with ``IPy.IP`` and must match the
    class's ``IP_VER``; accepted addresses are collected in
    ``self.address``.

    :param cfg: configuration passed on to
                ``ExpResCriterion_DNSRecord.__init__``.
    :raises ConfigError: if an address cannot be parsed or has the wrong
                         IP version for this record type.
    """
    ExpResCriterion_DNSRecord.__init__(self, cfg)

    self.address = []

    for address in self._enforce_list("address", str):
        try:
            ip = IPy.IP(address)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must not
            # be reported as configuration errors.
            raise ConfigError(
                "Invalid IP for {} record: {}".format(
                    self.RECORD_TYPE, address
                )
            )

        if ip.version() != self.IP_VER:
            raise ConfigError(
                "Invalid IP version ({}) for record type {}.".format(
                    ip.version(), self.RECORD_TYPE
                )
            )

        self.address.append(ip)
# validate_ip_address.py - file source
# Project: LinuxBashShellScriptForOps
# Author: DingGuodong
# Project source
# File source
# Reads: 20
# Favorites: 0
# Likes: 0
# Comments: 0
def is_valid_ipv4(ip, version=4):
    """Check whether IPy accepts *ip* as an address of the given IP version.

    If IPy cannot be imported, an installation through ``pip`` (falling
    back to ``easy_install``) is attempted via the shell; when that fails
    the process exits with status 1.

    :param ip: value handed to ``IPy.IP``.
    :param version: IP version passed as ``ipversion``, defaults to 4.
    :returns: True if ``IP(ip, ipversion=version)`` can be constructed,
              False if it raises ValueError or TypeError.
    """
    import os
    import sys

    try:
        from IPy import IP
    except ImportError:
        command_to_execute = "pip install IPy || easy_install IPy"
        try:
            # os.system() reports a failed command through its non-zero
            # return status, not by raising, so the status must be checked.
            exit_status = os.system(command_to_execute)
        except OSError:
            exit_status = 1
        except Exception as e:
            print("Uncaught exception, %s" % e)
            sys.exit(1)
        if exit_status != 0:
            print("Can NOT install 'IPy', Aborted!")
            sys.exit(1)
        from IPy import IP

    try:
        IP(ip, ipversion=version)
    except (ValueError, TypeError):
        return False
    return True
def network_size(net, dhcp=None):
    """
    Derive the gateway, netmask and DHCP pool of a network.

    The gateway is element 1 of the network; the pool runs from element 2
    to the element two before the end. The pool is computed in every case
    but only returned when *dhcp* is truthy, otherwise None takes its place.

    :returns: tuple ``(gateway, mask, dhcp_pool_or_None)``
    """
    network = IP(net)
    mask = network.strNetmask()
    gateway = network[1].strNormal()
    pool_start = network[2].strNormal()
    pool_end = network[network.len() - 2].strNormal()
    dhcp_pool = [pool_start, pool_end]
    return gateway, mask, (dhcp_pool if dhcp else None)
# Read write lock
# ---------------
def get_ipv4_network(self):
    """Build an IP object from the ``<ip>`` element of the network XML.

    A ``prefix`` attribute, when present, is expanded to a dotted netmask
    and takes precedence over the ``netmask`` attribute. With a netmask the
    result is the network (address AND netmask) joined with that netmask;
    without one it is the plain address.

    :returns: an IP object, or None when the XML has no ``/network/ip``.
    """
    xml = self._XMLDesc(0)
    if util.get_xml_path(xml, "/network/ip") is None:
        return None

    address_text = util.get_xml_path(xml, "/network/ip/@address")
    netmask_text = util.get_xml_path(xml, "/network/ip/@netmask")
    prefix = util.get_xml_path(xml, "/network/ip/@prefix")

    if prefix:
        bits = int(prefix)
        # A 32 character bit string: `bits` ones followed by zeros.
        mask_bits = ("1" * bits) + ("0" * (32 - bits))
        netmask_text = str(IP(int(mask_bits, base=2)))

    if not netmask_text:
        return IP(str(address_text))

    netmask = IP(netmask_text)
    gateway = IP(address_text)
    network = IP(gateway.int() & netmask.int())
    return IP(str(network) + "/" + netmask_text)
def update_hosts(self, hosts_info):  # pylint: disable=no-self-use
    """ Update hosts info

    Validates a ``{host_name: ip}`` dict, then rewrites ``/etc/hosts``:
    lines carrying the Functest marker are dropped, and the marker plus one
    marked entry per host are written after the remaining lines.

    :returns: the ``api_utils.result_handler`` result, with status 0 on
              success and status 1 for invalid input or a failed rewrite.
    """
    if not isinstance(hosts_info, dict):
        return api_utils.result_handler(
            status=1, data='Error, args should be a dict')

    for host_name, address in hosts_info.items():
        if not host_name:
            return api_utils.result_handler(
                status=1, data='Domain name is absent')
        try:
            IPy.IP(address)
        except Exception:  # pylint: disable=broad-except
            return api_utils.result_handler(
                status=1, data='The IP %s is invalid' % address)

    try:
        marker = "# SUT hosts info for Functest"
        entries = ('\n{} {} {}'.format(address, host_name, marker)
                   for host_name, address in hosts_info.items())

        # Keep every line that was not written by an earlier update.
        with open("/etc/hosts", 'r') as hosts_file:
            kept_lines = [line for line in hosts_file
                          if marker not in line]

        with open("/etc/hosts", 'w') as hosts_file:
            hosts_file.writelines(kept_lines)
            hosts_file.write(marker)
            hosts_file.writelines(entries)
    except Exception:  # pylint: disable=broad-except
        return api_utils.result_handler(
            status=1, data='Error when updating hosts info')

    return api_utils.result_handler(
        status=0, data='Update hosts info successfully')
def data_received(self, data):
    """Handle received query data and schedule the upstream lookup.

    The payload is extracted with ``self.get_data`` and parsed as a
    DNSRecord. A ``dns.google.com/resolve`` URL is then built for the
    queried name, with the peer address (or ``self.args.myip`` when the
    peer address is of type 'PRIVATE') as the EDNS client subnet.
    ``ProxyClient.query_domain`` is scheduled on ``self.loop`` and
    ``self.send_resp`` is called with the finished future.

    The transport is closed instead when ``self.get_data`` returns None.
    """
    data = self.get_data(data)
    if data is None:
        self.transport.close()
        return

    self.request = DNSRecord.parse(data)
    if self.args.debug:
        print('Data received: {!r}'.format(self.request))

    from IPy import IP
    client_ip = self.peername[0]
    if IP(client_ip).iptype() == 'PRIVATE':
        client_ip = self.args.myip

    url = 'https://dns.google.com/resolve?name={}&edns_client_subnet={}/24'.format(
        str(self.request.q.qname), client_ip)

    # The former ``from asyncio import async as ensure_future`` fallback is
    # gone: ``async`` is a reserved keyword since Python 3.7, so that line
    # was a SyntaxError, and the imported name was never used anyway.
    future = asyncio.ensure_future(
        ProxyClient.query_domain(url, self.args.proxy), loop=self.loop)
    future.add_done_callback(self.send_resp)
def get_arg():
    """Parse the command line options of the ``prcdns`` proxy.

    :returns: an ``argparse.Namespace`` with ``debug`` (bool), ``listen``
              (str), ``port`` (int), ``proxy`` and ``myip`` (str or None).
    """
    def _flag(text):
        """Read a --debug value: explicit negatives disable, all else enables."""
        return text.strip().lower() not in (
            '', '0', 'no', 'n', 'false', 'f', 'off')

    parser = argparse.ArgumentParser(prog='prcdns', description='google dns proxy.')
    # Without a type, "--debug no" would be the truthy string "no".
    parser.add_argument('--debug', help='debug model,default NO', default=False, type=_flag)
    parser.add_argument('-l', '--listen', help='listening IP,default 0.0.0.0', default='0.0.0.0')
    # type=int keeps a port given on the command line consistent with the
    # integer default.
    parser.add_argument('-p', '--port', help='listening Port,default 3535', default=3535, type=int)
    parser.add_argument('-r', '--proxy', help='Used For Query Google DNS,default direct', default=None)
    parser.add_argument('-ip', '--myip', help='IP location', default=None)
    return parser.parse_args()
def IP(ip):
    """Return *ip* unchanged.

    Identity stand-in: the argument is handed back as-is, without any
    parsing or validation.
    """
    return ip
def formatStructValue(self, struct, name, value):
    """Give a readable value for selected socket address struct fields.

    - A field whose name ends in "family", in a struct whose name starts
      with "sockaddr", is looked up in SOCKET_FAMILY (falling back to the
      raw value).
    - ``sockaddr_in.sin_port`` is passed through ``ntoh_ushort``.
    - ``sockaddr_in.sin_addr`` has its ``s_addr`` passed through
      ``ntoh_uint`` and wrapped in ``IP``.

    :returns: the formatted value, or None for any other field.
    """
    if struct.startswith("sockaddr") and name.endswith("family"):
        return SOCKET_FAMILY.get(value, value)
    if struct != "sockaddr_in":
        return None
    if name == "sin_port":
        return ntoh_ushort(value)
    if name == "sin_addr":
        return IP(ntoh_uint(value.s_addr))
    return None
def value_type(value):
    """Value type simply identifies if the passed value
    is a domain or IP address.

    @param value text to test as an IP
    @returns 'ip' if ``IP(value)`` can be constructed, 'domain' otherwise
    """
    try:
        IP(value)
    except Exception:
        # Any parsing failure means "not an IP". Unlike a bare except, this
        # lets KeyboardInterrupt and SystemExit propagate.
        return 'domain'
    return 'ip'
def gen_info():
    """Format a three-field summary of ``host``.

    ``host`` is read from the surrounding scope. Its ``ip_str`` entry is
    required, while ``org`` and ``os`` fall back to 'n/a' when missing.

    :returns: a string with the IP, organization and operating system.
    """
    template = "IP: %s \n Organization: %s \n Operating System: %s"
    fields = (
        host['ip_str'],
        host.get('org', 'n/a'),
        host.get('os', 'n/a'),
    )
    return template % fields