def __init__(self, listen_info, handle=None, backlog=None,
             spawn='default', **ssl_args):
    """Open the listening socket and choose the connection handler.

    :param listen_info: Address to listen on. ``listen_info[0]`` is
        inspected to pick the socket family (presumably a ``(host, port)``
        tuple for IP sockets -- TODO confirm against callers).
    :param handle: Callable invoked as ``handle(sock, addr)``.
    :param backlog: Must stay ``None``; any other value trips an assert.
    :param spawn: Must stay ``'default'``; any other value trips an assert.
    :param ssl_args: Keyword arguments forwarded to ``ssl.wrap_socket``.
        When any are supplied, every socket is wrapped before it is
        passed on to ``handle``.
    """
    assert backlog is None
    assert spawn == 'default'

    first = listen_info[0]
    if netaddr.valid_ipv6(first):
        listener = eventlet.listen(listen_info, family=socket.AF_INET6)
    elif os.path.isdir(os.path.dirname(first)):
        # Unix domain socket: the first element is treated as the path.
        listener = eventlet.listen(first, family=socket.AF_UNIX)
    else:
        listener = eventlet.listen(listen_info)
    self.server = listener

    if not ssl_args:
        self.handle = handle
        return

    def wrap_and_handle(sock, addr):
        # Default to server-side TLS unless the caller said otherwise.
        ssl_args.setdefault('server_side', True)
        handle(ssl.wrap_socket(sock, **ssl_args), addr)

    self.handle = wrap_and_handle
# python类valid_ipv6()的实例源码 (source code examples that use Python's valid_ipv6())
def serialize(self):
    """Pack this entry as header + peer IP address + AS number.

    ``self.type`` is updated in place: the IP address family bit is set
    when ``self.ip_addr`` is IPv6, and the AS number size bit is set when
    the AS number is written as four octets.

    :returns: The packed header followed by the binary address and the
        packed AS number.
    """
    if netaddr.valid_ipv6(self.ip_addr):
        # Flag the peer IP address as IPv6.
        self.type |= self.IP_ADDR_FAMILY_BIT
    packed_ip = ip.text_to_bin(self.ip_addr)

    four_octet = self.type & self.AS_NUMBER_SIZE_BIT or self.as_num > 0xffff
    if four_octet:
        self.type |= self.AS_NUMBER_SIZE_BIT
        as_fmt = '!I'
    else:
        # Two octets are enough.
        as_fmt = '!H'
    packed_as = struct.pack(as_fmt, self.as_num)

    header = struct.pack(self._HEADER_FMT,
                         self.type,
                         addrconv.ipv4.text_to_bin(self.bgp_id))
    return header + packed_ip + packed_as
def serialize(self):
    """Pack header, peer/local addresses and the old/new state fields.

    ``self.afi`` is fixed up from the address family shared by
    ``self.peer_ip`` and ``self.local_ip`` before packing.

    :returns: The packed message.
    :raises ValueError: If the two addresses are not both IPv4 or both
        IPv6.
    """
    peer, local = self.peer_ip, self.local_ip
    if netaddr.valid_ipv4(peer) and netaddr.valid_ipv4(local):
        self.afi = self.AFI_IPv4
    elif netaddr.valid_ipv6(peer) and netaddr.valid_ipv6(local):
        self.afi = self.AFI_IPv6
    else:
        raise ValueError(
            'peer_ip and local_ip must be the same address family: '
            'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))

    header = struct.pack(self._HEADER_FMT,
                         self.peer_as, self.local_as,
                         self.if_index, self.afi)
    peer_bin = ip.text_to_bin(self.peer_ip)
    local_bin = ip.text_to_bin(self.local_ip)
    states = struct.pack(self._STATES_FMT, self.old_state, self.new_state)
    return header + peer_bin + local_bin + states
def serialize(self):
    """Pack header and peer/local addresses, then append the BGP message.

    ``self.afi`` is fixed up from the address family shared by
    ``self.peer_ip`` and ``self.local_ip`` before packing.

    :returns: The packed header and addresses followed by
        ``self.bgp_message.serialize()``.
    :raises ValueError: If the two addresses are not both IPv4 or both
        IPv6.
    """
    peer, local = self.peer_ip, self.local_ip
    if netaddr.valid_ipv4(peer) and netaddr.valid_ipv4(local):
        self.afi = self.AFI_IPv4
    elif netaddr.valid_ipv6(peer) and netaddr.valid_ipv6(local):
        self.afi = self.AFI_IPv6
    else:
        raise ValueError(
            'peer_ip and local_ip must be the same address family: '
            'peer_ip=%s, local_ip=%s' % (self.peer_ip, self.local_ip))

    header = struct.pack(self._HEADER_FMT,
                         self.peer_as, self.local_as,
                         self.if_index, self.afi)
    peer_bin = ip.text_to_bin(self.peer_ip)
    local_bin = ip.text_to_bin(self.local_ip)
    return header + peer_bin + local_bin + self.bgp_message.serialize()
def serialize(self):
    """Pack the address family followed by the prefix address and length.

    ``self.family`` is fixed up from the family of ``self.prefix``, which
    is split on ``'/'`` into address and prefix length.

    :returns: The packed family field followed by the packed body.
    :raises ValueError: If ``self.prefix`` is neither IPv4 nor IPv6.
    """
    if ip.valid_ipv4(self.prefix):
        self.family = socket.AF_INET  # fixup
        body_fmt, codec = self._IPV4_BODY_FMT, addrconv.ipv4
    elif ip.valid_ipv6(self.prefix):
        self.family = socket.AF_INET6  # fixup
        body_fmt, codec = self._IPV6_BODY_FMT, addrconv.ipv6
    else:
        raise ValueError('Invalid prefix: %s' % self.prefix)

    prefix_addr, prefix_num = self.prefix.split('/')
    body_bin = struct.pack(body_fmt,
                           codec.text_to_bin(prefix_addr),
                           int(prefix_num))
    return struct.pack(self._FAMILY_FMT, self.family) + body_bin
def serialize(self):
    """Pack family, prefix, metric and nexthops, in that order.

    ``self.family`` is fixed up from the family of ``self.prefix`` first.

    :returns: The concatenated packed fields.
    :raises ValueError: If ``self.prefix`` is neither IPv4 nor IPv6.
    """
    if ip.valid_ipv4(self.prefix):
        detected = socket.AF_INET
    elif ip.valid_ipv6(self.prefix):
        detected = socket.AF_INET6
    else:
        raise ValueError('Invalid prefix: %s' % self.prefix)
    self.family = detected  # fixup

    family_bin = struct.pack(self._FAMILY_FMT, self.family)
    prefix_bin = _serialize_ip_prefix(self.prefix)
    metric_bin = struct.pack(self._METRIC_FMT, self.metric)
    head = family_bin + prefix_bin + metric_bin
    return head + _serialize_nexthops(self.nexthops)
def validate(value):
    """Return ``value`` unchanged if it is a string holding an IPv6 address.

    :param value: Candidate value.
    :returns: ``value`` itself.
    :raises ValueError: If ``value`` is not a string or is not accepted by
        ``netaddr.valid_ipv6``.
    """
    is_text = isinstance(value, six.string_types)
    if not (is_text and netaddr.valid_ipv6(value)):
        raise ValueError(_("Expected IPv6 String, got '%s'") % value)
    return value
def is_valid_ipv6(address):
    """Check whether ``address`` is a valid IPv6 address.

    A ``netaddr.AddrFormatError`` raised during the check is treated as
    "not valid" rather than propagated.

    :param address: Value to verify
    :type address: string
    :returns: bool

    .. versionadded:: 1.1
    """
    try:
        verdict = netaddr.valid_ipv6(address)
    except netaddr.AddrFormatError:
        return False
    return verdict
def build_base_url(protocol, host, port):
    """Assemble ``protocol://host[:port]``.

    :param protocol: URL scheme, placed before ``://``.
    :param host: Host name or address; wrapped in square brackets when
        it is an IPv6 address.
    :param port: Integer port, or ``None`` to leave the port out.
    :returns: The assembled URL string.
    """
    scheme = '%s://' % protocol
    if netaddr.valid_ipv6(host):
        authority = '[%s]' % host
    else:
        authority = host
    if port is None:
        suffix = ''
    else:
        suffix = ':%d' % port
    return scheme + authority + suffix
def netlist(nets):
    """Split networks into an IPv4 list and an IPv6 list.

    Each item is converted with ``netaddr.IPNetwork``; its string form is
    split on ``"/"`` to obtain the address and mask used for
    classification. Items matching neither family are silently dropped.

    :param nets: Iterable of values accepted by ``netaddr.IPNetwork``.
    :returns: ``(v4nets, v6nets)`` -- two lists of ``netaddr.IPNetwork``.
    """
    ipv4_networks, ipv6_networks = [], []
    for entry in nets:
        network = netaddr.IPNetwork(entry)
        pieces = str(network).split("/")
        address, mask = pieces[0], pieces[1]
        if netaddr.valid_ipv4(address) and int(mask) <= 32:
            ipv4_networks.append(network)
            continue
        if netaddr.valid_ipv6(address) and int(mask) <= 128:
            ipv6_networks.append(network)
    return ipv4_networks, ipv6_networks
def ipset(nets):
    """Split networks into an IPv4 ``IPSet`` and an IPv6 ``IPSet``.

    Each item is converted with ``netaddr.IPNetwork``; its string form is
    split on ``"/"`` to obtain the address and mask used for
    classification. Items matching neither family are silently dropped.

    :param nets: Iterable of values accepted by ``netaddr.IPNetwork``.
    :returns: ``(v4nets, v6nets)`` -- two ``netaddr.IPSet`` objects.
    """
    ipv4_set = netaddr.IPSet()
    ipv6_set = netaddr.IPSet()
    for entry in nets:
        network = netaddr.IPNetwork(entry)
        pieces = str(network).split("/")
        address, mask = pieces[0], pieces[1]
        if netaddr.valid_ipv4(address) and int(mask) <= 32:
            ipv4_set.add(network)
            continue
        if netaddr.valid_ipv6(address) and int(mask) <= 128:
            ipv6_set.add(network)
    return ipv4_set, ipv6_set
def netlist(nets):
    """Sort the given networks into separate IPv4 and IPv6 lists.

    Every element is turned into a ``netaddr.IPNetwork``. The address and
    mask parts of its string form decide which list receives it; an
    element that fits neither test is left out.

    :param nets: Iterable of values accepted by ``netaddr.IPNetwork``.
    :returns: Tuple ``(v4nets, v6nets)`` of lists of ``netaddr.IPNetwork``.
    """
    by_family = {4: [], 6: []}
    for raw in nets:
        parsed = netaddr.IPNetwork(raw)
        fields = str(parsed).split("/")
        host_part = fields[0]
        mask_part = fields[1]
        if netaddr.valid_ipv4(host_part) and int(mask_part) <= 32:
            by_family[4].append(parsed)
        elif netaddr.valid_ipv6(host_part) and int(mask_part) <= 128:
            by_family[6].append(parsed)
    return by_family[4], by_family[6]
def valid_ip_address(addr):
    """Report whether ``addr`` is a valid IPv4 or IPv6 address.

    :param addr: Address to check.
    :returns: ``True`` if either netaddr validator accepts it, else
        ``False``.
    """
    if netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr):
        return True
    return False
def _check_rf_and_normalize(prefix):
    """Determine the route family of ``prefix`` and normalize IPv6 input.

    :param prefix: String of the form ``address/masklen``.
    :returns: ``(vrfs.VRF_RF_IPV6, normalized_prefix)`` when the address
        part is IPv6, where the address has been re-rendered through
        ``netaddr.IPAddress``; otherwise ``(vrfs.VRF_RF_IPV4, prefix)``
        with the prefix untouched.
    """
    addr, masklen = prefix.split('/')
    if not netaddr.valid_ipv6(addr):
        return vrfs.VRF_RF_IPV4, prefix

    # Re-render the IPv6 address in netaddr's canonical string form.
    normalized = str(netaddr.IPAddress(addr)) + '/' + masklen
    return vrfs.VRF_RF_IPV6, normalized
def validate_ip_addr(ip_addr):
    """Return the IP version constant matching ``ip_addr``.

    :param ip_addr: Address to classify.
    :returns: ``lib_consts.IP_VERSION_4`` or ``lib_consts.IP_VERSION_6``.
    :raises bgp_driver_exc.InvalidParamType: If the address is neither a
        valid IPv4 nor a valid IPv6 address.
    """
    if netaddr.valid_ipv4(ip_addr):
        return lib_consts.IP_VERSION_4
    if netaddr.valid_ipv6(ip_addr):
        return lib_consts.IP_VERSION_6
    raise bgp_driver_exc.InvalidParamType(param=ip_addr,
                                          param_type='ip-address')
def is_ip(self, ip_addr=None):
    """
    Return True for a valid IP address and False otherwise.

    An IPv4 address must pass both ``netaddr.valid_ipv4`` and a match
    against ``valid_ip_regex``; an IPv6 address only needs to pass
    ``netaddr.valid_ipv6``.

    :param ip_addr: optional IP to check. Falls back to
        ``self.ip_address`` when not given (or falsy).

    >>> from ipinformation import IPInformation
    >>> print(IPInformation(ip_address='8.8.8.8').is_ip())
    True
    >>> print(IPInformation(ip_address='NotAnIP').is_ip())
    False
    """
    candidate = ip_addr if ip_addr else self.ip_address

    if netaddr.valid_ipv4(candidate):
        # IPv4 must additionally satisfy the module's own regex.
        return re.match(valid_ip_regex, candidate) is not None
    if netaddr.valid_ipv6(candidate):
        return True
    return False
def validate_ip_addr(addr, version=None):
    """
    Check that an IP address is valid.

    :param addr: Address to check.
    :param version: ``4`` to require IPv4, ``6`` to require IPv6; any
        other value (including the default ``None``) accepts either.
    :returns: True if valid, False otherwise.
    """
    if version == 4:
        checks = (netaddr.valid_ipv4,)
    elif version == 6:
        checks = (netaddr.valid_ipv6,)
    else:
        checks = (netaddr.valid_ipv4, netaddr.valid_ipv6)
    return any(check(addr) for check in checks)
def validate_ip_addr(addr, version=None):
    """
    Tell whether ``addr`` is a valid IP address.

    :param addr: Address to check.
    :param version: Integer ``4`` restricts the check to IPv4 and ``6``
        to IPv6. With any other value, including ``None``, an address of
        either family is accepted.
    :returns: True when the address passes the selected check, False
        otherwise.
    """
    if version == 4:
        return netaddr.valid_ipv4(addr)
    if version == 6:
        return netaddr.valid_ipv6(addr)
    # No specific version requested: try IPv4 first, then IPv6.
    if netaddr.valid_ipv4(addr):
        return True
    return netaddr.valid_ipv6(addr)
def valid_ipv6(addr, flags=0):
    """
    Wrapper function of "netaddr.valid_ipv6()".

    The function extends "netaddr.valid_ipv6()" so that an IPv6 network
    address in "xxxx:xxxx:xxxx::/xx" format can be validated as well.

    :param addr: IP address to be validated.
    :param flags: See the "netaddr.valid_ipv6()" docs for details.
    :return: True if valid. False otherwise.
    """
    # Presumably the largest prefix length _valid_ip should accept for
    # IPv6 -- confirm against _valid_ip.
    ipv6_bits = 128
    return _valid_ip(netaddr.valid_ipv6, ipv6_bits, addr, flags)
def next_hop(self, addr):
    """Store ``addr`` as the next hop and as the first next-hop list entry.

    :param addr: IPv4 or IPv6 address.
    :raises ValueError: If ``addr`` is neither a valid IPv4 nor a valid
        IPv6 address.
    """
    if not (netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)):
        raise ValueError('Invalid address for next_hop: %s' % addr)
    self._next_hop = addr
    # Keep the head of the list in step with the single next hop.
    self.next_hop_list[0] = addr
def next_hop_list(self, addr_list):
    """Store a list of next hops; its first entry becomes the next hop.

    :param addr_list: A list or tuple of addresses. Any other value is
        treated as a single address and wrapped in a list.
    :raises ValueError: If any entry is neither a valid IPv4 nor a valid
        IPv6 address.
    """
    if isinstance(addr_list, (list, tuple)):
        hops = addr_list
    else:
        hops = [addr_list]

    for hop in hops:
        if netaddr.valid_ipv4(hop) or netaddr.valid_ipv6(hop):
            continue
        raise ValueError('Invalid address for next_hop: %s' % hop)

    self._next_hop = hops[0]
    self._next_hop_list = hops
def _serialize_ip_prefix(prefix):
    """Serialize an ``address/length`` prefix via the matching prefix class.

    :param prefix: Prefix string; split on ``'/'`` into address and
        length.
    :returns: Result of ``serialize()`` on ``bgp.IPAddrPrefix`` (IPv4) or
        ``IPv6Prefix`` (IPv6) built from the length and address.
    :raises ValueError: If ``prefix`` is neither IPv4 nor IPv6.
    """
    if ip.valid_ipv4(prefix):
        prefix_cls = bgp.IPAddrPrefix
    elif ip.valid_ipv6(prefix):
        prefix_cls = IPv6Prefix
    else:
        raise ValueError('Invalid prefix: %s' % prefix)

    address, length = prefix.split('/')
    return prefix_cls(int(length), address).serialize()
def __init__(self, ifindex, ifc_flags, family, prefix, dest):
    """Store the interface address fields.

    :param ifindex: Stored as ``self.ifindex``.
    :param ifc_flags: Stored as ``self.ifc_flags``.
    :param family: Stored as ``self.family``.
    :param prefix: Prefix string, or an ``IPv4Prefix``/``IPv6Prefix``
        instance whose ``.prefix`` attribute is used instead.
    :param dest: Address asserted to be valid IPv4 or IPv6.
    """
    super(_ZebraInterfaceAddress, self).__init__()
    self.ifindex = ifindex
    self.ifc_flags = ifc_flags
    self.family = family

    is_prefix_obj = isinstance(prefix, (IPv4Prefix, IPv6Prefix))
    self.prefix = prefix.prefix if is_prefix_obj else prefix

    assert netaddr.valid_ipv4(dest) or netaddr.valid_ipv6(dest)
    self.dest = dest
def serialize(self):
    """Pack the header followed by prefix address, length and destination.

    ``self.family`` is fixed up from the family of ``self.prefix``, which
    is split on ``'/'`` into address and prefix length. ``self.dest`` is
    converted with the codec of that same family.

    :returns: The packed header followed by the packed body.
    :raises ValueError: If ``self.prefix`` is neither IPv4 nor IPv6.
    """
    if ip.valid_ipv4(self.prefix):
        self.family = socket.AF_INET  # fixup
        body_fmt, codec = self._IPV4_BODY_FMT, addrconv.ipv4
    elif ip.valid_ipv6(self.prefix):
        self.family = socket.AF_INET6  # fixup
        body_fmt, codec = self._IPV6_BODY_FMT, addrconv.ipv6
    else:
        raise ValueError(
            'Invalid address family for prefix=%s and dest=%s'
            % (self.prefix, self.dest))

    prefix_addr, prefix_num = self.prefix.split('/')
    body_bin = struct.pack(body_fmt,
                           codec.text_to_bin(prefix_addr),
                           int(prefix_num),
                           codec.text_to_bin(self.dest))
    header = struct.pack(self._HEADER_FMT,
                         self.ifindex, self.ifc_flags, self.family)
    return header + body_bin
def __init__(self, addr, metric=None, nexthops=None):
    """Store the lookup address, metric and nexthops.

    :param addr: Address asserted to be valid IPv4 or IPv6.
    :param metric: Stored as ``self.metric``.
    :param nexthops: Sequence whose items are asserted to be ``_NextHop``
        instances; a falsy value is replaced by an empty list.
    """
    super(_ZebraIPNexthopLookup, self).__init__()
    assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
    self.addr = addr
    self.metric = metric

    hops = nexthops or []
    for hop in hops:
        assert isinstance(hop, _NextHop)
    self.nexthops = hops
def __init__(self, prefix, metric=None, nexthops=None):
    """Store the lookup prefix, metric and nexthops.

    :param prefix: Value asserted to pass ``netaddr.valid_ipv4`` or
        ``netaddr.valid_ipv6``.
    :param metric: Stored as ``self.metric``.
    :param nexthops: Sequence whose items are asserted to be ``_NextHop``
        instances; a falsy value is replaced by an empty list.
    """
    super(_ZebraIPImportLookup, self).__init__()
    assert netaddr.valid_ipv4(prefix) or netaddr.valid_ipv6(prefix)
    self.prefix = prefix
    self.metric = metric

    hops = nexthops or []
    for hop in hops:
        assert isinstance(hop, _NextHop)
    self.nexthops = hops
def __init__(self, addr, distance, metric, nexthops=None):
    """Store the lookup address, distance, metric and nexthops.

    :param addr: Address asserted to be valid IPv4 or IPv6.
    :param distance: Stored as ``self.distance``.
    :param metric: Stored as ``self.metric``.
    :param nexthops: Sequence whose items are asserted to be ``_NextHop``
        instances; a falsy value is replaced by an empty list.
    """
    super(_ZebraIPNexthopLookupMRib, self).__init__()
    assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
    self.addr = addr
    self.distance = distance
    self.metric = metric

    hops = nexthops or []
    for hop in hops:
        assert isinstance(hop, _NextHop)
    self.nexthops = hops
def _check_rf_and_normalize(prefix):
    """Classify ``prefix`` by route family, normalizing IPv6 addresses.

    :param prefix: String of the form ``address/masklen``.
    :returns: For an IPv6 address part, ``vrfs.VRF_RF_IPV6`` together with
        the prefix rebuilt from ``str(netaddr.IPAddress(address))``; for
        anything else, ``vrfs.VRF_RF_IPV4`` together with the prefix as
        given.
    """
    address, masklen = prefix.split('/')
    route_family = vrfs.VRF_RF_IPV4
    result = prefix
    if netaddr.valid_ipv6(address):
        # Rebuild the prefix around netaddr's rendering of the address.
        route_family = vrfs.VRF_RF_IPV6
        result = str(netaddr.IPAddress(address)) + '/' + masklen
    return route_family, result
def is_valid_ipv6(ipv6):
    """Return the result of ``netaddr.valid_ipv6`` for the given `ipv6`.

    :param ipv6: Address to check.
    """
    verdict = netaddr.valid_ipv6(ipv6)
    return verdict
def detect_address_family(host):
    """Guess the socket address family for ``host``.

    :param host: IPv4 address, IPv6 address, or a filesystem path.
    :returns: ``socket.AF_INET`` for a valid IPv4 address,
        ``socket.AF_INET6`` for a valid IPv6 address, ``socket.AF_UNIX``
        when the directory part of ``host`` is an existing directory, and
        ``None`` when nothing matches.
    """
    if netaddr.valid_ipv4(host):
        return socket.AF_INET
    if netaddr.valid_ipv6(host):
        return socket.AF_INET6
    # A path whose parent directory exists is taken to be a Unix socket.
    if os.path.isdir(os.path.dirname(host)):
        return socket.AF_UNIX
    return None