def _serialize_ip_prefix(prefix):
if ip.valid_ipv4(prefix):
prefix_addr, prefix_num = prefix.split('/')
return bgp.IPAddrPrefix(int(prefix_num), prefix_addr).serialize()
elif ip.valid_ipv6(prefix):
prefix_addr, prefix_num = prefix.split('/')
return IPv6Prefix(int(prefix_num), prefix_addr).serialize()
else:
raise ValueError('Invalid prefix: %s' % prefix)
python类valid_ipv4()的实例源码
def __init__(self, ifindex, ifc_flags, family, prefix, dest):
super(_ZebraInterfaceAddress, self).__init__()
self.ifindex = ifindex
self.ifc_flags = ifc_flags
self.family = family
if isinstance(prefix, (IPv4Prefix, IPv6Prefix)):
prefix = prefix.prefix
self.prefix = prefix
assert netaddr.valid_ipv4(dest) or netaddr.valid_ipv6(dest)
self.dest = dest
def serialize(self):
if ip.valid_ipv4(self.prefix):
self.family = socket.AF_INET # fixup
prefix_addr, prefix_num = self.prefix.split('/')
body_bin = struct.pack(
self._IPV4_BODY_FMT,
addrconv.ipv4.text_to_bin(prefix_addr),
int(prefix_num),
addrconv.ipv4.text_to_bin(self.dest))
elif ip.valid_ipv6(self.prefix):
self.family = socket.AF_INET6 # fixup
prefix_addr, prefix_num = self.prefix.split('/')
body_bin = struct.pack(
self._IPV6_BODY_FMT,
addrconv.ipv6.text_to_bin(prefix_addr),
int(prefix_num),
addrconv.ipv6.text_to_bin(self.dest))
else:
raise ValueError(
'Invalid address family for prefix=%s and dest=%s'
% (self.prefix, self.dest))
buf = struct.pack(self._HEADER_FMT,
self.ifindex, self.ifc_flags, self.family)
return buf + body_bin
def __init__(self, prefix, metric=None, nexthops=None):
super(_ZebraIPImportLookup, self).__init__()
assert netaddr.valid_ipv4(prefix) or netaddr.valid_ipv6(prefix)
self.prefix = prefix
self.metric = metric
nexthops = nexthops or []
for nexthop in nexthops:
assert isinstance(nexthop, _NextHop)
self.nexthops = nexthops
def __init__(self, addr, distance, metric, nexthops=None):
super(_ZebraIPNexthopLookupMRib, self).__init__()
assert netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)
self.addr = addr
self.distance = distance
self.metric = metric
nexthops = nexthops or []
for nexthop in nexthops:
assert isinstance(nexthop, _NextHop)
self.nexthops = nexthops
def valid_ip_address(addr):
if not netaddr.valid_ipv4(addr) and not netaddr.valid_ipv6(addr):
return False
return True
def is_valid_ipv4(ipv4):
"""Returns True if given is a valid ipv4 address.
Given value should be a dot-decimal notation string.
Samples:
- valid address: 10.0.0.1, 192.168.0.1
- invalid address: 11.0.0, 192:168:0:1, etc.
"""
return netaddr.valid_ipv4(ipv4)
def create_rt_extended_community(value, subtype=2):
"""
Creates an instance of the BGP Route Target Community (if "subtype=2")
or Route Origin Community ("subtype=3").
:param value: String of Route Target or Route Origin value.
:param subtype: Subtype of Extended Community.
:return: An instance of Route Target or Route Origin Community.
"""
global_admin, local_admin = value.split(':')
local_admin = int(local_admin)
if global_admin.isdigit() and 0 <= int(global_admin) <= 0xffff:
ext_com = BGPTwoOctetAsSpecificExtendedCommunity(
subtype=subtype,
as_number=int(global_admin),
local_administrator=local_admin)
elif global_admin.isdigit() and 0xffff < int(global_admin) <= 0xffffffff:
ext_com = BGPFourOctetAsSpecificExtendedCommunity(
subtype=subtype,
as_number=int(global_admin),
local_administrator=local_admin)
elif netaddr.valid_ipv4(global_admin):
ext_com = BGPIPv4AddressSpecificExtendedCommunity(
subtype=subtype,
ipv4_address=global_admin,
local_administrator=local_admin)
else:
raise ValueError(
'Invalid Route Target or Route Origin value: %s' % value)
return ext_com
def _connect_tcp(self, peer_addr, conn_handler, time_out=None,
bind_address=None, password=None):
"""Creates a TCP connection to given peer address.
Tries to create a socket for `timeout` number of seconds. If
successful, uses the socket instance to start `client_factory`.
The socket is bound to `bind_address` if specified.
"""
LOG.debug('Connect TCP called for %s:%s', peer_addr[0], peer_addr[1])
if netaddr.valid_ipv4(peer_addr[0]):
family = socket.AF_INET
else:
family = socket.AF_INET6
with Timeout(time_out, socket.error):
sock = socket.socket(family)
if bind_address:
sock.bind(bind_address)
if password:
sockopt.set_tcp_md5sig(sock, peer_addr[0], password)
sock.connect(peer_addr)
# socket.error exception is raised in case of timeout and
# the following code is executed only when the connection
# is established.
# Connection name for pro-active connection is made up of
# local end address + remote end address
local = self.get_localname(sock)[0]
remote = self.get_remotename(sock)[0]
conn_name = ('L: ' + local + ', R: ' + remote)
self._asso_socket_map[conn_name] = sock
# If connection is established, we call connection handler
# in a new thread.
self._spawn(conn_name, conn_handler, sock)
return sock
#
# Sink
#
def update_global_table(self, prefix, next_hop=None, is_withdraw=False):
"""Update a BGP route in the Global table for the given `prefix`
with the given `next_hop`.
If `is_withdraw` is False, which is the default, add a BGP route
to the Global table.
If `is_withdraw` is True, remove a BGP route from the Global table.
"""
src_ver_num = 1
peer = None
# set mandatory path attributes
origin = BGPPathAttributeOrigin(BGP_ATTR_ORIGIN_IGP)
aspath = BGPPathAttributeAsPath([[]])
pathattrs = OrderedDict()
pathattrs[BGP_ATTR_TYPE_ORIGIN] = origin
pathattrs[BGP_ATTR_TYPE_AS_PATH] = aspath
net = netaddr.IPNetwork(prefix)
ip = str(net.ip)
masklen = net.prefixlen
if netaddr.valid_ipv4(ip):
_nlri = IPAddrPrefix(masklen, ip)
if next_hop is None:
next_hop = '0.0.0.0'
p = Ipv4Path
else:
_nlri = IP6AddrPrefix(masklen, ip)
if next_hop is None:
next_hop = '::'
p = Ipv6Path
new_path = p(peer, _nlri, src_ver_num,
pattrs=pathattrs, nexthop=next_hop,
is_withdraw=is_withdraw)
# add to global table and propagates to neighbors
self.learn_path(new_path)
def _set_password(self, address, password):
if netaddr.valid_ipv4(address):
family = socket.AF_INET
else:
family = socket.AF_INET6
for sock in self.listen_sockets.values():
if sock.family == family:
sockopt.set_tcp_md5sig(sock, address, password)
def create_connection(address):
"""
Wrapper for socket.create_connection() function.
If *address* (a 2-tuple ``(host, port)``) contains a valid IPv4/v6
address, passes *address* to socket.create_connection().
If *host* is valid path to Unix Domain socket, tries to connect to
the server listening on the given socket.
:param address: IP address or path to Unix Domain socket.
:return: Socket instance.
"""
host, _port = address
if (netaddr.valid_ipv4(host)
or netaddr.valid_ipv6(host)):
return socket.create_connection(address)
elif os.path.exists(host):
sock = None
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(host)
except socket.error as e:
if sock is not None:
sock.close()
raise e
return sock
else:
raise ValueError('Invalid IP address or Unix Socket: %s' % host)
def general_info(self):
"""
Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address
>>> from ipinformation import IPInformation
>>> from pprint import pprint
>>> pprint( IPInformation(ip_address='8.8.8.8').general_info() )
{'general': {'bits': '00001000000010000000100000001000',
'type': 'public',
'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512),
'version': '4'}}
>>> pprint( IPInformation(ip_address='127.0.0.1').general_info() )
{'general': {'bits': '01111111000000000000000000000001',
'type': 'loopback',
'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149),
'version': '4'}}
"""
data = { 'general': { 'bits': None, 'type': None, 'updated': None, 'version': None} }
if not self.ISIP:
# print '"%s" is not a valid IP Address.' %self.ip_address
# logging_file.error( '"{0}" is not a valid IP Address.'.format(self.ip_address) )
return data
if netaddr.valid_ipv4( self.ip_address ): #IPv4 Address
ip_version = '4'
data['general'].update({'version':ip_version})
ip_bits = netaddr.IPAddress( self.ip_address ).bits().replace( '.', '' ) #Set the IP bits for searching by subnet
data['general'].update({'bits':ip_bits})
ip_addr = netaddr.IPAddress(self.ip_address)
if ip_addr.is_private():
ip_type = 'private'
elif ip_addr.is_multicast():
ip_type = 'multicast'
elif ip_addr.is_loopback():
ip_type = 'loopback'
elif ip_addr.is_netmask():
ip_type = 'netmask'
elif ip_addr.is_reserved():
ip_type = 'reserved'
elif ip_addr.is_link_local():
ip_type = 'link_local'
elif ip_addr.is_unicast():
ip_type = 'public'
else: #Unknown Type
ip_type = 'unknown'
logging_file.error( '"{0}" is an unknown IP Address.'.format(self.ip_address) )
elif netaddr.valid_ipv6( self.ip_address ): #IPv6 Address#TODO:Finish IPv6
ip_version = '6'
print 'Is IPv6'
return False
data['general'].update( { 'type': ip_type } )
data['general'].update( { 'updated': datetime.utcnow() } )
return data
def main():
module = AnsibleModule(
argument_spec=dict(
neighbor=dict(required=False, default=None),
direction=dict(required=False, choices=['adv', 'rec']),
prefix=dict(required=False, default=None)
),
supports_check_mode=False
)
m_args = module.params
neighbor = m_args['neighbor']
direction = m_args['direction']
prefix = m_args['prefix']
regex_ip = re.compile('[0-9a-fA-F.:]+')
regex_iprange = re.compile('[0-9a-fA-F.:]+\/\d+')
regex_ipv4 = re.compile('[12][0-9]{0,2}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\/?\d+?')
if neighbor == None and direction == None and prefix == None:
module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet")
return
if neighbor and ((not netaddr.valid_ipv4(neighbor)) and (not netaddr.valid_ipv6(neighbor))):
err_message = "Invalid neighbor address %s ??" % neighbor
module.fail_json(msg=err_message)
return
if (neighbor and not direction) or (neighbor and 'adv' not in direction.lower()):
err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction)
module.fail_json(msg=err_message)
return
try:
bgproute = BgpRoutes(neighbor, direction, prefix)
if prefix:
if regex_ipv4.match(prefix):
command = "docker exec -i bgp vtysh -c 'show ip bgp " + str(prefix) + "'"
else:
command = "docker exec -i bgp vtysh -c 'show ipv6 bgp " + str(prefix) + "'"
rc, out, err = module.run_command(command)
if rc != 0:
err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err)
module.fail_json(msg=err_message)
return
bgproute.parse_bgp_route_prefix(out)
elif neighbor:
if netaddr.valid_ipv4(neighbor):
command = "docker exec -i bgp vtysh -c 'show ip bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
else:
command = "docker exec -i bgp vtysh -c 'show ipv6 bgp neighbor " + str(neighbor) + " " + str(direction) + "'"
rc, out, err = module.run_command(command)
if rc != 0:
err_message = "command %s failed rc=%d, out=%s, err=%s" %(command, rt, out, err)
module.fail_json(msg=err_message)
return
bgproute.parse_bgp_route_adv(out)
results = bgproute.get_facts()
module.exit_json(ansible_facts=results)
except Exception as e:
fail_msg = "cannot correctly parse BGP Routing facts!\n"
fail_msg += str(e)
module.fail_json(msg=fail_msg)
return
def check_host(host, allow_localhost=config.ALLOW_CONNECT_LOCALHOST):
"""Check if a given host is a valid DNS name or IPv4 address"""
try:
ipaddr = socket.gethostbyname(host)
except UnicodeEncodeError:
raise MistError('Please provide a valid DNS name')
except socket.gaierror:
raise MistError("Not a valid IP address or resolvable DNS name: '%s'."
% host)
if host != ipaddr:
msg = "Host '%s' resolves to '%s' which" % (host, ipaddr)
else:
msg = "Host '%s'" % host
if not netaddr.valid_ipv4(ipaddr):
raise MistError(msg + " is not a valid IPv4 address.")
forbidden_subnets = {
'0.0.0.0/8': "used for broadcast messages to the current network",
'100.64.0.0/10': ("used for communications between a service provider "
"and its subscribers when using a "
"Carrier-grade NAT"),
'169.254.0.0/16': ("used for link-local addresses between two hosts "
"on a single link when no IP address is otherwise "
"specified"),
'192.0.0.0/24': ("used for the IANA IPv4 Special Purpose Address "
"Registry"),
'192.0.2.0/24': ("assigned as 'TEST-NET' for use solely in "
"documentation and example source code"),
'192.88.99.0/24': "used by 6to4 anycast relays",
'198.18.0.0/15': ("used for testing of inter-network communications "
"between two separate subnets"),
'198.51.100.0/24': ("assigned as 'TEST-NET-2' for use solely in "
"documentation and example source code"),
'203.0.113.0/24': ("assigned as 'TEST-NET-3' for use solely in "
"documentation and example source code"),
'224.0.0.0/4': "reserved for multicast assignments",
'240.0.0.0/4': "reserved for future use",
'255.255.255.255/32': ("reserved for the 'limited broadcast' "
"destination address"),
}
if not allow_localhost:
forbidden_subnets['127.0.0.0/8'] = ("used for loopback addresses "
"to the local host")
cidr = netaddr.smallest_matching_cidr(ipaddr, forbidden_subnets.keys())
if cidr:
raise MistError("%s is not allowed. It belongs to '%s' "
"which is %s." % (msg, cidr,
forbidden_subnets[str(cidr)]))
def __init__(self, route_type, flags, message, safi=None, prefix=None,
nexthops=None, ifindexes=None,
distance=None, metric=None, mtu=None, tag=None,
from_zebra=False):
super(_ZebraIPRoute, self).__init__()
self.route_type = route_type
self.flags = flags
self.message = message
# SAFI should be included if this message sent to Zebra.
if from_zebra:
self.safi = None
else:
self.safi = safi or packet_safi.UNICAST
assert prefix is not None
if isinstance(prefix, (IPv4Prefix, IPv6Prefix)):
prefix = prefix.prefix
self.prefix = prefix
# Nexthops should be a list of str representations of IP address
# if this message sent from Zebra, otherwise a list of _Nexthop
# subclasses.
nexthops = nexthops or []
if from_zebra:
for nexthop in nexthops:
assert (netaddr.valid_ipv4(nexthop)
or netaddr.valid_ipv6(nexthop))
else:
for nexthop in nexthops:
assert isinstance(nexthop, _NextHop)
self.nexthops = nexthops
# Interface indexes should be included if this message sent from
# Zebra.
if from_zebra:
ifindexes = ifindexes or []
for ifindex in ifindexes:
assert isinstance(ifindex, six.integer_types)
self.ifindexes = ifindexes
else:
self.ifindexes = None
self.distance = distance
self.metric = metric
self.mtu = mtu
self.tag = tag
# is this message sent from Zebra message or not.
self.from_zebra = from_zebra
def _send_ip_route_impl(
self, prefix, nexthops=None,
safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL,
distance=None, metric=None, mtu=None, tag=None,
is_withdraw=False):
if ip.valid_ipv4(prefix):
if is_withdraw:
msg_cls = zebra.ZebraIPv4RouteDelete
else:
msg_cls = zebra.ZebraIPv4RouteAdd
elif ip.valid_ipv6(prefix):
if is_withdraw:
msg_cls = zebra.ZebraIPv6RouteDelete
else:
msg_cls = zebra.ZebraIPv6RouteAdd
else:
raise ValueError('Invalid prefix: %s' % prefix)
nexthop_list = []
for nexthop in nexthops:
if netaddr.valid_ipv4(nexthop):
nexthop_list.append(zebra.NextHopIPv4(addr=nexthop))
elif netaddr.valid_ipv6(nexthop):
nexthop_list.append(zebra.NextHopIPv6(addr=nexthop))
else:
raise ValueError('Invalid nexthop: %s' % nexthop)
msg = zebra.ZebraMessage(
version=self.zserv_ver,
body=msg_cls(
route_type=self.route_type,
flags=flags,
message=0,
safi=safi,
prefix=prefix,
nexthops=nexthop_list,
distance=distance,
metric=metric,
mtu=mtu,
tag=tag))
self.send_msg(msg)
return msg