def public_network(self):
    """
    Validate the ``public_network`` setting of every node.

    Each entry parsed from a node's ``public_network`` value is passed to
    ``ipaddress.ip_network``.  Entries that raise ``ValueError`` are
    recorded as messages under ``self.errors['public_network']``.  Once all
    nodes have been examined the pass status of the check is updated.

    NOTE(review): only validity is checked here; nothing in this method
    compares the networks of different nodes with each other.
    """
    check = 'public_network'
    for node, settings in self.data.items():
        networks = Util.parse_list_from_string(settings.get(check, ""))
        log.debug("public_network: {} {}".format(node, networks))
        for candidate in networks:
            try:
                ipaddress.ip_network(u'{}'.format(candidate))
            except ValueError as err:
                problem = "{} on {} is not valid: {}".format(
                    candidate, node, err)
                self.errors.setdefault(check, []).append(problem)
    self._set_pass_status(check)
python类ip_network()的实例源码
def cluster_network(self):
    """
    Validate the ``cluster_network`` setting of every storage node.

    Nodes without a ``roles`` entry, or whose roles do not include
    ``storage``, are skipped.  For the remaining nodes each parsed network
    is passed to ``ipaddress.ip_network``; entries that raise
    ``ValueError`` are recorded under ``self.errors['cluster_network']``.
    The pass status of the check is updated at the end.

    NOTE(review): only validity is checked here; the networks of
    different nodes are not compared with each other.
    """
    check = 'cluster_network'
    for node in self.data:
        settings = self.data[node]
        # Only storage nodes carry a cluster network.
        if 'roles' not in settings or 'storage' not in settings['roles']:
            continue
        networks = Util.parse_list_from_string(settings.get(check, ""))
        log.debug("cluster_network: {} {}".format(node, networks))
        for candidate in networks:
            try:
                ipaddress.ip_network(u'{}'.format(candidate))
            except ValueError as err:
                problem = "{} on {} is not valid: {}".format(
                    candidate, node, err)
                self.errors.setdefault(check, []).append(problem)
    self._set_pass_status(check)
def block_cidr_network(*blocked_networks):
    # type: (*str) -> Callable
    """Build a validator rejecting recipient URLs inside given CIDR networks.

    Example:
    >>> block_cidr_network('192.168.0.0/24', '132.34.23.0/24')

    Every argument is parsed with ``ip_network`` when the validator is
    built.  The returned callable resolves a recipient URL through
    ``_url_ip_address`` and raises ``SecurityError`` for the first blocked
    network that contains the resolved address.  The original arguments and
    the validator name are attached to the callable as ``_args`` and
    ``_validator``.
    """
    parsed_networks = []
    for cidr in blocked_networks:
        parsed_networks.append(ip_network(text_type(cidr)))

    def validate_cidr(recipient_url):
        # type: (str) -> None
        address = _url_ip_address(recipient_url)
        # First blocked network containing the address, if any.
        offending = next(
            (net for net in parsed_networks if address in net), None)
        if offending is not None:
            raise SecurityError(
                'IP address of recipient {0}={1} is in network {2}'.format(
                    recipient_url, address, offending,
                ))

    validate_cidr._validator = 'block_cidr_network'
    validate_cidr._args = blocked_networks
    return validate_cidr
def test_cidr_cast(self):
    """Check that ``cidr`` values are returned as ``ipaddress`` networks.

    After ``register_ipaddress`` is applied to the cursor, a NULL cidr must
    come back as ``None`` and the IPv4/IPv6 cidr literals must come back as
    ``IPv4Network`` / ``IPv6Network`` objects equal to ``ip_network()`` of
    the same text.
    """
    import ipaddress as ip

    cursor = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cursor)

    cursor.execute("select null::cidr")
    self.assertTrue(cursor.fetchone()[0] is None)

    # (query, expected class, text the result must compare equal to)
    cases = (
        ("select '127.0.0.0/24'::cidr", ip.IPv4Network, '127.0.0.0/24'),
        ("select '::ffff:102:300/128'::cidr", ip.IPv6Network,
         '::ffff:102:300/128'),
    )
    for query, network_cls, text in cases:
        cursor.execute(query)
        value = cursor.fetchone()[0]
        self.assertTrue(isinstance(value, network_cls), repr(value))
        self.assertEqual(value, ip.ip_network(text))
def set_unset(self, interface_id, attribute, address=None):
    """
    Enable ``attribute`` on one interface and disable it on all others.

    This is used for interface options that can only be set on one engine
    interface.  Sub-interfaces that are VLAN-only, or whose current value
    for ``attribute`` is ``None``, are left untouched.

    :param interface_id: id of the interface to enable; compared against
        each sub-interface ``nicid`` after conversion with ``str()``
    :param str attribute: name of the attribute to set
    :param address: optional IP address; when given, the attribute is
        enabled only on the matching sub-interface whose network contains
        this address and disabled on the other sub-interfaces of that
        interface
    """
    wanted_nicid = str(interface_id)
    for interface in self:
        for sub_interface in interface.sub_interfaces():
            # Skip VLAN only interfaces (no addresses)
            if isinstance(sub_interface, PhysicalVlanInterface):
                continue
            if getattr(sub_interface, attribute) is None:
                continue

            if sub_interface.nicid == wanted_nicid:
                if address is None:
                    enabled = True
                else:
                    # Find IP on Node Interface
                    candidate = ipaddress.ip_address(bytes_to_unicode(address))
                    network = ipaddress.ip_network(sub_interface.network_value)
                    enabled = candidate in network
            else:
                # Every other interface gets the attribute unset.
                enabled = False
            setattr(sub_interface, attribute, enabled)
discovery.py 文件源码
项目:SupercomputerInABriefcase
作者: SupercomputerInABriefcase
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def process_arguments(args: List[str]) -> ConnectionData:
    """Turn the command-line arguments into a ``ConnectionData`` record.

    With no arguments the network ``192.168.3.0/24`` on ``wlan0`` is used;
    one argument supplies the network (interface ``eth0``); two arguments
    supply network and interface.  More than two arguments, or a network
    that ``ipaddress.ip_network`` rejects, prints a message and exits the
    process with status 1.
    """
    assert isinstance(args, list)
    assert all(isinstance(a, str) for a in args)

    count = len(args)
    if count > 2:
        print('Incorrect number of arguments.')
        sys.exit(1)

    ip_net = args[0] if count >= 1 else '192.168.3.0/24'
    if count == 2:
        iface = args[1]
    elif count == 1:
        iface = 'eth0'
    else:
        iface = 'wlan0'
    rv = ConnectionData(ip_net=ip_net, iface=iface)

    try:
        ipaddress.ip_network(rv.ip_net)
    except ValueError:
        print("First parameter doesn't appear to be an IP network specification.")
        sys.exit(1)
    return rv
def _validate_cidr_format(cidr):
"""Validate CIDR IP range
:param str cidr:
:return:
:rtype: bool
"""
try:
ipaddress.ip_network(cidr, strict=False)
except (ValueError, ipaddress.AddressValueError,
ipaddress.NetmaskValueError):
return False
if '/' not in cidr:
return False
if re.search('\s', cidr):
return False
return True
def add_ban(self, ip, reason, duration, name=None):
    """
    Ban an ip (or network) and kick every matching connection.

    ``duration`` is given in minutes; a falsy duration makes the ban
    permanent.  When a connected client falls inside the banned network its
    name replaces ``name``; if no name is known ``'(unknown)'`` is stored.
    The ban is recorded in ``self.bans`` and persisted with ``save_bans``.
    """
    banned_net = ip_network(text_type(ip), strict=False)
    # Iterate over a snapshot of the connections (presumably because
    # kick() can modify the mapping -- verify against kick()).
    for conn in list(self.connections.values()):
        if ip_address(conn.address[0]) not in banned_net:
            continue
        name = conn.name
        conn.kick(silent=True)

    # Convert the relative duration into an absolute expiry timestamp.
    expires = reactor.seconds() + duration * 60 if duration else None
    self.bans[ip] = (name or '(unknown)', reason, expires)
    self.save_bans()
def load_config(self, config):
    """Load the JSON configuration file and attach derived values.

    The returned dict is the parsed JSON with three additions:

    * ``sha224`` -- hex digest over the ``subnet``, ``swap_size``,
      ``root_size`` and ``extra_args`` values followed by the contents of
      every file listed in ``hashes``;
    * ``subnet`` -- replaced by the ``ipaddress`` network object parsed
      from the configured text;
    * ``image_method`` -- set to ``IMAGE_METHOD``.

    NOTE(review): the scalar fields are hashed as NUL-terminated text.
    The previous ``bytes(size)`` call produced ``size`` zero bytes, so
    different size combinations could share a digest and large sizes
    allocated equally large buffers.  Digests therefore differ from those
    computed by the old code -- confirm nothing compares against stored
    values.

    :param config: path of the JSON configuration file
    :return: the augmented configuration dict
    """
    with open(config) as handle:
        cfg = json.load(handle)

    digest = hashlib.sha224()
    for key in ('subnet', 'swap_size', 'root_size', 'extra_args'):
        digest.update(str(cfg[key]).encode('utf-8'))
        # Field separator so adjacent values cannot run into each other.
        digest.update(b'\0')
    # TODO: check sizes
    for path in cfg['hashes']:
        with open(path, 'rb') as blob:
            # Fixed-size chunks keep memory bounded for binary files.
            for chunk in iter(lambda: blob.read(65536), b''):
                digest.update(chunk)

    cfg['sha224'] = digest.hexdigest()
    cfg['subnet'] = ipaddress.ip_network(cfg['subnet'])
    cfg['image_method'] = IMAGE_METHOD
    return cfg
def getTestNetworkIps(manifest):
    """Return the usable host addresses of the first test subnet.

    The first subnet returned by ``getTestSubnets(manifest)`` is the test
    subnet.  Its host addresses are listed in exploded form, leaving out
    every address that also belongs to one of the remaining subnets and
    the test subnet's own gateway.

    The previous implementation rebound the network to the generator
    returned by ``address_exclude()``, which has no ``hosts()`` method and
    which raises when the other subnet is not fully contained; filtering
    the hosts directly avoids both problems.

    :param manifest: passed through to ``getTestSubnets``
    :return: list of host address strings
    :raises ValueError: if the gateway is not among the remaining hosts
    """
    testSubnets = getTestSubnets(manifest)
    testSubnet = testSubnets[0]
    testIpSubnet = ipaddress.ip_network(testSubnet["range"])

    otherIpSubnets = (ipaddress.ip_network(s["range"]) for s in testSubnets[1:])
    overlapOtherIpSubnets = [
        s for s in otherIpSubnets if testIpSubnet.overlaps(s)
    ]

    testHosts = [
        host.exploded
        for host in testIpSubnet.hosts()
        if not any(host in other for other in overlapOtherIpSubnets)
    ]
    testHosts.remove(testSubnet["gateway"])
    return testHosts
def _ip_network_hosts(obj, page=None):
    """
    Returns the IP addresses belonging to one page of a network.

    The page number is normalised with ``_page`` against ``obj.pages()``.
    When the network has more than one page, the result covers a block of
    ``2 ** obj.page_bits`` addresses starting at the page offset; otherwise
    the network's own prefix length is used.

    :returns: IP addresses
    :rtype: iterator (AC-3531)
    Previous rtype was a generator
    """
    total_pages = obj.pages()
    page = _page(page, total_pages)

    start = obj.network_address + (page - 1) * 2 ** obj.page_bits
    if total_pages > 1:
        prefix_len = obj.max_prefixlen - obj.page_bits
    else:
        prefix_len = obj.prefixlen

    # This method used to return the _BaseNetwork.hosts() generator, but
    # that skips the network and broadcast addresses, leaving the final IP
    # pool two addresses short.  Since the AC-3531 fix the network itself
    # is iterated (_BaseNetwork.__iter__()), which skips nothing.
    return iter(ipaddress.ip_network(u'{0}/{1}'.format(start, prefix_len)))
def to_dict(self, include=None, exclude=None, page=None):
    """Return a dict describing this network for the requested page.

    ``include`` and ``exclude`` are accepted but not used by this method.
    The allocation data is fetched with the page value as passed in; the
    page is then normalised with ``_page`` before the free hosts are
    looked up and before it is reported in the result.
    """
    allocation = self.free_hosts_and_busy(page=page)
    total_pages = ip_network(self.network).pages()
    page = _page(page, total_pages)
    return {
        'id': self.network,
        'network': self.network,
        'ipv6': self.ipv6,
        'free_hosts': self.free_hosts(page=page),
        'blocked_list': list(self.get_blocked_set()),
        'node': None if self.node is None else self.node.hostname,
        'allocation': allocation,
        'page': page,
        'pages': total_pages,
    }
def init():
    """Create the kuberdock ipsets and drop stale pod entries from etcd.

    The flannel network and subnet are read from
    ``/run/flannel/subnet.env``.  Three ipsets are (re)created, then every
    pod registered in etcd whose address lies outside the flannel network,
    or inside the flannel subnet from the config file, is deleted.
    Finally ``update_ipset()`` is called.
    """
    config = read_config_ini('/run/flannel/subnet.env')
    flannel_network = config['flannel_network']
    flannel_subnet = config['flannel_subnet']

    _update_ipset('kuberdock_flannel', [flannel_network], set_type='hash:net')
    _update_ipset('kuberdock_nodes', [])
    _update_ipset('kuberdock_cluster',
                  ['kuberdock_flannel', 'kuberdock_nodes'],
                  set_type='list:set')

    network = ipaddress.ip_network(unicode(flannel_network))
    subnet = ipaddress.ip_network(unicode(flannel_subnet), strict=False)

    etcd = ETCD()
    for user in etcd.users():
        for pod in etcd.pods(user):
            pod_ip = ipaddress.ip_address(pod)
            # Keep pods that are in the flannel network but not in the
            # configured subnet; everything else is removed.
            if pod_ip in network and pod_ip not in subnet:
                continue
            etcd.delete_user(user, pod)
    update_ipset()
def _ip_is_system_blacklisted(self, addr):
for net in self.env.get_config('apiserver.whitelisted_ips'):
try:
net = ip_network(net, strict=False)
except ValueError:
continue
if addr in net:
return False
for net in self.env.get_config('apiserver.blacklisted_ips'):
try:
net = ip_network(net, strict=False)
except ValueError:
continue
if addr in net:
return True
return False
def test_cidr_cast(self):
    """Verify the ``cidr`` typecaster installed by ``register_ipaddress``.

    A NULL value must be returned as ``None``; IPv4 and IPv6 cidr literals
    must be returned as ``IPv4Network`` / ``IPv6Network`` instances that
    compare equal to ``ip_network()`` of the same text.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    def fetch_value(sql):
        # Run one statement and return the first column of its first row.
        cur.execute(sql)
        return cur.fetchone()[0]

    self.assertTrue(fetch_value("select null::cidr") is None)

    v4_net = fetch_value("select '127.0.0.0/24'::cidr")
    self.assertTrue(isinstance(v4_net, ip.IPv4Network), repr(v4_net))
    self.assertEqual(v4_net, ip.ip_network('127.0.0.0/24'))

    v6_net = fetch_value("select '::ffff:102:300/128'::cidr")
    self.assertTrue(isinstance(v6_net, ip.IPv6Network), repr(v6_net))
    self.assertEqual(v6_net, ip.ip_network('::ffff:102:300/128'))
def test_cidr_cast(self):
    """Verify that ``cidr`` values are cast to ``ipaddress`` networks.

    After ``register_ipaddress`` is applied to the cursor, a NULL cidr must
    come back as ``None`` and IPv4/IPv6 cidr literals must come back as
    ``IPv4Network`` / ``IPv6Network`` objects equal to ``ip_network()`` of
    the same text.

    Uses the non-deprecated assertion names: ``assert_`` and
    ``assertEquals`` are aliases that newer Python versions removed.
    """
    import ipaddress as ip

    cur = self.conn.cursor()
    psycopg2.extras.register_ipaddress(cur)

    cur.execute("select null::cidr")
    self.assertIsNone(cur.fetchone()[0])

    cur.execute("select '127.0.0.0/24'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv4Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))

    cur.execute("select '::ffff:102:300/128'::cidr")
    obj = cur.fetchone()[0]
    self.assertIsInstance(obj, ip.IPv6Network, repr(obj))
    self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
def process_acquire(self, xfrm_acquire, xfrm_tmpl):
    """Build the request that answers an XFRM acquire, if one can be sent.

    Traffic selectors are derived from the source and destination address,
    port and protocol of the acquire's selector.  In the INITIAL state an
    IKE_SA_INIT request is generated, in the ESTABLISHED state a
    CREATE_CHILD_SA request; in any other state a warning is logged.
    ``xfrm_tmpl`` is not used by this method.

    :return: the serialised request, or ``None`` when no request was
        generated
    """
    selector = xfrm_acquire.sel
    small_tsi = TrafficSelector.from_network(
        ip_network(selector.saddr.to_ipaddr()), selector.sport, selector.proto)
    small_tsr = TrafficSelector.from_network(
        ip_network(selector.daddr.to_ipaddr()), selector.dport, selector.proto)
    acquire = Acquire(small_tsi, small_tsr, xfrm_acquire.policy.index)

    if self.state == IkeSa.State.INITIAL:
        request = self.generate_ike_sa_init_request(acquire)
    elif self.state == IkeSa.State.ESTABLISHED:
        request = self.generate_create_child_sa_request(acquire)
    else:
        logging.warning('Cannot process acquire while waiting for a response.')
        request = None

    # TODO: Use a request queue for simplicity and to avoid problems with
    # state machine
    if not request:
        return None
    request_data = request.to_bytes()
    self.log_message(request, self.peer_addr, request_data, send=True)
    return request_data
def create_policies(self, my_addr, peer_addr, ike_conf):
    """Create the outbound, inbound and forward policies for each protection.

    For every entry of ``ike_conf['protect']`` the selectors are the
    configured subnets in tunnel mode and the bare endpoint addresses
    otherwise.  A random index is generated for the outbound policy and
    stored back into the entry under ``'index'``; the inbound and forward
    policies are created with source and destination swapped and without
    an explicit index.
    """
    for ipsec_conf in ike_conf['protect']:
        tunnel = ipsec_conf['mode'] == Mode.TUNNEL
        src_selector = ipsec_conf['my_subnet'] if tunnel else ip_network(my_addr)
        dst_selector = ipsec_conf['peer_subnet'] if tunnel else ip_network(peer_addr)

        # generate an index for outbound policies
        index = SystemRandom().randint(0, 10000) << 2 | XFRM_POLICY_OUT
        ipsec_conf['index'] = index

        self._create_policy(
            src_selector, dst_selector,
            ipsec_conf['my_port'], ipsec_conf['peer_port'],
            ipsec_conf['ip_proto'], XFRM_POLICY_OUT,
            ipsec_conf['ipsec_proto'], ipsec_conf['mode'],
            my_addr, peer_addr, index=index)

        # Inbound and forward policies mirror the outbound one.
        for direction in (XFRM_POLICY_IN, XFRM_POLICY_FWD):
            self._create_policy(
                dst_selector, src_selector,
                ipsec_conf['peer_port'], ipsec_conf['my_port'],
                ipsec_conf['ip_proto'], direction,
                ipsec_conf['ipsec_proto'], ipsec_conf['mode'],
                peer_addr, my_addr)
def web_data_admin(self):
    """Return dict used in admin/DC web templates"""
    # The owner is exposed by username and the network object as text;
    # every other entry is copied from the attribute of the same meaning.
    data = dict(
        name=self.name,
        alias=self.alias,
        access=self.access,
        owner=self.owner.username,
        desc=self.desc,
        ip_network=str(self.ip_network),
        network=self.network,
        netmask=self.netmask,
        gateway=self.gateway,
        nic_tag=self.nic_tag,
        nic_tag_type=self.nic_tag_type,
        vlan_id=self.vlan_id,
        vxlan_id=self.vxlan_id,
        mtu=self.mtu,
        resolvers=self.get_resolvers(),
        dns_domain=self.dns_domain,
        ptr_domain=self.ptr_domain,
        dc_bound=self.dc_bound_bool,
        dhcp_passthrough=self.dhcp_passthrough,
    )
    return data
def test_incompatible_versions(self):
# These should always raise TypeError
v4addr = ipaddress.ip_address('1.1.1.1')
v4net = ipaddress.ip_network('1.1.1.1')
v6addr = ipaddress.ip_address('::1')
v6net = ipaddress.ip_address('::1')
self.assertRaises(TypeError, v4addr.__lt__, v6addr)
self.assertRaises(TypeError, v4addr.__gt__, v6addr)
self.assertRaises(TypeError, v4net.__lt__, v6net)
self.assertRaises(TypeError, v4net.__gt__, v6net)
self.assertRaises(TypeError, v6addr.__lt__, v4addr)
self.assertRaises(TypeError, v6addr.__gt__, v4addr)
self.assertRaises(TypeError, v6net.__lt__, v4net)
self.assertRaises(TypeError, v6net.__gt__, v4net)
def testIpFromInt(self):
self.assertEqual(self.ipv4_interface._ip,
ipaddress.IPv4Interface(16909060)._ip)
ipv4 = ipaddress.ip_network('1.2.3.4')
ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))
v6_int = 42540616829182469433547762482097946625
self.assertEqual(self.ipv6_interface._ip,
ipaddress.IPv6Interface(v6_int)._ip)
self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
4)
self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
6)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertTrue(self.ipv4_address in dummy)
self.assertTrue(ip2 in dummy)
def testInetRoundtrip(self):
    """Values sent as query parameters must come back unchanged.

    With the ``ipaddress`` module available a network and an address
    object are round-tripped; if importing it fails, plain strings are
    round-tripped through an explicit cast to ``inet`` instead.
    """
    try:
        import ipaddress
        samples = (
            ipaddress.ip_network('192.168.0.0/28'),
            ipaddress.ip_address('192.168.0.1'),
        )
        for sample in samples:
            self.cursor.execute("select %s as f1", (sample,))
            rows = self.cursor.fetchall()
            self.assertEqual(rows[0][0], sample)
    except ImportError:
        for text in ('192.168.100.128/25', '192.168.0.1'):
            self.cursor.execute(
                "select cast(cast(%s as varchar) as inet) as f1", (text,))
            rows = self.cursor.fetchall()
            self.assertEqual(rows[0][0], text)
def find_cidr(ip):
    """Finds most specific CIDR in Networks

    ``ip`` may be a dotted-quad IPv4 address or a hostname; anything that
    does not look like a dotted quad is resolved through DNS first.  The
    default route ``0.0.0.0/0`` is returned when no stored network
    contains the address.

    Raises ``Exception`` when the hostname lookup fails.
    """
    # Check for non-ip, try DNS
    looks_like_ip = re.search(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip)
    if not looks_like_ip:
        try:
            ip = socket.gethostbyname(ip)
        except socket.gaierror:
            raise Exception("Hostname Lookup Failure on: " + ip)

    # Always start with default route
    mostSpecific = "0.0.0.0/0"

    # All networks
    networks = nglib.py2neo_ses.cypher.execute(
        'MATCH (n:Network) RETURN n.cidr as cidr')

    # NOTE(review): a result with exactly one network is skipped by this
    # check -- confirm whether ``> 0`` was intended.
    if not len(networks) > 1:
        return mostSpecific

    for record in networks.records:
        if ipaddress.ip_address(ip) not in ipaddress.ip_network(record.cidr):
            continue
        if nglib.verbose > 1:
            print("find_cidr", ip + " in " + record.cidr)
        mostSpecific = compare_cidr(mostSpecific, record.cidr)
    return mostSpecific
def test_incompatible_versions(self):
# These should always raise TypeError
v4addr = ipaddress.ip_address('1.1.1.1')
v4net = ipaddress.ip_network('1.1.1.1')
v6addr = ipaddress.ip_address('::1')
v6net = ipaddress.ip_network('::1')
self.assertRaises(TypeError, v4addr.__lt__, v6addr)
self.assertRaises(TypeError, v4addr.__gt__, v6addr)
self.assertRaises(TypeError, v4net.__lt__, v6net)
self.assertRaises(TypeError, v4net.__gt__, v6net)
self.assertRaises(TypeError, v6addr.__lt__, v4addr)
self.assertRaises(TypeError, v6addr.__gt__, v4addr)
self.assertRaises(TypeError, v6net.__lt__, v4net)
self.assertRaises(TypeError, v6net.__gt__, v4net)
def testIpFromInt(self):
self.assertEqual(self.ipv4_interface._ip,
ipaddress.IPv4Interface(16909060)._ip)
ipv4 = ipaddress.ip_network('1.2.3.4')
ipv6 = ipaddress.ip_network('2001:658:22a:cafe:200:0:0:1')
self.assertEqual(ipv4, ipaddress.ip_network(int(ipv4.network_address)))
self.assertEqual(ipv6, ipaddress.ip_network(int(ipv6.network_address)))
v6_int = 42540616829182469433547762482097946625
self.assertEqual(self.ipv6_interface._ip,
ipaddress.IPv6Interface(v6_int)._ip)
self.assertEqual(ipaddress.ip_network(self.ipv4_address._ip).version,
4)
self.assertEqual(ipaddress.ip_network(self.ipv6_address._ip).version,
6)
def testHash(self):
self.assertEqual(hash(ipaddress.ip_interface('10.1.1.0/24')),
hash(ipaddress.ip_interface('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_network('10.1.1.0/24')),
hash(ipaddress.ip_network('10.1.1.0/24')))
self.assertEqual(hash(ipaddress.ip_address('10.1.1.0')),
hash(ipaddress.ip_address('10.1.1.0')))
# i70
self.assertEqual(hash(ipaddress.ip_address('1.2.3.4')),
hash(ipaddress.ip_address(
int(ipaddress.ip_address('1.2.3.4')._ip))))
ip1 = ipaddress.ip_address('10.1.1.0')
ip2 = ipaddress.ip_address('1::')
dummy = {}
dummy[self.ipv4_address] = None
dummy[self.ipv6_address] = None
dummy[ip1] = None
dummy[ip2] = None
self.assertIn(self.ipv4_address, dummy)
self.assertIn(ip2, dummy)
def __init__(self, onosIps, num=1, numBgpSpeakers=1, asNum=65000, externalOnos=True,
             peerIntfConfig=None, withFpm=False):
    """Set up the SDN autonomous system and its iBGP neighbor lists.

    After the base class is initialised with the AS number and the number
    of BGP speakers, every router gets one neighbor entry per ONOS address
    (port 2000) and one entry per other router, addressed inside the
    internal peering subnet ``1.1.1.0/24``.
    """
    super(SdnAutonomousSystem, self).__init__(asNum, numBgpSpeakers)
    self.onosIps = onosIps
    self.num = num
    self.numBgpSpeakers = numBgpSpeakers
    self.peerIntfConfig = peerIntfConfig
    self.withFpm = withFpm
    self.externalOnos = externalOnos
    self.internalPeeringSubnet = ip_network(u'1.1.1.0/24')

    for router in self.routers.values():
        # Add iBGP sessions to ONOS nodes
        for onosIp in onosIps:
            onosPeer = {'address': onosIp, 'as': asNum, 'port': 2000}
            router.neighbors.append(onosPeer)

        # Add iBGP sessions to other BGP speakers
        for index, otherRouter in self.routers.items():
            if not router == otherRouter:
                cpIpBase = self.num * 10
                peerIp = AutonomousSystem.getIthAddress(
                    self.internalPeeringSubnet, cpIpBase + index)
                router.neighbors.append({'address': peerIp.ip, 'as': asNum})
def generateRoutes(baseRange, numRoutes, subnetSize=None):
    """Split ``baseRange`` into subnets to be used as routes.

    With ``subnetSize`` given, every subnet of that prefix length is
    returned and ``numRoutes`` is ignored.  Otherwise the shortest prefix
    length (longer than the base prefix) that yields at least
    ``numRoutes`` subnets is chosen and the first ``numRoutes`` subnets
    are returned.

    The number of subnets for a candidate prefix is computed
    arithmetically and only the requested subnets are materialised, so
    wide ranges (for example IPv6) do not enumerate every subnet.

    :param baseRange: network to split, as accepted by ``ip_network``
    :param numRoutes: number of routes wanted; a non-positive value
        yields an empty list
    :param subnetSize: optional explicit prefix length of the subnets
    :return: list of network objects
    :raises Exception: if the range cannot provide ``numRoutes`` subnets
    """
    from itertools import islice

    baseNetwork = ip_network(baseRange)
    if subnetSize is not None:
        return list(baseNetwork.subnets(new_prefix=subnetSize))

    # We need to get at least 2 addresses out of each subnet, so the biggest
    # prefix length we can have is /30
    maxPrefixLength = baseNetwork.max_prefixlen - 2

    trySubnetSize = baseNetwork.prefixlen + 1
    # Splitting to prefix p produces 2 ** (p - prefixlen) subnets.
    while (trySubnetSize <= maxPrefixLength and
           (1 << (trySubnetSize - baseNetwork.prefixlen)) < numRoutes):
        trySubnetSize += 1

    if trySubnetSize > maxPrefixLength:
        raise Exception("Can't get enough routes from input parameters")

    subnets = baseNetwork.subnets(new_prefix=trySubnetSize)
    return list(islice(subnets, max(numRoutes, 0)))
def __init__(self, file_path):
    """Build the IPv4 and IPv6 longest-prefix-match tables from a file.

    Both tables are first seeded with the excluded prefixes, each mapped
    to a default ``NextHop()``.  The file is then read line by line: empty
    lines and lines starting with ``#`` are skipped, every other line is
    split at the first space into a prefix and the next-hop text, and the
    entry is stored in the table matching the prefix's IP version.

    :param file_path: path of the routing table file
    """
    self._ipv4_lpm_dict = LpmDict()
    for ip in EXCLUDE_IPV4_PREFIXES:
        self._ipv4_lpm_dict[ip] = self.NextHop()

    self._ipv6_lpm_dict = LpmDict(ipv4=False)
    for ip in EXCLUDE_IPV6_PREFIXES:
        self._ipv6_lpm_dict[ip] = self.NextHop()

    # filter out empty lines and lines starting with '#'
    pattern = re.compile("^#.*$|^[ \t]*$")
    with open(file_path, 'r') as f:
        # Iterate the file lazily instead of loading every line up front.
        for line in f:
            if pattern.match(line):
                continue
            entry = line.split(' ', 1)
            prefix = ip_network(unicode(entry[0]))
            next_hop = self.NextHop(entry[1])
            # Compare by value: identity checks against int literals only
            # work through an interpreter implementation detail.
            if prefix.version == 4:
                self._ipv4_lpm_dict[str(prefix)] = next_hop
            elif prefix.version == 6:
                self._ipv6_lpm_dict[str(prefix)] = next_hop