def inet_pton(address_family, ip_string):
addr = sockaddr()
addr.sa_family = address_family
addr_size = ctypes.c_int(ctypes.sizeof(addr))
if WSAStringToAddressA(
ip_string,
address_family,
None,
ctypes.byref(addr),
ctypes.byref(addr_size)
) != 0:
raise socket.error(ctypes.FormatError())
if address_family == socket.AF_INET:
return ctypes.string_at(addr.ipv4_addr, 4)
if address_family == socket.AF_INET6:
return ctypes.string_at(addr.ipv6_addr, 16)
raise socket.error('unknown address family')
python类inet_pton()的实例源码
def ip_to_integer(ip_address):
"""
Converts an IP address expressed as a string to its
representation as an integer value and returns a tuple
(ip_integer, version), with version being the IP version
(either 4 or 6).
Both IPv4 addresses (e.g. "192.168.1.1") and IPv6 addresses
(e.g. "2a02:a448:ddb0::") are accepted.
"""
# try parsing the IP address first as IPv4, then as IPv6
for version in (socket.AF_INET, socket.AF_INET6):
try:
ip_hex = socket.inet_pton(version, ip_address)
ip_integer = int(binascii.hexlify(ip_hex), 16)
return (ip_integer, 4 if version == socket.AF_INET else 6)
except:
pass
raise ValueError("invalid IP address")
def is_fdqn(self):
# I will assume the following
# If I can't socket.inet_aton() then it's not an IPv4 address
# Same for ipv6, but since socket.inet_pton is not available in Windows, I'll look for ':'. There can't be
# an FQDN with ':'
# Is it isn't both, then it is a FDQN
try:
socket.inet_aton(self.__target)
except:
# Not an IPv4
try:
self.__target.index(':')
except:
# Not an IPv6, it's a FDQN
return True
return False
def compressSourceAddr(self, ipv6):
#print "compressSourceAddr"
tmp_ip = socket.inet_pton(socket.AF_INET6, ipv6.src)
if self.sac == 0:
if self.sam == 0x0:
tmp_ip = tmp_ip
elif self.sam == 0x1:
tmp_ip = tmp_ip[8:16]
elif self.sam == 0x2:
tmp_ip = tmp_ip[14:16]
else: #self.sam == 0x3:
pass
else: #self.sac == 1
if self.sam == 0x0:
tmp_ip = "\x00"*16
elif self.sam == 0x1:
tmp_ip = tmp_ip[8:16]
elif self.sam == 0x2:
tmp_ip = tmp_ip[14:16]
self.sourceAddr = socket.inet_ntop(socket.AF_INET6, "\x00"*(16-len(tmp_ip)) + tmp_ip)
return self.sourceAddr
def is_ipv4_address(address):
try:
socket.inet_pton(socket.AF_INET, address)
except AttributeError:
try:
socket.inet_aton(address)
except socket.error:
return False
return [x for x in address.split(".") if (len(x) > 0 and x.isdigit() and int(x) >= 0 and int(x) <= 255)] == 4
except socket.error:
return False
return True
# Class for printing and styling text to terminal
def _write_SOCKS5_address(self, addr, file):
"""
Return the host and port packed for the SOCKS5 protocol,
and the resolved address as a tuple object.
"""
host, port = addr
proxy_type, _, _, rdns, username, password = self.proxy
family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}
# If the given destination address is an IP address, we'll
# use the IP address request even if remote resolving was specified.
# Detect whether the address is IPv4/6 directly.
for family in (socket.AF_INET, socket.AF_INET6):
try:
addr_bytes = socket.inet_pton(family, host)
file.write(family_to_byte[family] + addr_bytes)
host = socket.inet_ntop(family, addr_bytes)
file.write(struct.pack(">H", port))
return host, port
except socket.error:
continue
# Well it's not an IP number, so it's probably a DNS name.
if rdns:
# Resolve remotely
host_bytes = host.encode("idna")
file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
else:
# Resolve locally
addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
# We can't really work out what IP is reachable, so just pick the
# first.
target_addr = addresses[0]
family = target_addr[0]
host = target_addr[4][0]
addr_bytes = socket.inet_pton(family, host)
file.write(family_to_byte[family] + addr_bytes)
host = socket.inet_ntop(family, addr_bytes)
file.write(struct.pack(">H", port))
return host, port
def __host_per_rfc_2732(host):
"Format a host name or IP for a URL according to RFC 2732"
try:
socket.inet_pton(socket.AF_INET6, host)
return "[%s]" % (host)
except socket.error:
return host # Not an IPv6 address
def inet_pton(family, addr):
addr = to_str(addr)
if family == socket.AF_INET:
return socket.inet_aton(addr)
elif family == socket.AF_INET6:
if '.' in addr: # a v4 addr
v4addr = addr[addr.rindex(':') + 1:]
v4addr = socket.inet_aton(v4addr)
v4addr = ['%02X' % ord(x) for x in v4addr]
v4addr.insert(2, ':')
newaddr = addr[:addr.rindex(':') + 1] + ''.join(v4addr)
return inet_pton(family, newaddr)
dbyts = [0] * 8 # 8 groups
grps = addr.split(':')
for i, v in enumerate(grps):
if v:
dbyts[i] = int(v, 16)
else:
for j, w in enumerate(grps[::-1]):
if w:
dbyts[7 - j] = int(w, 16)
else:
break
break
return b''.join((chr(i // 256) + chr(i % 256)) for i in dbyts)
else:
raise RuntimeError("What family?")
def is_ip(address):
for family in (socket.AF_INET, socket.AF_INET6):
try:
if type(address) != str:
address = address.decode('utf8')
inet_pton(family, address)
return family
except (TypeError, ValueError, OSError, IOError):
pass
return False
def patch_socket():
if not hasattr(socket, 'inet_pton'):
socket.inet_pton = inet_pton
if not hasattr(socket, 'inet_ntop'):
socket.inet_ntop = inet_ntop
def pack_addr(address):
address_str = to_str(address)
for family in (socket.AF_INET, socket.AF_INET6):
try:
r = socket.inet_pton(family, address_str)
if family == socket.AF_INET6:
return b'\x04' + r
else:
return b'\x01' + r
except (TypeError, ValueError, OSError, IOError):
pass
if len(address) > 255:
address = address[:255] # TODO
return b'\x03' + chr(len(address)) + address
def add_network(self, addr):
if addr is "":
return
block = addr.split('/')
addr_family = is_ip(block[0])
addr_len = IPNetwork.ADDRLENGTH[addr_family]
if addr_family is socket.AF_INET:
ip, = struct.unpack("!I", socket.inet_aton(block[0]))
elif addr_family is socket.AF_INET6:
hi, lo = struct.unpack("!QQ", inet_pton(addr_family, block[0]))
ip = (hi << 64) | lo
else:
raise Exception("Not a valid CIDR notation: %s" % addr)
if len(block) is 1:
prefix_size = 0
while (ip & 1) == 0 and ip is not 0:
ip >>= 1
prefix_size += 1
logging.warn("You did't specify CIDR routing prefix size for %s, "
"implicit treated as %s/%d" % (addr, addr, addr_len))
elif block[1].isdigit() and int(block[1]) <= addr_len:
prefix_size = addr_len - int(block[1])
ip >>= prefix_size
else:
raise Exception("Not a valid CIDR notation: %s" % addr)
if addr_family is socket.AF_INET:
self._network_list_v4.append((ip, prefix_size))
else:
self._network_list_v6.append((ip, prefix_size))
def test_inet_conv():
ipv4 = b'8.8.4.4'
b = inet_pton(socket.AF_INET, ipv4)
assert inet_ntop(socket.AF_INET, b) == ipv4
ipv6 = b'2404:6800:4005:805::1011'
b = inet_pton(socket.AF_INET6, ipv6)
assert inet_ntop(socket.AF_INET6, b) == ipv6
def pack(data):
return socket.inet_pton(socket.AF_INET if len(data.split('.')) == 4 else socket.AF_INET6, data)
def addr_to_dbus(addr, family):
if (family == socket.AF_INET):
return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0])
else:
return dbus.ByteArray(socket.inet_pton(family, addr))
def reqSession(self, sender="", s_port=20001, receiver="", r_port=20002, startTime=0, timeOut=3, dscp=0, padding=0):
typeP = dscp << 24
if startTime != 0:
startTime += now() + TIMEOFFSET
if sender == "":
request = struct.pack('!4B L L H H 13L 4ILQ4L', 5, 4, 0, 0, 0, 0, s_port, r_port, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
elif sender == "::":
request = struct.pack('!4B L L H H 13L 4ILQ4L', 5, 6, 0, 0, 0, 0, s_port, r_port, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
elif ':' in sender:
s = socket.inet_pton(socket.AF_INET6, sender)
r = socket.inet_pton(socket.AF_INET6, receiver)
request = struct.pack('!4B L L H H 16s 16s 4L L 4ILQ4L', 5, 6, 0, 0, 0, 0, s_port, r_port, s, r, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
else:
s = socket.inet_pton(socket.AF_INET, sender)
r = socket.inet_pton(socket.AF_INET, receiver)
request = struct.pack('!4B L L H H 16s 16s 4L L 4ILQ4L', 5, 4, 0, 0, 0, 0, s_port, r_port, s, r, 0, 0, 0, 0, padding, startTime, 0, timeOut, 0, typeP, 0, 0, 0, 0, 0)
log.info("CTRL.TX <<Request Session>>")
self.send(request)
log.info("CTRL.RX <<Session Accept>>")
data = self.receive()
rval = ord(data[0])
if rval != 0:
log.critical("ERROR CODE %d in <<Session Accept>>", rval)
return False
return True
def _parse_ip(string, families=(socket.AF_INET, socket.AF_INET6)):
for family in families:
try:
return socket.inet_ntop(family, socket.inet_pton(family, string))
except (ValueError, socket.error):
pass
return None
def _nibbles(ipv6, _hex="0123456789abcdef"):
result = []
for ch in socket.inet_pton(socket.AF_INET6, ipv6):
num = ord(ch)
result.append(_hex[num >> 4])
result.append(_hex[num & 0xf])
return result
def parse_ipv4(ip):
try:
packed = inet_pton(AF_INET, ip.strip())
except (error, UnicodeEncodeError):
return None
ip_num, = struct.unpack("!I", packed)
return ip_num
def parse_ipv6(ip):
try:
packed = inet_pton(AF_INET6, ip.strip())
except (error, UnicodeEncodeError):
return None
hi, lo = struct.unpack("!QQ", packed)
return (hi << 64) | lo