def override_system_resolver(resolver=None):
    """Replace the socket module's name-resolution functions with the
    dnspython-backed implementations defined in this module.

    Handy in tests, where resolution behavior needs to be controlled from
    Python code without editing system settings such as /etc/resolv.conf.

    The chosen resolver is stored in the module-level ``_resolver``.

    @param resolver: the resolver to use; when None, the one returned by
    get_default_resolver() is used
    @type resolver: dns.resolver.Resolver object or None
    """
    global _resolver
    _resolver = get_default_resolver() if resolver is None else resolver
    # (socket attribute, replacement) pairs, applied in this order.
    replacements = (
        ('getaddrinfo', _getaddrinfo),
        ('getnameinfo', _getnameinfo),
        ('getfqdn', _getfqdn),
        ('gethostbyname', _gethostbyname),
        ('gethostbyname_ex', _gethostbyname_ex),
        ('gethostbyaddr', _gethostbyaddr),
    )
    for attribute, replacement in replacements:
        setattr(socket, attribute, replacement)
# Example source code using Python's getaddrinfo()
def create(self):
    """Open a UDP socket connected to ``self.target_ip:self.target_port``.

    Every address returned by ``socket.getaddrinfo`` is tried in order; the
    first socket that connects is returned with ``self.timeout`` applied.

    Raises:
        RuntimeError: if no address could be connected to, or if the lookup
            itself failed with a socket error.
    """
    # Both names must exist before the loop: the original left ``sock``
    # unbound when the lookup was empty or the first socket() call failed,
    # which surfaced as UnboundLocalError instead of RuntimeError.
    sock = None
    last_error = None
    try:
        for res in socket.getaddrinfo(self.target_ip, self.target_port, 0, socket.SOCK_DGRAM):
            af, socktype, proto, _canonname, sa = res
            try:
                sock = socket.socket(af, socktype, proto)
                sock.settimeout(self.timeout)
                sock.connect(sa)
            except socket.error as exc:
                # Keep the error in a separate name; the ``as`` target is
                # cleared when the handler exits on Python 3.
                last_error = exc
                if sock is not None:
                    sock.close()
                    sock = None
                continue
            break
        if sock is None:
            if last_error is not None:
                raise last_error
            raise socket.error("getaddrinfo returns an empty list")
    except socket.error:
        raise RuntimeError('[+] Cannot connect to %s:%d\n[+] port might not be up' % (self.target_ip, self.target_port))
    return sock
def select_ip_version(host, port):
    """Pick the socket address family to use for *host*.

    Returns ``socket.AF_INET6`` when *host* contains a colon and the socket
    module exposes ``AF_INET6``; otherwise returns ``socket.AF_INET``.
    *port* is accepted but not consulted.
    """
    # A getaddrinfo()-based probe (AF_UNSPEC / SOCK_STREAM / AI_PASSIVE,
    # returning the family of the first result) used to live here. It was
    # disabled because of problems with IPv6 implementations on various
    # operating systems, leaving only the textual heuristic below.
    looks_like_ipv6 = ':' in host
    if looks_like_ipv6 and hasattr(socket, 'AF_INET6'):
        return socket.AF_INET6
    return socket.AF_INET
def _setup_connection(self, dstaddr, timeout=None):
    """Create the UDP socket for this object and bind it to a random port.

    The socket family/type/protocol come from the first IPv4 datagram result
    of ``socket.getaddrinfo`` for *dstaddr*.  Up to ten random ports in the
    range 10000-60000 are tried; on the first successful bind the socket is
    also marked SO_BROADCAST and stored on ``self.__sock``.

    *timeout* is accepted but not used here.

    Raises:
        NetBIOSError: if none of the ten bind attempts succeeded.
    """
    port = randint(10000, 60000)
    af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
    s = socket.socket(af, socktype, proto)
    # Must start out false: the original initialised this to 1, so the
    # "cannot bind" error below could never be raised.
    has_bind = False
    for _i in range(10):
        # We try to bind to a port for 10 tries
        try:
            s.bind((INADDR_ANY, randint(10000, 60000)))
            s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        except socket.error:
            continue
        has_bind = True
        # Stop at the first success; binding an already-bound socket again
        # only produces errors.
        break
    if not has_bind:
        s.close()
        raise NetBIOSError('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
    self.__sock = s
def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
    """Record a node pattern and the resources associated with it.

    When *node* contains no ``*`` wildcard it is resolved with
    ``socket.getaddrinfo`` and replaced by the first resulting address
    (IPv6 addresses get their zero runs squeezed by the regex substitutions
    below).  The result is turned into the regular-expression source stored
    in ``self.ip_rex``: dots are escaped and ``*`` becomes ``.*``.

    If resolution fails, a warning naming the node as given is logged and
    ``self.ip_rex`` is set to the empty string.

    *platform* is stored lower-cased; *cpus*, *memory* and *disk* are stored
    as given.
    """
    # Remember the caller's value: ``node`` is overwritten below, and the
    # original logged the already-blanked value.
    requested_node = node
    if node.find('*') < 0:
        try:
            info = socket.getaddrinfo(node, None)[0]
            ip_addr = info[4][0]
            if info[0] == socket.AF_INET6:
                ip_addr = re.sub(r'^0*', '', ip_addr)
                ip_addr = re.sub(r':0*', ':', ip_addr)
                ip_addr = re.sub(r'::+', '::', ip_addr)
            node = ip_addr
        except Exception:
            # Best effort: any lookup problem marks the node as invalid,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            node = ''
    if node:
        self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
    else:
        logger.warning('node "%s" is invalid', requested_node)
        self.ip_rex = ''
    self.platform = platform.lower()
    self.cpus = cpus
    self.memory = memory
    self.disk = disk
def __init__(self, host, tcp_port):
    """Store the address and TCP port of a peer.

    *host* is kept verbatim in ``self.addr`` when it looks like a numeric
    address: digits and dots, or hex digits and colons with at least one
    colon.  Anything else is resolved with ``socket.getaddrinfo`` and the
    first resulting address is stored.  ``self.port`` is ``int(tcp_port)``.
    """
    looks_like_ipv4 = re.match(r'^\d+[\.\d]+$', host)
    # Require a colon: without it, host names made only of hex letters
    # (e.g. "cafe") were mistaken for IPv6 literals and never resolved.
    looks_like_ipv6 = ':' in host and re.match(r'^[0-9a-fA-F:]+$', host)
    if looks_like_ipv4 or looks_like_ipv6:
        self.addr = host
    else:
        self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
    self.port = int(tcp_port)
def __init__(self, host, tcp_port):
    """Store the address and TCP port of a peer.

    *host* is kept verbatim in ``self.addr`` when it looks like a numeric
    address: digits and dots, or hex digits and colons with at least one
    colon.  Anything else is resolved with ``socket.getaddrinfo`` and the
    first resulting address is stored.  ``self.port`` is ``int(tcp_port)``.
    """
    looks_like_ipv4 = re.match(r'^\d+[\.\d]+$', host)
    # Require a colon: without it, host names made only of hex letters
    # (e.g. "cafe") were mistaken for IPv6 literals and never resolved.
    looks_like_ipv6 = ':' in host and re.match(r'^[0-9a-fA-F:]+$', host)
    if looks_like_ipv4 or looks_like_ipv6:
        self.addr = host
    else:
        self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
    self.port = int(tcp_port)
def allowed_gai_family():
    """Return the address family to hand to getaddrinfo.

    ``socket.AF_UNSPEC`` (getaddrinfo's default, which searches for both
    IPv6 and IPv4 records) is returned when ``HAS_IPV6`` is truthy;
    otherwise the lookup is restricted to ``socket.AF_INET``.
    """
    return socket.AF_UNSPEC if HAS_IPV6 else socket.AF_INET
def allowed_gai_family():
    """Choose the ``family`` argument for a getaddrinfo call.

    Without IPv6 support (``HAS_IPV6`` falsy) only ``socket.AF_INET`` is
    allowed.  Otherwise ``socket.AF_UNSPEC`` is returned, which is
    getaddrinfo's default and performs a DNS search for both IPv6 and IPv4
    records.
    """
    if not HAS_IPV6:
        return socket.AF_INET
    return socket.AF_UNSPEC
def allowed_gai_family():
    """Address family that getaddrinfo callers are allowed to request.

    Returns ``socket.AF_UNSPEC`` (search both IPv6 and IPv4 records, the
    getaddrinfo default) if ``HAS_IPV6`` is set, else ``socket.AF_INET``.
    """
    if HAS_IPV6:
        return socket.AF_UNSPEC
    return socket.AF_INET
def _connect(self):
    """Create a TCP socket connection to ``self.host:self.port``.

    Works like ``socket.create_connection`` (each getaddrinfo result is
    tried in turn, so both IPv4 and IPv6 are supported), except that socket
    options are applied before ``connect()`` is called.

    Returns the first socket that connects, with ``self.socket_timeout``
    set on it.  If every attempt fails the last socket error is re-raised;
    if getaddrinfo yields nothing a ``socket.error`` is raised.
    """
    last_error = None
    candidates = socket.getaddrinfo(self.host, self.port, 0,
                                    socket.SOCK_STREAM)
    for family, socktype, proto, _canonname, socket_address in candidates:
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            # Disable Nagle's algorithm (TCP_NODELAY).
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # Optional keepalive, plus any extra per-option tuning.
            if self.socket_keepalive:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for option, value in iteritems(self.socket_keepalive_options):
                    sock.setsockopt(socket.SOL_TCP, option, value)
            # The connect timeout applies only while connecting ...
            sock.settimeout(self.socket_connect_timeout)
            sock.connect(socket_address)
            # ... after which the regular socket timeout takes over.
            sock.settimeout(self.socket_timeout)
        except socket.error as exc:
            last_error = exc
            if sock is not None:
                sock.close()
        else:
            return sock
    if last_error is None:
        raise socket.error("socket.getaddrinfo returned an empty list")
    raise last_error
def makeport(self):
    '''Create a new socket and send a PORT command for it.

    A passive (AI_PASSIVE) stream socket of family ``self.af`` is bound to
    the first address that works and put into listening mode.  Its port,
    together with the local host of the control connection ``self.sock``,
    is announced via ``sendport`` (IPv4) or ``sendeprt`` (other families).
    ``self.timeout`` is applied unless it is ``_GLOBAL_DEFAULT_TIMEOUT``.

    Returns the listening socket.  Raises ``socket.error`` when no address
    could be bound; the socket is closed again if announcing it fails.
    '''
    err = None
    sock = None
    for res in socket.getaddrinfo(None, 0, self.af, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
        af, socktype, proto, _canonname, sa = res
        try:
            sock = socket.socket(af, socktype, proto)
            sock.bind(sa)
        except socket.error as exc:
            # Keep the error in a separate name; the ``as`` target does not
            # survive the handler on Python 3.
            err = exc
            if sock:
                sock.close()
            sock = None
            continue
        break
    if sock is None:
        if err is not None:
            raise err
        raise socket.error("getaddrinfo returns an empty list")
    try:
        sock.listen(1)
        port = sock.getsockname()[1]  # Get proper port
        host = self.sock.getsockname()[0]  # Get proper host
        if self.af == socket.AF_INET:
            self.sendport(host, port)
        else:
            self.sendeprt(host, port)
        if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(self.timeout)
    except Exception:
        # Do not leak the listening socket when the command is rejected.
        sock.close()
        raise
    return sock
def _write_SOCKS5_address(self, addr, file):
    """
    Write the host and port of *addr*, packed for the SOCKS5 protocol, to
    *file* and return the (host, port) tuple that was written.

    *addr* is a ``(host, port)`` pair.  Three encodings are used:

    - IPv4 / IPv6 literal: address type ``\\x01`` / ``\\x04`` followed by
      the packed address (used even when remote resolving is requested).
    - DNS name with remote resolving (``self.proxy[3]`` truthy): address
      type ``\\x03``, a one-byte length, then the IDNA-encoded name.
    - DNS name resolved locally: the first getaddrinfo result is written
      as an IP literal and its textual form is returned as the host.

    The port always follows as a big-endian unsigned 16-bit integer.
    """
    host, port = addr
    _proxy_type, _, _, rdns, _username, _password = self.proxy
    family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}

    # If the given destination address is an IP address, we'll
    # use the IP address request even if remote resolving was specified.
    # Detect whether the address is IPv4/6 directly.
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            addr_bytes = socket.inet_pton(family, host)
        except socket.error:
            # Only a failed parse means "not this family"; write errors
            # must propagate instead of being mistaken for one.
            continue
        file.write(family_to_byte[family] + addr_bytes)
        host = socket.inet_ntop(family, addr_bytes)
        file.write(struct.pack(">H", port))
        return host, port

    # Well it's not an IP number, so it's probably a DNS name.
    if rdns:
        # Resolve remotely
        host_bytes = host.encode("idna")
        # Pack the length as one raw byte: chr(n).encode() yields two bytes
        # (UTF-8) for n >= 128 and corrupts the request.
        file.write(b"\x03" + struct.pack("B", len(host_bytes)) + host_bytes)
    else:
        # Resolve locally
        addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
        # We can't really work out what IP is reachable, so just pick the
        # first.
        target_addr = addresses[0]
        family = target_addr[0]
        host = target_addr[4][0]
        addr_bytes = socket.inet_pton(family, host)
        file.write(family_to_byte[family] + addr_bytes)
        host = socket.inet_ntop(family, addr_bytes)
    file.write(struct.pack(">H", port))
    return host, port
def __dns_resolve_host(host, ip_version, timeout):
    """
    Resolve a host using the system's facilities.

    The lookup runs ``socket.getaddrinfo`` in a daemon thread so that the
    caller waits at most *timeout* for an answer.  *ip_version* 4 selects
    AF_INET; any other value selects AF_INET6.

    Returns the first resolved address as a string, or None when the
    lookup fails, yields nothing, or does not finish in time.
    """
    if ip_version == 4:
        family = socket.AF_INET
    else:
        family = socket.AF_INET6

    def lookup(name, address_family, outbox):
        try:
            answers = socket.getaddrinfo(name, 0, address_family)
        except socket.gaierror:
            # TODO: Would be nice if we could isolate just the not
            # found error.
            outbox.put([])
        except socket.timeout:
            # Leave the queue empty; the waiter treats that as a failure.
            pass
        else:
            outbox.put(answers)

    outbox = Queue.Queue()
    worker = threading.Thread(target=lookup, args=(host, family, outbox))
    worker.daemon = True
    worker.start()

    # NOTE: No attempt is made to kill the worker thread; per the original
    # author's note, doing so would confuse Python if it holds the GIL.
    try:
        results = outbox.get(True, timeout)
    except Queue.Empty:
        return None
    if not results:
        return None
    _family, _socktype, _proto, _canonname, sockaddr = results[0]
    return str(sockaddr[0])
def connect_socket(host, port, blocking=True):
    """Create a TCP connection to the server.

    Only the first IPv4 stream address returned by ``socket.getaddrinfo``
    is used.  With ``blocking=False`` the socket is switched to
    non-blocking mode before connecting, and the resulting EINPROGRESS
    error is treated as success (the connection completes later).

    Returns the socket.  Raises ``Exception`` if the address cannot be
    translated, or the original ``socket.error`` if connecting fails; in
    the latter case the socket is closed before the error propagates.
    """
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    family, socktype, proto, _canonname, sockaddr = addr[0]
    my_socket = socket.socket(family, socktype, proto)
    if not blocking:
        my_socket.setblocking(0)
    try:
        my_socket.connect(sockaddr)
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
            # Real failure: release the descriptor instead of leaking it.
            my_socket.close()
            raise
    return my_socket
def server_socket(host, port, backlog=10):
    """Create a TCP listening socket for a server.

    The socket is created for the first IPv4 stream address returned by
    ``socket.getaddrinfo``, switched to non-blocking mode, bound and put
    into listening mode with the given *backlog*.

    Returns the socket.  Raises ``Exception`` if the address cannot be
    translated, or the original ``socket.error`` if binding or listening
    fails (EINPROGRESS is ignored); on failure the socket is closed before
    the error propagates.
    """
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    family, socktype, proto, _canonname, sockaddr = addr[0]
    my_socket = socket.socket(family, socktype, proto)
    my_socket.setblocking(0)  # 0=non-blocking
    try:
        my_socket.bind(sockaddr)
        my_socket.listen(backlog)
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
            # Real failure (e.g. address in use): do not leak the socket.
            my_socket.close()
            raise
    return my_socket
def test_error_multiple(self):
    """Connecting to an unreachable multi-address host reports every address.

    Skipped unless 'localhost' resolves to at least two addresses.  The
    test then expects ``cluster.connect`` to raise NoHostAvailable with a
    message that lists more than one attempted address followed by the
    last error.
    """
    if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2:
        raise unittest.SkipTest('localhost only resolves one address')
    cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043,
                      connect_timeout=10, protocol_version=PROTOCOL_VERSION)
    # Raw string: same pattern as before, without the invalid "\(" / "\["
    # escape sequences of an ordinary string literal.
    self.assertRaisesRegexp(NoHostAvailable, r"\('Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error",
                            cluster.connect)
def _connect_socket(self):
    """Connect ``self._socket`` to ``self.host:self.port``.

    Each address returned by getaddrinfo is tried in order.  For every
    attempt a socket is created through ``self._socket_impl``, optionally
    wrapped via ``self._ssl_impl`` when ``self.ssl_options`` is set, and
    connected under ``self.connect_timeout``; afterwards the timeout is
    cleared and, if requested, the peer certificate is matched against the
    host name.  The first successful attempt wins.

    Raises ConnectionException when getaddrinfo returns nothing,
    RuntimeError when SSL is requested but unavailable, and socket.error
    (listing the attempted addresses and the last error) when the final
    attempt fails.  On success, each entry of ``self.sockopts`` is applied
    with ``setsockopt``.
    """
    addresses = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    if not addresses:
        raise ConnectionException("getaddrinfo returned empty list for %s" % (self.host,))

    last_error = None
    for family, kind, protocol, _canonname, endpoint in addresses:
        try:
            self._socket = self._socket_impl.socket(family, kind, protocol)
            if self.ssl_options:
                if not self._ssl_impl:
                    raise RuntimeError("This version of Python was not compiled with SSL support")
                self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
            self._socket.settimeout(self.connect_timeout)
            self._socket.connect(endpoint)
            self._socket.settimeout(None)
            if self._check_hostname:
                ssl.match_hostname(self._socket.getpeercert(), self.host)
        except socket.error as exc:
            # Discard the half-open socket and move on to the next address.
            if self._socket:
                self._socket.close()
                self._socket = None
            last_error = exc
        else:
            # Connected: forget errors from earlier addresses.
            last_error = None
            break

    if last_error:
        tried = [entry[4] for entry in addresses]
        reason = last_error.strerror or last_error
        raise socket.error(last_error.errno, "Tried connecting to %s. Last error: %s" % (tried, reason))

    if self.sockopts:
        for option_args in self.sockopts:
            self._socket.setsockopt(*option_args)
def translate(self, addr):
    """
    Reverse DNS the public broadcast_address, then look that hostname up
    again to get the AWS-resolved IP, which will point to the private IP
    address within the same datacenter.

    Returns the first address found for the fully qualified name of
    *addr*, restricted to the same address family as *addr*; falls back
    to *addr* itself when the lookup yields no usable entry.
    """
    # Determine the family of the input so the result stays in it.
    first_match = socket.getaddrinfo(addr, 0, socket.AF_UNSPEC, socket.SOCK_STREAM)[0]
    address_family = first_match[0]
    fqdn = socket.getfqdn(addr)
    candidates = socket.getaddrinfo(fqdn, 0, address_family, socket.SOCK_STREAM)
    for entry in candidates:
        try:
            translated = entry[4][0]
        except Exception:
            continue
        return translated
    return addr