def create_connection(address):
    """
    Wrapper for socket.create_connection() function.

    If *address* (a 2-tuple ``(host, port)``) contains a valid IPv4/v6
    address, passes *address* to socket.create_connection().
    If *host* is an existing filesystem path, it is treated as the path
    of a Unix Domain socket and a stream connection to it is attempted
    (the port element is ignored in that case).

    :param address: 2-tuple ``(host, port)`` where *host* is an IP address
        or a path to a Unix Domain socket.
    :return: Socket instance.
    :raises ValueError: If *host* is neither a valid IP address nor an
        existing path.
    :raises socket.error: If connecting to the Unix Domain socket fails.
    """
    host, _port = address

    if netaddr.valid_ipv4(host) or netaddr.valid_ipv6(host):
        return socket.create_connection(address)

    if os.path.exists(host):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(host)
        except Exception:
            # Do not leak the descriptor on a failed connect, and use a
            # bare ``raise`` so the original traceback is preserved.
            sock.close()
            raise
        return sock

    raise ValueError('Invalid IP address or Unix Socket: %s' % host)
# Python valid_ipv6() example source code
def parse_server_string(server_str):
    """Split *server_str* into a ``(host, port)`` tuple.

    A bare IPv6 address, or a string containing no colon, is returned as
    the host together with an empty-string port. The ``[v6addr]:port`` and
    ``host:port`` forms are split into their two components. If the string
    cannot be split into exactly two parts, the problem is logged and a
    tuple of two empty strings is returned.
    """
    try:
        # A bare IPv6 address is full of colons, so settle it before any
        # colon-based splitting is attempted.
        if netaddr.valid_ipv6(server_str):
            return (server_str, '')

        # Bracketed IPv6 address followed by a port: "[addr]:port".
        if "]:" in server_str:
            unbracketed = server_str.replace('[', '', 1)
            host, port_part = unbracketed.split(']:')
            return (host, port_part)

        # No colon at all means there is only a host part.
        if ':' not in server_str:
            return (server_str, '')

        # Whatever is left is expected to be "host:port"; more than one
        # colon makes the unpacking raise ValueError, handled below.
        host, port_part = server_str.split(':')
        return (host, port_part)

    except (ValueError, netaddr.AddrFormatError):
        LOG.error(_LE('Invalid server_string: %s'), server_str)
        return ('', '')
def general_info(self):
    """
    Return IP in bits, ip_type (ie: private, multicast, loopback,etc..), time updated/returned and version for an IP Address

    Reads ``self.ISIP`` and ``self.ip_address``. The result is a dict of
    the form ``{'general': {'bits', 'type', 'updated', 'version'}}``.
    All four values are ``None`` when ``self.ISIP`` is falsy or when the
    address is accepted by neither the IPv4 nor the IPv6 validator.

    NOTE: IPv6 is not implemented yet; for a valid IPv6 address this
    prints ``Is IPv6`` and returns ``False`` instead of a dict.

    >>> from ipinformation import IPInformation
    >>> from pprint import pprint
    >>> pprint( IPInformation(ip_address='8.8.8.8').general_info() )
    {'general': {'bits': '00001000000010000000100000001000',
                 'type': 'public',
                 'updated': datetime.datetime(2016, 1, 16, 18, 7, 4, 288512),
                 'version': '4'}}
    >>> pprint( IPInformation(ip_address='127.0.0.1').general_info() )
    {'general': {'bits': '01111111000000000000000000000001',
                 'type': 'loopback',
                 'updated': datetime.datetime(2016, 1, 16, 18, 10, 6, 729149),
                 'version': '4'}}
    """
    data = {'general': {'bits': None, 'type': None, 'updated': None, 'version': None}}
    if not self.ISIP:
        return data

    if netaddr.valid_ipv4(self.ip_address):  # IPv4 Address
        ip_addr = netaddr.IPAddress(self.ip_address)
        general = data['general']
        general['version'] = '4'
        # Dotted bit string with the dots removed, for searching by subnet.
        general['bits'] = ip_addr.bits().replace('.', '')

        # Checked in this order; the first predicate that holds wins.
        type_checks = (
            ('is_private', 'private'),
            ('is_multicast', 'multicast'),
            ('is_loopback', 'loopback'),
            ('is_netmask', 'netmask'),
            ('is_reserved', 'reserved'),
            ('is_link_local', 'link_local'),
            ('is_unicast', 'public'),
        )
        for predicate_name, label in type_checks:
            if getattr(ip_addr, predicate_name)():
                ip_type = label
                break
        else:  # Unknown Type
            ip_type = 'unknown'
            logging_file.error('"{0}" is an unknown IP Address.'.format(self.ip_address))
    elif netaddr.valid_ipv6(self.ip_address):  # IPv6 Address  # TODO: Finish IPv6
        # Function-call form prints the same text on Python 2 and also
        # parses on Python 3 (the bare print statement did not).
        print('Is IPv6')
        return False
    else:
        # Neither validator accepted the address: return the empty
        # skeleton rather than failing on an unbound ``ip_type`` below.
        return data

    data['general'].update({'type': ip_type})
    data['general'].update({'updated': datetime.utcnow()})
    return data
def main():
    """Ansible module entry point that gathers BGP route facts.

    Module parameters:
      * ``neighbor``  - optional neighbor address (must be valid IPv4/IPv6).
      * ``direction`` - optional, one of ``adv`` / ``rec``; only ``adv`` is
        supported together with ``neighbor``.
      * ``prefix``    - optional prefix to look up; takes precedence over
        ``neighbor`` when both are given.

    Runs the matching ``show ip(v6) bgp ...`` command through
    ``docker exec -i bgp vtysh``, feeds the output to ``BgpRoutes`` and
    exits with the collected facts as ``ansible_facts``. Any problem is
    reported through ``module.fail_json``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            neighbor=dict(required=False, default=None),
            direction=dict(required=False, choices=['adv', 'rec']),
            prefix=dict(required=False, default=None)
        ),
        supports_check_mode=False
    )
    m_args = module.params
    neighbor = m_args['neighbor']
    direction = m_args['direction']
    prefix = m_args['prefix']

    # Dotted-quad with an optional "/len" suffix. Deliberately loose (no
    # octet range check): it only selects between the "ip" and "ipv6"
    # flavour of the vtysh command. Anything else is treated as IPv6.
    regex_ipv4 = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}(?:/\d+)?')

    if neighbor is None and direction is None and prefix is None:
        module.fail_json(msg="No support of parsing 'show ip bgp' full prefix table yet")
        return

    if neighbor and not (netaddr.valid_ipv4(neighbor) or netaddr.valid_ipv6(neighbor)):
        err_message = "Invalid neighbor address %s ??" % neighbor
        module.fail_json(msg=err_message)
        return

    if neighbor and (not direction or 'adv' not in direction.lower()):
        err_message = 'No support of parsing this command " show ip(v6) bgp neighbor %s %s" yet' % (neighbor, direction)
        module.fail_json(msg=err_message)
        return

    try:
        bgproute = BgpRoutes(neighbor, direction, prefix)

        command = None
        parse_output = None
        if prefix:
            family = 'ip' if regex_ipv4.match(prefix) else 'ipv6'
            command = "docker exec -i bgp vtysh -c 'show %s bgp %s'" % (family, prefix)
            parse_output = bgproute.parse_bgp_route_prefix
        elif neighbor:
            family = 'ip' if netaddr.valid_ipv4(neighbor) else 'ipv6'
            command = "docker exec -i bgp vtysh -c 'show %s bgp neighbor %s %s'" % (
                family, neighbor, direction)
            parse_output = bgproute.parse_bgp_route_adv

        if command is not None:
            rc, out, err = module.run_command(command)
            if rc != 0:
                err_message = "command %s failed rc=%d, out=%s, err=%s" % (command, rc, out, err)
                module.fail_json(msg=err_message)
                return
            parse_output(out)

        results = bgproute.get_facts()
        module.exit_json(ansible_facts=results)
    except Exception as e:
        fail_msg = "cannot correctly parse BGP Routing facts!\n"
        fail_msg += str(e)
        module.fail_json(msg=fail_msg)
    return
def _send_ip_route_impl(
        self, prefix, nexthops=None,
        safi=packet_safi.UNICAST, flags=zebra.ZEBRA_FLAG_INTERNAL,
        distance=None, metric=None, mtu=None, tag=None,
        is_withdraw=False):
    """Build and send a Zebra IPv4/IPv6 route add or delete message.

    :param prefix: Route prefix; its address family selects the IPv4 or
        IPv6 message class.
    :param nexthops: Iterable of nexthop address strings, or None for no
        nexthops.
    :param safi: Passed through to the message body as ``safi``.
    :param flags: Passed through to the message body as ``flags``.
    :param distance: Passed through to the message body.
    :param metric: Passed through to the message body.
    :param mtu: Passed through to the message body.
    :param tag: Passed through to the message body.
    :param is_withdraw: When true a route-delete message is sent instead
        of a route-add message.
    :return: The ``zebra.ZebraMessage`` that was sent.
    :raises ValueError: If *prefix* or any nexthop is not a valid IPv4 or
        IPv6 value.
    """
    if ip.valid_ipv4(prefix):
        if is_withdraw:
            msg_cls = zebra.ZebraIPv4RouteDelete
        else:
            msg_cls = zebra.ZebraIPv4RouteAdd
    elif ip.valid_ipv6(prefix):
        if is_withdraw:
            msg_cls = zebra.ZebraIPv6RouteDelete
        else:
            msg_cls = zebra.ZebraIPv6RouteAdd
    else:
        raise ValueError('Invalid prefix: %s' % prefix)

    nexthop_list = []
    # ``nexthops`` defaults to None; treat that as "no nexthops" instead
    # of failing with a TypeError when iterating.
    for nexthop in nexthops or ():
        if netaddr.valid_ipv4(nexthop):
            nexthop_list.append(zebra.NextHopIPv4(addr=nexthop))
        elif netaddr.valid_ipv6(nexthop):
            nexthop_list.append(zebra.NextHopIPv6(addr=nexthop))
        else:
            raise ValueError('Invalid nexthop: %s' % nexthop)

    msg = zebra.ZebraMessage(
        version=self.zserv_ver,
        body=msg_cls(
            route_type=self.route_type,
            flags=flags,
            message=0,
            safi=safi,
            prefix=prefix,
            nexthops=nexthop_list,
            distance=distance,
            metric=metric,
            mtu=mtu,
            tag=tag))
    self.send_msg(msg)

    return msg