def parse_prefix_range(expr=None):
pattern = r'^(?P<prefix>((\d+(\.\d+){3})|([0-9a-fA-F:]{2,40}))/\d+)' \
r'(\^(?P<ge>\d+)-(?P<le>\d+))?$'
match = re.match(pattern, expr)
if match:
prefix = ipaddress.ip_network(unicode(match.group("prefix")))
afi = "ipv%d" % prefix.version
entry = {"prefix": prefix}
try:
entry["greater-equal"] = int(match.group("ge"))
entry["less-equal"] = int(match.group("le"))
except (IndexError, TypeError):
pass
else:
raise ValueError("no match found")
return {afi: [entry]}
python类ip_network()的实例源码
def _ipnet(net_str):
"""
This function is used for argparse module to verify validness of
entered network. Valid networks are IPv4 networks in form
A.B.C.D/prefix
:param net_str: string that represents IPv4 net. Example A.B.C.D/prefix.
:return: ipaddress.ip_network object or raises exception on fail
"""
try:
the_net = ipaddress.ip_network(net_str)
except ValueError:
msg = '{} is not a valid IP network address'.format(net_str)
raise argparse.ArgumentTypeError(msg)
if the_net.version != Utilities.IPV4:
msg = 'Only IPv4 addresses are supported'
raise argparse.ArgumentTypeError(msg)
return the_net
def test_arp_ping(self):
"""test ARP ping - compare to arping utilite"""
arp_ping = Ping(IFACE, ARP_NAME, TIMEOUT, False)
for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
try:
# need arping installed
with os.popen('arping -c {} -t {} {}'.format(COUNT,
TIMEOUT,
str(ip)), 'r'):
# get exit code
ec = os.wait()[1] & 0xFF00
res = arp_ping.ping_host(str(ip))
except PermissionException:
print('Need root previlegies')
if res[STATUS_INDEX] == ONLINE:
self.assertTrue(ec == 0)
else:
self.assertFalse(ec == 0)
def test_icmp_ping(self):
"""test icmp ping - compare to icmping utilite"""
icmp_ping = Ping(IFACE, ICMP_NAME, TIMEOUT, False)
for ip in list(ipaddress.ip_network(TEST_NETWORK).hosts())[:5]:
try:
# need arping installed
with os.popen('ping -c {} -t {} {}'.format(COUNT,
TIMEOUT,
str(ip)), 'r'):
# get exit code
ec = os.wait()[1] & 0xFF00
res = icmp_ping.ping_host(str(ip))
except PermissionException:
print('Need root previlegies')
if res[STATUS_INDEX] == ONLINE:
self.assertTrue(ec == 0)
else:
self.assertFalse(ec == 0)
def edit(request):
if 'object_id' in request.GET:
server = Query({'object_id': request.GET['object_id']}).get()
else:
servertype = Servertype.objects.get(pk=request.POST['attr_servertype'])
project = Project.objects.get(pk=request.POST['attr_project'])
hostname = request.POST['attr_hostname']
if servertype.ip_addr_type == 'null':
intern_ip = None
elif servertype.ip_addr_type == 'network':
intern_ip = ip_network(request.POST['attr_intern_ip'])
else:
intern_ip = ip_interface(request.POST['attr_intern_ip'])
server = ServerObject.new(servertype, project, hostname, intern_ip)
return _edit(request, server, True)
def scan_network(self):
logger.info('scanning Network')
network = self._config_reader.get_config_section("Networking")['network']
netmask = self._config_reader.get_config_section("Networking")['netmask']
my_net = ipaddress.ip_network(network+'/'+netmask)
host_list = dict()
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(2.0)
for ip in my_net:
try:
# print(ip)
host = self._generate_host(ip)
if host is not None:
host_list[ip] = host
except socket.herror as ex:
pass
return host_list
def setUp(self):
super().setUp()
self.cluster.reset_hba()
create_script = []
create_script.append('CREATE ROLE ssl_user WITH LOGIN;')
self.cluster.add_hba_entry(
type='hostssl', address=ipaddress.ip_network('127.0.0.0/24'),
database='postgres', user='ssl_user',
auth_method='trust')
self.cluster.add_hba_entry(
type='hostssl', address=ipaddress.ip_network('::1/128'),
database='postgres', user='ssl_user',
auth_method='trust')
# Put hba changes into effect
self.cluster.reload()
create_script = '\n'.join(create_script)
self.loop.run_until_complete(self.con.execute(create_script))
def __init__(self, value):
"""Initialise new IPPrefix instance."""
try:
obj = ipaddress.ip_address(value)
except Exception:
try:
obj = ipaddress.ip_network(value, strict=False)
except Exception:
raise
if type(obj) in [ipaddress.IPv4Address, ipaddress.IPv6Address]:
self.prefix = None
self.type = self.HOST
elif type(obj) in [ipaddress.IPv4Network, ipaddress.IPv6Network]:
self.prefix = obj.network_address
self.type = self.PREFIX
self.version = obj.version
self.txt = obj.compressed
def test_incompatible_versions(self):
# These should always raise TypeError
v4addr = ipaddress.ip_address('1.1.1.1')
v4net = ipaddress.ip_network('1.1.1.1')
v6addr = ipaddress.ip_address('::1')
v6net = ipaddress.ip_address('::1')
self.assertRaises(TypeError, v4addr.__lt__, v6addr)
self.assertRaises(TypeError, v4addr.__gt__, v6addr)
self.assertRaises(TypeError, v4net.__lt__, v6net)
self.assertRaises(TypeError, v4net.__gt__, v6net)
self.assertRaises(TypeError, v6addr.__lt__, v4addr)
self.assertRaises(TypeError, v6addr.__gt__, v4addr)
self.assertRaises(TypeError, v6net.__lt__, v4net)
self.assertRaises(TypeError, v6net.__gt__, v4net)
def testIpFromInt(self):
self.assertEqual(self.ipv4_interface._ip,
ipaddress.IPv4Interface(16909060)._ip)
ipv4 = ipaddress.ip_network('1.2.3.4')
ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))
v6_int = 42540616829182469433547762482097946625
self.assertEqual(self.ipv6_interface._ip,
ipaddress.IPv6Interface(v6_int)._ip)
self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
4)
self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
6)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertIn(self.ipv4_address, dummy)
self.assertIn(ip2, dummy)
def _ip_ranges (self, h):
if '/' in h:
if h.startswith('gateway/web/freenode/ip.'):
h = h.replace('gateway/web/freenode/ip.','')
return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')]
return [h]
if utils.net.isIPV4(h):
return [ipaddress.ip_network(u'%s/27' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/26' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/25' % h, strict=False).with_prefixlen.encode('utf-8'),ipaddress.ip_network(u'%s/24' % h, strict=False).with_prefixlen.encode('utf-8')]
if utils.net.bruteIsIPV6(h):
r = []
r.append(ipaddress.ip_network(u'%s/120' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/118' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/116' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/114' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/112' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/110' % h, False).with_prefixlen.encode('utf-8'))
r.append(ipaddress.ip_network(u'%s/64' % h, False).with_prefixlen.encode('utf-8'))
return r
return [h]
def is_authorized(ip, authorized_ips):
ip = ip_address(ip)
for item in authorized_ips:
try:
if ip == ip_address(item):
return True
except ValueError:
try:
if ip in ip_network(item):
return True
except ValueError:
logger.warn('The "authorized_ip" list (settings.HEARTBEAT)'
'contains an item that is neither an ip address '
'nor an ip network: {}'.format(item))
def check_access(clientip):
global BANNED_IPS
if clientip in BANNED_IPS:
return False
if not ALLOWED_NETWORKS:
return True
for net in ALLOWED_NETWORKS:
ipobject = ipaddress.ip_address(clientip)
if ipobject in ipaddress.ip_network(net):
return True
BANNED_IPS.append(clientip)
return False
def public_interface(self):
"""
Check that all minions have an address on the public network
"""
for node in self.data.keys():
if ('roles' in self.data[node] and
'master' in self.data[node]['roles']):
continue
found = False
public_network = self.data[node].get("public_network", "")
net_list = Util.parse_list_from_string(public_network)
for address in self.grains[node]['ipv4']:
try:
for network in net_list:
addr = ipaddress.ip_address(u'{}'.format(address))
net = ipaddress.ip_network(u'{}'.format(network))
if addr in net:
found = True
except ValueError:
# Don't care about reporting a ValueError here if
# public_network is malformed, because the
# previous validation in public_network() will do that.
pass
if not found:
msg = "minion {} missing address on public network {}".format(node, public_network)
self.errors.setdefault('public_interface', []).append(msg)
self._set_pass_status('public_network')
def cluster_interface(self):
"""
Check that storage nodes have an interface on the cluster network
"""
# pylint: disable=too-many-nested-blocks
for node in self.data.keys():
if ('roles' in self.data[node] and
'storage' in self.data[node]['roles']):
found = False
cluster_network = self.data[node].get("cluster_network", "")
net_list = Util.parse_list_from_string(cluster_network)
for address in self.grains[node]['ipv4']:
try:
for network in net_list:
addr = ipaddress.ip_address(u'{}'.format(address))
net = ipaddress.ip_network(u'{}'.format(network))
if addr in net:
found = True
except ValueError:
# Don't care about reporting a ValueError here if
# cluster_network is malformed, because the
# previous validation in cluster_network() will do that.
pass
if not found:
msg = "minion {}".format(node)
msg += " missing address on cluster network {}".format(cluster_network)
self.errors.setdefault('cluster_interface', []).append(msg)
self._set_pass_status('cluster_interface')
def address():
"""
Find the public address for a minion
"""
if 'public_network' not in __pillar__:
return ""
log.debug("pillar: {}".format(type(__pillar__['public_network'])))
if isinstance(__pillar__['public_network'], str):
networks = re.split(', *', __pillar__['public_network'])
else:
networks = __pillar__['public_network']
log.debug("networks: {}".format(pprint.pformat(networks)))
for public_network in networks:
log.info('public_network: {}'.format(public_network))
interfaces = __salt__['network.interfaces']()
log.debug("interfaces: {}".format(pprint.pformat(interfaces)))
for interface in interfaces:
log.info("interface: {}".format(pprint.pformat(interface)))
if 'inet' in interfaces[interface]:
for entry in interfaces[interface]['inet']:
_address = entry['address']
log.info("Checking address {}".format(_address))
if (ipaddress.ip_address(u'{}'.format(_address)) in
ipaddress.ip_network(u'{}'.format(public_network))):
return _address
return ""
def validate_ip_network(network, strict=True):
'''
If network is a valid IP network when parsed according to strict,
return it as an ipnetwork object.
Otherwise, return None.
'''
try:
return ipaddress.ip_network(unicode(address), strict=strict)
except ValueError:
return None
def get_networks():
network_tokens = os.environ.get('TALISKER_NETWORKS', '').split()
networks = [ip_network(force_unicode(n)) for n in network_tokens]
if networks:
logger = logging.getLogger(__name__)
logger.info('configured TALISKER_NETWORKS',
extra={'networks': [str(n) for n in networks]})
return networks
def cast_network(s, cur=None):
if s is None:
return None
return ipaddress.ip_network(str(s))