def m2i(self, pkt, s):
family = None
if pkt.type == 1: # A
family = socket.AF_INET
elif pkt.type == 12: # PTR
s = DNSgetstr(s, 0)[0]
elif pkt.type == 16: # TXT
ret_s = ""
tmp_s = s
# RDATA contains a list of strings, each are prepended with
# a byte containing the size of the following string.
while tmp_s:
tmp_len = struct.unpack("!B", tmp_s[0])[0] + 1
if tmp_len > len(tmp_s):
warning("DNS RR TXT prematured end of character-string (size=%i, remaining bytes=%i)" % (tmp_len, len(tmp_s)))
ret_s += tmp_s[1:tmp_len]
tmp_s = tmp_s[tmp_len:]
s = ret_s
elif pkt.type == 28: # AAAA
family = socket.AF_INET6
if family is not None:
s = inet_ntop(family, s)
return s
python类AF_INET6的实例源码
def i2m(self, pkt, s):
if pkt.type == 1: # A
if s:
s = inet_aton(s)
elif pkt.type in [2,3,4,5]: # NS, MD, MF, CNAME
s = "".join(map(lambda x: chr(len(x))+x, s.split(".")))
if ord(s[-1]):
s += "\x00"
elif pkt.type == 16: # TXT
if s:
ret_s = ""
# The initial string must be splitted into a list of strings
# prepended with theirs sizes.
while len(s) >= 255:
ret_s += "\xff" + s[:255]
s = s[255:]
# The remaining string is less than 255 bytes long
if len(s):
ret_s += struct.pack("!B", len(s)) + s
s = ret_s
elif pkt.type == 28: # AAAA
if s:
s = inet_pton(socket.AF_INET6, s)
return s
def neighsol(addr, src, iface, timeout=1, chainCC=0):
"""
Sends an ICMPv6 Neighbor Solicitation message to get the MAC address
of the neighbor with specified IPv6 address addr. 'src' address is
used as source of the message. Message is sent on iface. By default,
timeout waiting for an answer is 1 second.
If no answer is gathered, None is returned. Else, the answer is
returned (ethernet frame).
"""
nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
d = inet_ntop(socket.AF_INET6, nsma)
dm = in6_getnsmac(nsma)
p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255)
p /= ICMPv6ND_NS(tgt=addr)
p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
res = srp1(p,type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0,
chainCC=chainCC)
return res
def getfield(self, pkt, s):
c = l = None
if self.length_from is not None:
l = self.length_from(pkt)
elif self.count_from is not None:
c = self.count_from(pkt)
lst = []
ret = ""
remain = s
if l is not None:
remain,ret = s[:l],s[l:]
while remain:
if c is not None:
if c <= 0:
break
c -= 1
addr = inet_ntop(socket.AF_INET6, remain[:16])
lst.append(addr)
remain = remain[16:]
return remain+ret,lst
def i2m(self, pkt, x):
l = pkt.len
if x is None:
x = "::"
if l is None:
l = 1
x = inet_pton(socket.AF_INET6, x)
if l is None:
return x
if l in [0, 1]:
return ""
if l in [2, 3]:
return x[:8*(l-1)]
return x + '\x00'*8*(l-3)
def h2i(self, pkt, x):
if x is tuple and type(x[0]) is int:
return x
val = None
try: # Try IPv6
inet_pton(socket.AF_INET6, x)
val = (0, x)
except:
try: # Try IPv4
inet_pton(socket.AF_INET, x)
val = (2, x)
except: # Try DNS
if x is None:
x = ""
x = names2dnsrepr(x)
val = (1, x)
return val
def _resolve_one(self, ip):
"""
overloaded version to provide a Whois resolution on the
embedded IPv4 address if the address is 6to4 or Teredo.
Otherwise, the native IPv6 address is passed.
"""
if in6_isaddr6to4(ip): # for 6to4, use embedded @
tmp = inet_pton(socket.AF_INET6, ip)
addr = inet_ntop(socket.AF_INET, tmp[2:6])
elif in6_isaddrTeredo(ip): # for Teredo, use mapped address
addr = teredoAddrExtractInfo(ip)[2]
else:
addr = ip
_, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
return ip,asn,desc
def hashret(self): # we filter on peer address field
return inet_pton(socket.AF_INET6, self.peeraddr)
#####################################################################
# sent between Relay Agents and Servers
# Normalement, doit inclure une option "Relay Message Option"
# peut en inclure d'autres.
# Les valeurs des champs hop-count, link-addr et peer-addr
# sont copiees du messsage Forward associe. POur le suivi de session.
# Pour le moment, comme decrit dans le commentaire, le hashret
# se limite au contenu du champ peer address.
# Voir section 7.2 de la 3315.
# Relay-Reply Message
# - sent by servers to relay agents
# - if the solicit message was received in a Relay-Forward message,
# the server constructs a relay-reply message with the Advertise
# message in the payload of a relay-message. cf page 37/101. Envoie de
# ce message en unicast au relay-agent. utilisation de l'adresse ip
# presente en ip source du paquet recu
def ifchange(self, iff, addr):
the_addr, the_plen = (addr.split("/")+["128"])[:2]
the_plen = int(the_plen)
naddr = inet_pton(socket.AF_INET6, the_addr)
nmask = in6_cidr2mask(the_plen)
the_net = inet_ntop(socket.AF_INET6, in6_and(nmask,naddr))
for i in range(len(self.routes)):
net,plen,gw,iface,addr = self.routes[i]
if iface != iff:
continue
if gw == '::':
self.routes[i] = (the_net,the_plen,gw,iface,the_addr)
else:
self.routes[i] = (net,the_plen,gw,iface,the_addr)
self.invalidate_cache()
ip6_neigh_cache.flush()
def inet_pton(family, text):
"""Convert the textual form of a network address into its binary form.
@param family: the address family
@type family: int
@param text: the textual address
@type text: string
@raises NotImplementedError: the address family specified is not
implemented.
@rtype: string
"""
if family == AF_INET:
return dns.ipv4.inet_aton(text)
elif family == AF_INET6:
return dns.ipv6.inet_aton(text)
else:
raise NotImplementedError
def inet_ntop(family, address):
"""Convert the binary form of a network address into its textual form.
@param family: the address family
@type family: int
@param address: the binary address
@type address: string
@raises NotImplementedError: the address family specified is not
implemented.
@rtype: string
"""
if family == AF_INET:
return dns.ipv4.inet_ntoa(address)
elif family == AF_INET6:
return dns.ipv6.inet_ntoa(address)
else:
raise NotImplementedError
def af_for_address(text):
"""Determine the address family of a textual-form network address.
@param text: the textual address
@type text: string
@raises ValueError: the address family cannot be determined from the input.
@rtype: int
"""
try:
junk = dns.ipv4.inet_aton(text)
return AF_INET
except:
try:
junk = dns.ipv6.inet_aton(text)
return AF_INET6
except:
raise ValueError
def _gethostbyaddr(ip):
try:
addr = dns.ipv6.inet_aton(ip)
sockaddr = (ip, 80, 0, 0)
family = socket.AF_INET6
except:
sockaddr = (ip, 80)
family = socket.AF_INET
(name, port) = _getnameinfo(sockaddr, socket.NI_NAMEREQD)
aliases = []
addresses = []
tuples = _getaddrinfo(name, 0, family, socket.SOCK_STREAM, socket.SOL_TCP,
socket.AI_CANONNAME)
canonical = tuples[0][3]
for item in tuples:
addresses.append(item[4][0])
# XXX we just ignore aliases
return (canonical, aliases, addresses)
def _read_socks5_address(self):
atyp = yield self.read_bytes(1)
if atyp == b"\x01":
data = yield self.read_bytes(4)
addr = socket.inet_ntoa(data)
elif atyp == b"\x03":
length = yield self.read_bytes(1)
addr = yield self.read_bytes(length)
elif atyp == b"\x04":
data = yield self.read_bytes(16)
addr = socket.inet_ntop(socket.AF_INET6, data)
else:
raise GeneralProxyError("SOCKS5 proxy server sent invalid data")
data = yield self.read_bytes(2)
port = struct.unpack(">H", data)[0]
raise gen.Return((addr, port))
def select_ip_version(host, port):
"""Returns AF_INET4 or AF_INET6 depending on where to connect to."""
# disabled due to problems with current ipv6 implementations
# and various operating systems. Probably this code also is
# not supposed to work, but I can't come up with any other
# ways to implement this.
# try:
# info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
# socket.SOCK_STREAM, 0,
# socket.AI_PASSIVE)
# if info:
# return info[0][0]
# except socket.gaierror:
# pass
if ':' in host and hasattr(socket, 'AF_INET6'):
return socket.AF_INET6
return socket.AF_INET
def select_ip_version(host, port):
"""Returns AF_INET4 or AF_INET6 depending on where to connect to."""
# disabled due to problems with current ipv6 implementations
# and various operating systems. Probably this code also is
# not supposed to work, but I can't come up with any other
# ways to implement this.
# try:
# info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
# socket.SOCK_STREAM, 0,
# socket.AI_PASSIVE)
# if info:
# return info[0][0]
# except socket.gaierror:
# pass
if ':' in host and hasattr(socket, 'AF_INET6'):
return socket.AF_INET6
return socket.AF_INET
def test_sock_connect_address(self):
# In debug mode, sock_connect() must ensure that the address is already
# resolved (call _check_resolved_address())
self.loop.set_debug(True)
addresses = [(socket.AF_INET, ('www.python.org', 80))]
if support.IPV6_ENABLED:
addresses.extend((
(socket.AF_INET6, ('www.python.org', 80)),
(socket.AF_INET6, ('www.python.org', 80, 0, 0)),
))
for family, address in addresses:
for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
sock = socket.socket(family, sock_type)
with sock:
sock.setblocking(False)
connect = self.loop.sock_connect(sock, address)
with self.assertRaises(ValueError) as cm:
self.loop.run_until_complete(connect)
self.assertIn('address must be resolved',
str(cm.exception))
def select_ip_version(host, port):
"""Returns AF_INET4 or AF_INET6 depending on where to connect to."""
# disabled due to problems with current ipv6 implementations
# and various operating systems. Probably this code also is
# not supposed to work, but I can't come up with any other
# ways to implement this.
# try:
# info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
# socket.SOCK_STREAM, 0,
# socket.AI_PASSIVE)
# if info:
# return info[0][0]
# except socket.gaierror:
# pass
if ':' in host and hasattr(socket, 'AF_INET6'):
return socket.AF_INET6
return socket.AF_INET
def select_ip_version(host, port):
"""Returns AF_INET4 or AF_INET6 depending on where to connect to."""
# disabled due to problems with current ipv6 implementations
# and various operating systems. Probably this code also is
# not supposed to work, but I can't come up with any other
# ways to implement this.
# try:
# info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
# socket.SOCK_STREAM, 0,
# socket.AI_PASSIVE)
# if info:
# return info[0][0]
# except socket.gaierror:
# pass
if ':' in host and hasattr(socket, 'AF_INET6'):
return socket.AF_INET6
return socket.AF_INET
def makesock(self):
if self.config["ipv6"]:
family = socket.AF_INET6
else:
family = socket.AF_INET
if self.proxy is not None:
proxy_serv, proxy_port = self.proxy.split(":")
self.sock = socks.socket(family, socket.SOCK_STREAM)
self.sock.setblocking(0)
self.sock.setproxy(socks.PROXY_TYPE_SOCKS5, proxy_serv, proxy_port)
else:
self.sock = socket.socket(family, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
if self.vhost is not None:
self.sock.bind((self.vhost, 0))
if self.config["ssl"]:
self.sock = ssl.wrap_socket(self.sock)