def get_ipv6_addr_by_EUI64(cidr, mac):
"""Generate a IPv6 addr by EUI-64 with CIDR and MAC
:param str cidr: a IPv6 CIDR
:param str mac: a MAC address
:return: an IPv6 Address
:rtype: netaddr.IPAddress
"""
# Check if the prefix is IPv4 address
is_ipv4 = netaddr.valid_ipv4(cidr)
if is_ipv4:
msg = "Unable to generate IP address by EUI64 for IPv4 prefix"
raise TypeError(msg)
try:
eui64 = int(netaddr.EUI(mac).eui64())
prefix = netaddr.IPNetwork(cidr)
return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
except (ValueError, netaddr.AddrFormatError):
raise TypeError('Bad prefix or mac format for generating IPv6 '
'address by EUI-64: %(prefix)s, %(mac)s:'
% {'prefix': cidr, 'mac': mac})
except TypeError:
raise TypeError('Bad prefix type for generate IPv6 address by '
'EUI-64: %s' % cidr)
python类valid_ipv4()的实例源码
def serialize(self):
# fixup
if (netaddr.valid_ipv4(self.peer_ip)
and netaddr.valid_ipv4(self.local_ip)):
self.afi = self.AFI_IPv4
elif (netaddr.valid_ipv6(self.peer_ip)
and netaddr.valid_ipv6(self.local_ip)):
self.afi = self.AFI_IPv6
else:
raise ValueError(
'peer_ip and local_ip must be the same address family: '
'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))
buf = struct.pack(self._HEADER_FMT,
self.peer_as, self.local_as,
self.if_index, self.afi)
buf += ip.text_to_bin(self.peer_ip)
buf += ip.text_to_bin(self.local_ip)
buf += struct.pack(self._STATES_FMT,
self.old_state, self.new_state)
return buf
def serialize(self):
# fixup
if (netaddr.valid_ipv4(self.peer_ip)
and netaddr.valid_ipv4(self.local_ip)):
self.afi = self.AFI_IPv4
elif (netaddr.valid_ipv6(self.peer_ip)
and netaddr.valid_ipv6(self.local_ip)):
self.afi = self.AFI_IPv6
else:
raise ValueError(
'peer_ip and local_ip must be the same address family: '
'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))
buf = struct.pack(self._HEADER_FMT,
self.peer_as, self.local_as,
self.if_index, self.afi)
buf += ip.text_to_bin(self.peer_ip)
buf += ip.text_to_bin(self.local_ip)
buf += self.bgp_message.serialize()
return buf
def __init__(self, afi, safi, next_hop, nlri,
flags=0, type_=None, length=None):
super(BGPPathAttributeMpReachNLRI, self).__init__(
flags=flags, type_=type_, length=length)
self.afi = afi
self.safi = safi
if not isinstance(next_hop, (list, tuple)):
next_hop = [next_hop]
for n in next_hop:
if not netaddr.valid_ipv4(n) and not netaddr.valid_ipv6(n):
raise ValueError('Invalid address for next_hop: %s' % n)
# Note: For the backward compatibility, stores the first next_hop
# address and all next_hop addresses separately.
if next_hop:
self._next_hop = next_hop[0]
else:
self._next_hop = None
self._next_hop_list = next_hop
self.nlri = nlri
addr_cls = _get_addr_class(afi, safi)
for i in nlri:
if not isinstance(i, addr_cls):
raise ValueError('Invalid NRLI class for afi=%d and safi=%d'
% (self.afi, self.safi))
def __init__(self, lp_status, te_metric, max_bw, max_reserved_bw,
unreserved_bw, admin_group, remote_as, remote_ip,
average_delay, min_delay, max_delay, delay_var, pkt_loss,
residual_bw, average_bw, utilized_bw):
super(InterfaceLinkParams, self).__init__()
self.lp_status = lp_status
self.te_metric = te_metric
self.max_bw = max_bw
self.max_reserved_bw = max_reserved_bw
assert isinstance(unreserved_bw, (list, tuple))
assert len(unreserved_bw) == MAX_CLASS_TYPE
self.unreserved_bw = unreserved_bw
self.admin_group = admin_group
self.remote_as = remote_as
assert netaddr.valid_ipv4(remote_ip)
self.remote_ip = remote_ip
self.average_delay = average_delay
self.min_delay = min_delay
self.max_delay = max_delay
self.delay_var = delay_var
self.pkt_loss = pkt_loss
self.residual_bw = residual_bw
self.average_bw = average_bw
self.utilized_bw = utilized_bw
def serialize(self):
if ip.valid_ipv4(self.prefix):
self.family = socket.AF_INET # fixup
prefix_addr, prefix_num = self.prefix.split('/')
body_bin = struct.pack(
self._IPV4_BODY_FMT,
addrconv.ipv4.text_to_bin(prefix_addr),
int(prefix_num))
elif ip.valid_ipv6(self.prefix):
self.family = socket.AF_INET6 # fixup
prefix_addr, prefix_num = self.prefix.split('/')
body_bin = struct.pack(
self._IPV6_BODY_FMT,
addrconv.ipv6.text_to_bin(prefix_addr),
int(prefix_num))
else:
raise ValueError('Invalid prefix: %s' % self.prefix)
return struct.pack(self._FAMILY_FMT, self.family) + body_bin
def serialize(self):
# fixup
if ip.valid_ipv4(self.prefix):
self.family = socket.AF_INET
elif ip.valid_ipv6(self.prefix):
self.family = socket.AF_INET6
else:
raise ValueError('Invalid prefix: %s' % self.prefix)
buf = struct.pack(self._FAMILY_FMT, self.family)
buf += _serialize_ip_prefix(self.prefix)
buf += struct.pack(self._METRIC_FMT, self.metric)
return buf + _serialize_nexthops(self.nexthops)
def validate(value):
if isinstance(value, six.string_types) and netaddr.valid_ipv4(value):
return value
raise ValueError(_("Expected IPv4 string, got '%s'") % value)
def validIP(self, address):
return netaddr.valid_ipv4(address)
# Clean DNS results to be a simple list
def matches(self, value, includeParents=False, includeChildren=True):
value = value.lower()
if value is None or value == "":
return False
if netaddr.valid_ipv4(value):
# 1.1
if value in self.getAddresses():
return True
# 1.2
if self.targetType == "NETBLOCK_OWNER":
if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
return True
if self.targetType == "IP_ADDRESS":
if netaddr.IPAddress(value) in \
netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
return True
else:
for name in self.getNames():
# 2.1
if value == name:
return True
# 2.2
if includeParents and name.endswith("." + value):
return True
# 2.3
if includeChildren and value.endswith("." + name):
return True
return None
# Class for SpiderFoot Events
def validIP(self, address):
return netaddr.valid_ipv4(address)
# Clean DNS results to be a simple list
def matches(self, value, includeParents=False, includeChildren=True):
value = value.lower()
if value is None or value == "":
return False
if netaddr.valid_ipv4(value):
# 1.1
if value in self.getAddresses():
return True
# 1.2
if self.targetType == "NETBLOCK_OWNER":
if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
return True
if self.targetType == "IP_ADDRESS":
if netaddr.IPAddress(value) in \
netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
return True
else:
for name in self.getNames():
# 2.1
if value == name:
return True
# 2.2
if includeParents and name.endswith("." + value):
return True
# 2.3
if includeChildren and value.endswith("." + name):
return True
return None
# Class for SpiderFoot Events
def is_valid_ipv4(address):
"""Verify that address represents a valid IPv4 address.
:param address: Value to verify
:type address: string
:returns: bool
.. versionadded:: 1.1
"""
try:
return netaddr.valid_ipv4(address)
except netaddr.AddrFormatError:
return False
def get_server_port_map(self, server, ip_addr=None):
ports = self.handle._list_ports(device_id=server['id'], fixed_ip=ip_addr) # pylint: disable=protected-access
port_map = [
(p['id'], fxip['ip_address'])
for p in ports for fxip in p['fixed_ips']
if netaddr.valid_ipv4(fxip['ip_address']) and fxip['ip_address'] == ip_addr['addr']
]
return port_map
def netlist(nets):
v4nets = []
v6nets = []
for net in nets:
ipNetwork = netaddr.IPNetwork(net)
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.append(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.append(ipNetwork)
return v4nets, v6nets
def ipset(nets):
v4nets = netaddr.IPSet()
v6nets = netaddr.IPSet()
for net in nets:
ipNetwork = netaddr.IPNetwork(net)
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.add(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.add(ipNetwork)
return v4nets, v6nets
def netlist(nets):
v4nets = []
v6nets = []
for net in nets:
ipNetwork = netaddr.IPNetwork(net)
parts = str(ipNetwork).split("/")
ip = parts[0]
mask = parts[1]
if netaddr.valid_ipv4(ip) and int(mask) <= 32:
v4nets.append(ipNetwork)
elif netaddr.valid_ipv6(ip) and int(mask) <= 128:
v6nets.append(ipNetwork)
return v4nets, v6nets
def valid_ip_address(addr):
if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr):
return False
return True
def prefix_add(self, prefix, next_hop=None, route_dist=None):
""" This method adds a new prefix to be advertized.
``prefix`` must be the string representation of an IP network
(e.g., 10.1.1.0/24).
``next_hop`` specifies the next hop address for this
prefix. This parameter is necessary for only VPNv4 and VPNv6
address families.
``route_dist`` specifies a route distinguisher value. This
parameter is necessary for only VPNv4 and VPNv6 address
families.
"""
func_name = 'network.add'
networks = {}
networks[PREFIX] = prefix
if next_hop:
networks[NEXT_HOP] = next_hop
if route_dist:
func_name = 'prefix.add_local'
networks[ROUTE_DISTINGUISHER] = route_dist
rf, p = self._check_rf_and_normalize(prefix)
networks[ROUTE_FAMILY] = rf
networks[PREFIX] = p
if rf == vrfs.VRF_RF_IPV6 and netaddr.valid_ipv4(next_hop):
# convert the next_hop to IPv4-Mapped IPv6 Address
networks[NEXT_HOP] = \
str(netaddr.IPAddress(next_hop).ipv6())
return call(func_name, **networks)
def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
bind_address=None, password=None):
"""Creates a TCP connection to given peer address.
Tries to create a socket for `timeout` number of seconds. If
successful, uses the socket instance to start `client_factory`.
The socket is bound to `bind_address` if specified.
"""
LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1])
if netaddr.valid_ipv4(peer_addr[0]):
family = socket.AF_INET
else:
family = socket.AF_INET6
with Timeout(time_out, socket.error):
sock = socket.socket(family)
if bind_address:
sock.bind(bind_address)
if password:
sockopt.set_tcp_md5sig(sock, peer_addr[0], password)
sock.connect(peer_addr)
# socket.error exception is rasied in cese of timeout and
# the following code is executed only when the connection
# is established.
# Connection name for pro-active connection is made up of
# local end address + remote end address
local = self.get_localname(sock)[0]
remote = self.get_remotename(sock)[0]
conn_name = ('L: ' + local + ', R: ' + remote)
self._asso_socket_map[conn_name] = sock
# If connection is established, we call connection handler
# in a new thread.
self._spawn(conn_name, conn_handler, sock)
return sock
#
# Sink
#
def add_to_global_table(self, prefix, nexthop=None,
is_withdraw=False):
src_ver_num = 1
peer = None
# set mandatory path attributes
origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP)
aspath = BGPPathAttributeAsPath([[]])
pathattrs = OrderedDict()
pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin
pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath
net = netaddr.IPNetwork(prefix)
ip = str(net.ip)
masklen = net.prefixlen
if netaddr.valid_ipv4(ip):
_nlri = IPAddrPrefix(masklen, ip)
if nexthop is None:
nexthop = '0.0.0.0'
p = Ipv4Path
else:
_nlri = IP6AddrPrefix(masklen, ip)
if nexthop is None:
nexthop = '::'
p = Ipv6Path
new_path = p(peer, _nlri, src_ver_num,
pattrs=pathattrs, nexthop=nexthop,
is_withdraw=is_withdraw)
# add to global ipv4 table and propagates to neighbors
self.learn_path(new_path)
def _set_password(self, address, password):
if netaddr.valid_ipv4(address):
family = socket.AF_INET
else:
family = socket.AF_INET6
for sock in self.listen_sockets.values():
if sock.family == family:
sockopt.set_tcp_md5sig(sock, address, password)
def validate_ip_addr(ip_addr):
if netaddr.valid_ipv4(ip_addr):
return lib_consts.IP_VERSION_4
elif netaddr.valid_ipv6(ip_addr):
return lib_consts.IP_VERSION_6
else:
raise bgp_driver_exc.InvalidParamType(param=ip_addr,
param_type='ip-address')
def is_ip(self, ip_addr=None):
"""
Return true if valid IP address return false if invalid IP address
:param ip_addr: optional IP to pass. Takes from root class if not specified
>>> from ipinformation import IPInformation
>>> print IPInformation(ip_address='8.8.8.8').is_ip()
True
>>> print IPInformation(ip_address='NotAnIP').is_ip()
False
"""
if not ip_addr:
ip_addr = self.ip_address
valid = True
if netaddr.valid_ipv4( ip_addr ): #IPv4 Address
if not re.match( valid_ip_regex, ip_addr ):
valid = False
elif netaddr.valid_ipv6( ip_addr ):
pass
else:
# print '"%s" is not a valid IP Address.' %ip_addr
valid = False
return valid
def validate_ip_addr(addr, version=None):
"""
Validates that an IP address is valid. Returns true if valid, false if
not. Version can be "4", "6", None for "IPv4", "IPv6", or "either"
respectively.
"""
if version == 4:
return netaddr.valid_ipv4(addr)
elif version == 6:
return netaddr.valid_ipv6(addr)
else:
return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
def validate_ip_addr(addr, version=None):
"""
Validates that an IP address is valid. Returns true if valid, false if
not. Version can be "4", "6", None for "IPv4", "IPv6", or "either"
respectively.
"""
if version == 4:
return netaddr.valid_ipv4(addr)
elif version == 6:
return netaddr.valid_ipv6(addr)
else:
return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
def validIP(self, address):
return netaddr.valid_ipv4(address)
# Clean DNS results to be a simple list
def matches(self, value, includeParents=False, includeChildren=True):
value = value.lower()
if value is None or value == "":
return False
if netaddr.valid_ipv4(value):
# 1.1
if value in self.getAddresses():
return True
# 1.2
if self.targetType == "NETBLOCK_OWNER":
if netaddr.IPAddress(value) in netaddr.IPNetwork(self.targetValue):
return True
if self.targetType == "IP_ADDRESS":
if netaddr.IPAddress(value) in \
netaddr.IPNetwork(netaddr.IPAddress(self.targetValue)):
return True
else:
for name in self.getNames():
# 2.1
if value == name:
return True
# 2.2
if includeParents and name.endswith("." + value):
return True
# 2.3
if includeChildren and value.endswith("." + name):
return True
return None
# Class for SpiderFoot Events
def valid_ipv4(addr, flags=0):
"""
Wrapper function of "netaddr.valid_ipv4()".
The function extends "netaddr.valid_ipv4()" to enable to validate
IPv4 network address in "xxx.xxx.xxx.xxx/xx" format.
:param addr: IP address to be validated.
:param flags: See the "netaddr.valid_ipv4()" docs for details.
:return: True is valid. False otherwise.
"""
return _valid_ip(netaddr.valid_ipv4, 32, addr, flags)
def next_hop(self, addr):
if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr):
raise ValueError('Invalid address for next_hop: %s' % addr)
self._next_hop = addr
self.next_hop_list[0] = addr