def inet_pton(address_family, ip_string):
if address_family == socket.AF_INET:
return socket.inet_aton(ip_string)
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError())
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16)
raise socket.error('unknown address family')
python类inet_pton()的实例源码
def ipv4(address):
address = address.replace("http://", "").replace("https://", "")
try:
socket.inet_pton(socket.AF_INET, address)
except AttributeError:
try:
socket.inet_aton(address)
except socket.error:
raise OptionValidationError("Option have to be valid IP address.")
if address.count('.') == 3:
return address
else:
raise OptionValidationError("Option have to be valid IP address.")
except socket.error:
raise OptionValidationError("Option have to be valid IP address.")
return address
def is_ipv4(value):
"""Utility function to detect if a value is a valid IPv4
:param value: The value to match against.
:return: True if the value is a valid IPv4.
"""
try:
socket.inet_pton(socket.AF_INET, value)
except AttributeError: # no inet_pton here, sorry
try:
socket.inet_aton(value)
except socket.error:
return False
return value.count('.') == 3
except socket.error: # not a valid address
return False
return True
def is_valid_ipv4_address(address):
try:
socket.inet_pton(socket.AF_INET, address)
except AttributeError: # no inet_pton here, sorry
try:
socket.inet_aton(address)
except socket.error:
return False
return address.count('.') == 3
except socket.error: # not a valid address
return False
return True
# ??????? ??? ??????????? ?????????? ???????????? ? OID. ??????????????? ???????? '%s' ?? ????????? ? ??????? ????? '0' (?????? ????????)
# ????? ??????? ???????? ??? {?} ?? ??????? ? ?? ??????? ? ???????
# ??????, ??? ?????????? ?????????????? ??????? ? ??? ??? ??????? ??? ????, ?? ???? ??????? ????? ?????????? :)
def ipv4(address):
address = address.replace("http://", "").replace("https://", "")
try:
socket.inet_pton(socket.AF_INET, address)
except AttributeError:
try:
socket.inet_aton(address)
except socket.error:
raise OptionValidationError("Option have to be valid IP address.")
if address.count('.') == 3:
return address
else:
raise OptionValidationError("Option have to be valid IP address.")
except socket.error:
raise OptionValidationError("Option have to be valid IP address.")
return address
def _pack_addr_bytes_from(self, addr_type, addr):
addr_bytes = None
if addr_type in [constants.SOCKS5_ADDRTYPE_IPV4, constants.SOCKS5_ADDRTYPE_IPV6]:
addr_bytes = socket.inet_pton({constants.SOCKS5_ADDRTYPE_IPV4: socket.AF_INET,
constants.SOCKS5_ADDRTYPE_IPV6: socket.AF_INET6}[addr_type],
addr)
elif addr_type == constants.SOCKS5_ADDRTYPE_HOST:
if len(addr) > 255:
addr = addr[:255]
addr_bytes = chr(len(addr)) + addr
addr_bytes = addr_bytes.encode('utf-8')
addr_bytes = chr(addr_type).encode('utf-8') + addr_bytes
return addr_bytes
def is_fdqn(self):
# I will assume the following
# If I can't socket.inet_aton() then it's not an IPv4 address
# Same for ipv6, but since socket.inet_pton is not available in Windows, I'll look for ':'. There can't be
# an FQDN with ':'
# Is it isn't both, then it is a FDQN
try:
socket.inet_aton(self.__target)
except:
# Not an IPv4
try:
self.__target.index(':')
except:
# Not an IPv6, it's a FDQN
return True
return False
def _convert_host_to_hex(host):
"""
Convert the provided host to the format in /proc/net/tcp*
/proc/net/tcp uses little-endian four byte hex for ipv4
/proc/net/tcp6 uses little-endian per 4B word for ipv6
Args:
host: String with either hostname, IPv4, or IPv6 address
Returns:
List of tuples containing address family and the
little-endian converted host
"""
ips = []
if host is not None:
for family, ip in _convert_host_to_ip(host):
hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
hexip_hf = ""
for i in range(0, len(hexip_nf), 8):
ipgroup_nf = hexip_nf[i:i+8]
ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
ips.append((family, hexip_hf))
return ips
def str_to_int(addr, flags=0):
"""
:param addr: An IPv4 dotted decimal address in string form.
:param flags: decides which rules are applied to the interpretation of the
addr value. Supported constants are INET_PTON and ZEROFILL. See the
netaddr.core docs for details.
:return: The equivalent unsigned integer for a given IPv4 address.
"""
if flags & ZEROFILL:
addr = '.'.join(['%d' % int(i) for i in addr.split('.')])
try:
if flags & INET_PTON:
return _struct.unpack('>I', _inet_pton(AF_INET, addr))[0]
else:
return _struct.unpack('>I', _inet_aton(addr))[0]
except Exception:
raise AddrFormatError('%r is not a valid IPv4 address string!' % addr)
def checkip(ip):
"""Check IPv4/IPv6 address for validity"""
try:
if ':' in ip:
socket.inet_pton(socket.AF_INET6, ip)
else:
socket.inet_pton(socket.AF_INET, ip)
except socket.error:
return False
return True
#
# Regexp for a valid FQDN:
# - only letters, digits, '-'
# - no '-' at the beginning or end of a label
# - at least one '.'
# - no '.' at the beginning or end
#
def __init__(self, subnet, prefix, is_ipv6=False):
self.__no_use_iplist_num = []
self.__subnet = subnet
self.__prefix = prefix
self.__is_ipv6 = is_ipv6
if not is_ipv6:
self.__fa = socket.AF_INET
self.__cur_max_ipaddr_num = utils.bytes2number(socket.inet_pton(socket.AF_INET, subnet))
self.__prefix_num = utils.calc_net_prefix_num(prefix)
else:
self.__fa = socket.AF_INET6
self.__cur_max_ipaddr_num = utils.bytes2number(socket.inet_pton(socket.AF_INET6, subnet))
self.__prefix_num = utils.calc_net_prefix_num(prefix, is_ipv6=True)
self.__subnet_num = self.__cur_max_ipaddr_num
return
def calc_subnet(ipaddr, prefix, is_ipv6=False):
if is_ipv6 and prefix == 128: return ipaddr
if not is_ipv6 and prefix == 32: return ipaddr
q = int(prefix / 8)
r = prefix % 8
if is_ipv6:
byte_ipaddr = socket.inet_pton(socket.AF_INET6, ipaddr)
results = list(bytes(16))
else:
byte_ipaddr = socket.inet_pton(socket.AF_INET, ipaddr)
results = list(bytes(4))
results[0:q] = byte_ipaddr[0:q]
v = 0
for n in range(r + 1):
if n == 0: continue
v += 2 ** (8 - n)
results[q] = byte_ipaddr[q] & v
if is_ipv6:
return socket.inet_ntop(socket.AF_INET6, bytes(results))
else:
return socket.inet_ntop(socket.AF_INET, bytes(results))
def _convert_host_to_hex(host):
"""
Convert the provided host to the format in /proc/net/tcp*
/proc/net/tcp uses little-endian four byte hex for ipv4
/proc/net/tcp6 uses little-endian per 4B word for ipv6
Args:
host: String with either hostname, IPv4, or IPv6 address
Returns:
List of tuples containing address family and the
little-endian converted host
"""
ips = []
if host is not None:
for family, ip in _convert_host_to_ip(host):
hexip_nf = binascii.b2a_hex(socket.inet_pton(family, ip))
hexip_hf = ""
for i in range(0, len(hexip_nf), 8):
ipgroup_nf = hexip_nf[i:i+8]
ipgroup_hf = socket.ntohl(int(ipgroup_nf, base=16))
hexip_hf = "%s%08X" % (hexip_hf, ipgroup_hf)
ips.append((family, hexip_hf))
return ips
def is_valid_address(cls, ip):
if ip.count('/') == 1:
ip_address, netmask = ip.split('/')
else:
ip_address = ip
if len(ip_address.split('.')) == 4:
try:
socket.inet_pton(socket.AF_INET, ip_address)
except socket.error:
return False
if not 0 <= netmask <= 32:
return False
else:
try:
socket.inet_pton(socket.AF_INET6, ip_address)
except socket.error:
return False
if not 0 <= netmask <= 128:
return False
return True
def cidr_range(cidr_address):
family = AF_INET6 if ':' in cidr_address else AF_INET
address, maskstr = cidr_address.split('/')
maskbits = int(maskstr)
# Parse the supplied address into bytes
addr_bytes = inet_pton(family, address)
# Calculate number of address bytes and mask bits
addr_len = len(addr_bytes)
numaddrs = 2**(addr_len*8 - maskbits)
mask = -numaddrs
# Generate addresses
addr = int.from_bytes(addr_bytes, 'big') & mask
for n in range(numaddrs):
yield inet_ntop(family, (addr+n).to_bytes(addr_len, 'big'))
def IPv6(ip_addr):
"""
Validate a string as an IPv6 address. See `man 3 inet_pton` for details.
Availability is platform-dependent.
"""
ip_addr = str(ip_addr)
try:
socket.inet_pton(socket.AF_INET6, ip_addr)
except socket.error:
raise ValueError('Invalid IPv6 address: %s' % ip_addr)
except AttributeError:
raise ValueError('IPv6 validation unavailable on this platform.')
return ip_addr
def addrparse(addr_str):
if addr_str is None:
domain=socket.AF_INET
addr,port = "0.0.0.0",0
elif addr_str.count(':') == 0:
# Port only given
domain=socket.AF_INET
addr,port="0.0.0.0",addr_str
elif addr_str.count(':') == 1:
# IPv4 address and port
domain=socket.AF_INET
addr,port=addr_str.rsplit(':')
else:
domain=socket.AF_INET6
addr,port=addr_str.rsplit(':',1)
try:
socket.inet_pton(domain,addr)
except:
sys.exit("Invalid address %s" % addr)
try:
port = int(port)
except:
sys.exit("Invalid port '%s'" % port)
return domain, addr, port
def __init__(self, destination, routing_specifications, options):
LoadbalancerControl.__init__(self, options)
# Use default port, if none specified
if not ":" in destination:
self.destination = (destination, self.port)
else:
self.destination = destination.rsplit(':',1)
# Is destination a v6-address?
try:
socket.inet_pton(socket.AF_INET6, self.destination[0])
self.set_socket(socket.socket(socket.AF_INET6, socket.SOCK_DGRAM))
except:
self.set_socket(socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
for routing_specs in routing_specifications:
self.assign_loadbalancer_to_specs(routing_specs)
def encode_attr_mp_nexthop(nexthop):
""" Encode string nexthop into a BGP MP_REACH attribute TLV
A modified MP_REACH attribute that includes only the next hop info (per MRT RFC6396)
This includes only the next hop address lenght (1 octet) and the next-hop
:param nexthop: String value for the next-hop
:return: BGP attribute TLV ready to be written
"""
# Set afi and length based on nexthop address
if ':' in nexthop:
nh_len = 16
afi = socket.AF_INET6
else:
nh_len = 4
afi = socket.AF_INET
return ATTR_TYPE_MP_REACH + pack('!BB', nh_len + 1, nh_len) + socket.inet_pton(afi, nexthop)
def encode_attr_cluster_list(cluster_list):
""" Encode string cluster_list into a BGP CLUSTER_LIST attribute TLV
:param cluster_list: String space delimited list of cluster id's
:return: BGP attribute TLV ready to be written
"""
encode_cluster_list = ""
try:
for cluster_id in cluster_list.split(' '):
encode_cluster_list += socket.inet_pton(socket.AF_INET, cluster_id)
return ATTR_TYPE_CLUSTER_LIST + pack('!B', len(encode_cluster_list)) + encode_cluster_list
except:
return ''
def is_valid_ipv4(ip_addr):
"""
Return buffer in network order
"""
if type(ip_addr) == bytes and len(ip_addr) == 4:
return ip_addr
if type(ip_addr)== int:
ip_addr = socket.inet_ntoa(struct.pack("!I", ip_addr))
try:
return socket.inet_pton(socket.AF_INET, ip_addr)
except AttributeError: # no inet_pton here, sorry
return socket.inet_aton(ip_addr)
except socket.error: # not a valid address
raise CTRexPacketBuildException(-10,"Not valid ipv4 format");
def validate_ipv4(ip):
"""
Validate an ipv4 address
:param ip: The string with the ip
:return: True ip if it's valid for ipv4, False otherwise
"""
ip = str(ip)
try:
socket.inet_pton(socket.AF_INET, ip)
except AttributeError:
try:
socket.inet_aton(ip)
except socket.error:
return False
except socket.error:
return False
return True
def str_to_int(addr, flags=0):
"""
:param addr: An IPv4 dotted decimal address in string form.
:param flags: decides which rules are applied to the interpretation of the
addr value. Supported constants are INET_PTON and ZEROFILL. See the
netaddr.core docs for details.
:return: The equivalent unsigned integer for a given IPv4 address.
"""
if flags & ZEROFILL:
addr = '.'.join(['%d' % int(i) for i in addr.split('.')])
try:
if flags & INET_PTON:
return _struct.unpack('>I', _inet_pton(AF_INET, addr))[0]
else:
return _struct.unpack('>I', _inet_aton(addr))[0]
except Exception:
raise AddrFormatError('%r is not a valid IPv4 address string!' % addr)
def __init__(self, *args, **kwargs):
# enable to pass IPv4 addr in human-readable format
if 'val' in kwargs:
if 'src' in kwargs['val'] and len(kwargs['val']['src']) != 16:
try:
kwargs['val']['src'] = inet_pton(AF_INET6, kwargs['val']['src'])
except:
pass
if 'dst' in kwargs['val'] and len(kwargs['val']['dst']) != 16:
try:
kwargs['val']['dst'] = inet_pton(AF_INET6, kwargs['val']['dst'])
except:
pass
Envelope.__init__(self, *args, **kwargs)
if 'val' not in kwargs or 'plen' not in kwargs['val']:
self[3].set_valauto(self._set_plen_val)
if 'val' not in kwargs or 'next' not in kwargs['val']:
self[4].set_valauto(self._set_next_val)
def testIPv6toString(self):
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_pton(AF_INET6, a)
self.assertEqual('\x00' * 16, f('::'))
self.assertEqual('\x00' * 16, f('0::0'))
self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
self.assertEqual(
'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
def testIPv6toString(self):
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
self.skipTest('IPv6 not available')
except ImportError:
self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_pton(AF_INET6, a)
self.assertEqual('\x00' * 16, f('::'))
self.assertEqual('\x00' * 16, f('0::0'))
self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
self.assertEqual(
'\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
def get_if_raw_addr(ifname):
"""Returns the IPv4 address configured on 'ifname', packed with inet_pton."""
# Get ifconfig output
try:
fd = os.popen("%s %s" % (conf.prog.ifconfig, ifname))
except OSError, msg:
raise Scapy_Exception("Failed to execute ifconfig: (%s)" % msg)
# Get IPv4 addresses
addresses = [l for l in fd.readlines() if l.find("netmask") >= 0]
if not addresses:
raise Scapy_Exception("No IPv4 address found on %s !" % ifname)
# Pack the first address
address = addresses[0].split(' ')[1]
return socket.inet_pton(socket.AF_INET, address)
def neighsol(addr, src, iface, timeout=1, chainCC=0):
"""
Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
of the neighbor with specified IPv6 address addr. 'src' address is
used as source of the message. Message is sent on iface. By default,
timeout waiting for an answer is 1 second.
If no answer is gathered, None is returned. Else, the answer is
returned (ethernet frame).
"""
nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
d = inet_ntop(socket.AF_INET6, nsma)
dm = in6_getnsmac(nsma)
p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
p /= ICMPv6ND_NS(tgt=addr)
p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
res = srp1(p,type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0,
chainCC=chainCC)
return res
def i2m(self, pkt, x):
l = pkt.len
if x is None:
x = "::"
if l is None:
l = 1
x = inet_pton(socket.AF_INET6, x)
if l is None:
return x
if l in [0, 1]:
return ""
if l in [2, 3]:
return x[:8*(l-1)]
return x + '\x00'*8*(l-3)
def _resolve_one(self, ip):
"""
overloaded version to provide a Whois resolution on the
embedded IPv4 address if the address is 6to4 or Teredo.
Otherwise, the native IPv6 address is passed.
"""
if in6_isaddr6to4(ip): # for 6to4, use embedded @
tmp = inet_pton(socket.AF_INET6, ip)
addr = inet_ntop(socket.AF_INET, tmp[2:6])
elif in6_isaddrTeredo(ip): # for Teredo, use mapped address
addr = teredoAddrExtractInfo(ip)[2]
else:
addr = ip
_, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
return ip,asn,desc