python类getaddrinfo()的实例源码

resolver.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def override_system_resolver(resolver=None):
    """Override the system resolver routines in the socket module with
    versions which use dnspython's resolver.

    This can be useful in testing situations where you want to control
    the resolution behavior of python code without having to change
    the system's resolver settings (e.g. /etc/resolv.conf).

    The resolver to use may be specified; if it's not, the default
    resolver will be used.

    @param resolver: the resolver to use
    @type resolver: dns.resolver.Resolver object or None
    """
    if resolver is None:
        resolver = get_default_resolver()
    global _resolver
    _resolver = resolver
    socket.getaddrinfo = _getaddrinfo
    socket.getnameinfo = _getnameinfo
    socket.getfqdn = _getfqdn
    socket.gethostbyname = _gethostbyname
    socket.gethostbyname_ex = _gethostbyname_ex
    socket.gethostbyaddr = _gethostbyaddr
sploit.py 文件源码 项目:CVE-2016-6366 作者: RiskSense-Ops 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create(self):
        try:
            msg = "getaddrinfo returns an empty list"
            for res in socket.getaddrinfo(self.target_ip, self.target_port, 0, socket.SOCK_DGRAM):
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                    sock.settimeout(self.timeout)
                    sock.connect(sa)
                except socket.error, msg:
                    if sock:
                        sock.close()
                        sock = None
                        continue
                    break
                if not sock:
                    raise socket.error, msg
        except socket.error:
            raise RuntimeError,'[+] Cannot connect to %s:%d\n[+] port might not be up' % (self.target_ip, self.target_port)
        return sock
serving.py 文件源码 项目:arithmancer 作者: google 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def select_ip_version(host, port):
    """Returns AF_INET4 or AF_INET6 depending on where to connect to."""
    # disabled due to problems with current ipv6 implementations
    # and various operating systems.  Probably this code also is
    # not supposed to work, but I can't come up with any other
    # ways to implement this.
    ##try:
    ##    info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
    ##                              socket.SOCK_STREAM, 0,
    ##                              socket.AI_PASSIVE)
    ##    if info:
    ##        return info[0][0]
    ##except socket.gaierror:
    ##    pass
    if ':' in host and hasattr(socket, 'AF_INET6'):
        return socket.AF_INET6
    return socket.AF_INET
nmb.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _setup_connection(self, dstaddr, timeout=None):
        port = randint(10000, 60000)
        af, socktype, proto, _canonname, _sa = socket.getaddrinfo(dstaddr, port, socket.AF_INET, socket.SOCK_DGRAM)[0]
        s = socket.socket(af, socktype, proto)
        has_bind = 1
        for _i in range(0, 10):
            # We try to bind to a port for 10 tries
            try:
                s.bind((INADDR_ANY, randint(10000, 60000)))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                has_bind = 1
            except socket.error:
                pass
        if not has_bind:
            raise NetBIOSError, ('Cannot bind to a good UDP port', ERRCLASS_OS, errno.EAGAIN)
        self.__sock = s
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, node, platform='', cpus=0, memory=0, disk=0):
        if node.find('*') < 0:
            try:
                info = socket.getaddrinfo(node, None)[0]
                ip_addr = info[4][0]
                if info[0] == socket.AF_INET6:
                    ip_addr = re.sub(r'^0*', '', ip_addr)
                    ip_addr = re.sub(r':0*', ':', ip_addr)
                    ip_addr = re.sub(r'::+', '::', ip_addr)
                node = ip_addr
            except:
                node = ''

        if node:
            self.ip_rex = node.replace('.', '\\.').replace('*', '.*')
        else:
            logger.warning('node "%s" is invalid', node)
            self.ip_rex = ''
        self.platform = platform.lower()
        self.cpus = cpus
        self.memory = memory
        self.disk = disk
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
        else:
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host, tcp_port):
        if re.match(r'^\d+[\.\d]+$', host) or re.match(r'^[0-9a-fA-F:]+$', host):
            self.addr = host
        else:
            self.addr = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)[0][4][0]
        self.port = int(tcp_port)
connection.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 98 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
connection.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
connection.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
connection.py 文件源码 项目:gimel 作者: Alephbet 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _connect(self):
        "Create a TCP socket connection"
        # we want to mimic what socket.create_connection does to support
        # ipv4/ipv6, but we want to set options prior to calling
        # socket.connect()
        err = None
        for res in socket.getaddrinfo(self.host, self.port, 0,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)

                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)

                # connect
                sock.connect(socket_address)

                # set the socket_timeout now that we're connected
                sock.settimeout(self.socket_timeout)
                return sock

            except socket.error as _:
                err = _
                if sock is not None:
                    sock.close()

        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")
ftplib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def makeport(self):
        '''Create a new socket and send a PORT command for it.'''
        msg = "getaddrinfo returns an empty list"
        sock = None
        for res in socket.getaddrinfo(None, 0, self.af, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
            af, socktype, proto, canonname, sa = res
            try:
                sock = socket.socket(af, socktype, proto)
                sock.bind(sa)
            except socket.error, msg:
                if sock:
                    sock.close()
                sock = None
                continue
            break
        if not sock:
            raise socket.error, msg
        sock.listen(1)
        port = sock.getsockname()[1] # Get proper port
        host = self.sock.getsockname()[0] # Get proper host
        if self.af == socket.AF_INET:
            resp = self.sendport(host, port)
        else:
            resp = self.sendeprt(host, port)
        if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(self.timeout)
        return sock
socks.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy
        family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}

        # If the given destination address is an IP address, we'll
        # use the IP address request even if remote resolving was specified.
        # Detect whether the address is IPv4/6 directly.
        for family in (socket.AF_INET, socket.AF_INET6):
            try:
                addr_bytes = socket.inet_pton(family, host)
                file.write(family_to_byte[family] + addr_bytes)
                host = socket.inet_ntop(family, addr_bytes)
                file.write(struct.pack(">H", port))
                return host, port
            except socket.error:
                continue

        # Well it's not an IP number, so it's probably a DNS name.
        if rdns:
            # Resolve remotely
            host_bytes = host.encode("idna")
            file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
        else:
            # Resolve locally
            addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
            # We can't really work out what IP is reachable, so just pick the
            # first.
            target_addr = addresses[0]
            family = target_addr[0]
            host = target_addr[4][0]

            addr_bytes = socket.inet_pton(family, host)
            file.write(family_to_byte[family] + addr_bytes)
            host = socket.inet_ntop(family, addr_bytes)
        file.write(struct.pack(">H", port))
        return host, port
psdns.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __dns_resolve_host(host, ip_version, timeout):
    """
    Resolve a host using the system's facilities
    """
    family = socket.AF_INET if ip_version == 4 else socket.AF_INET6

    def proc(host, family, queue):
        try:
            queue.put(socket.getaddrinfo(host, 0, family))
        except socket.gaierror as ex:
            # TODO: Would be nice if we could isolate just the not
            # found error.
            queue.put([])
        except socket.timeout:
            # Don't care, we just want the queue to be empty if
            # there's an error.
            pass

    queue = Queue.Queue()
    thread = threading.Thread(target=proc, args=(host, family, queue))
    thread.setDaemon(True)
    thread.start()
    try:
        results = queue.get(True, timeout)
        if len(results) == 0:
            return None
        family, socktype, proto, canonname, sockaddr = results[0]
    except Queue.Empty:
        return None

    # NOTE: Don't make any attempt to kill the thread, as it will get
    # Python all confused if it holds the GIL.

    (ip) = sockaddr
    return str(ip[0])
utils.py 文件源码 项目:deb-python-pyngus 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect_socket(host, port, blocking=True):
    """Create a TCP connection to the server."""
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
    if not blocking:
        my_socket.setblocking(0)
    try:
        my_socket.connect(addr[0][4])
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
            raise
    return my_socket
utils.py 文件源码 项目:deb-python-pyngus 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def server_socket(host, port, backlog=10):
    """Create a TCP listening socket for a server."""
    addr = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
    if not addr:
        raise Exception("Could not translate address '%s:%s'"
                        % (host, str(port)))
    my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
    my_socket.setblocking(0)  # 0=non-blocking
    try:
        my_socket.bind(addr[0][4])
        my_socket.listen(backlog)
    except socket.error as e:
        if e.errno != errno.EINPROGRESS:
            raise
    return my_socket
test_ipv6.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_error_multiple(self):
        if len(socket.getaddrinfo('localhost', 9043, socket.AF_UNSPEC, socket.SOCK_STREAM)) < 2:
            raise unittest.SkipTest('localhost only resolves one address')
        cluster = Cluster(connection_class=self.connection_class, contact_points=['localhost'], port=9043,
                          connect_timeout=10, protocol_version=PROTOCOL_VERSION)
        self.assertRaisesRegexp(NoHostAvailable, '\(\'Unable to connect.*Tried connecting to \[\(.*\(.*\].*Last error',
                                cluster.connect)
connection.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _connect_socket(self):
        sockerr = None
        addresses = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
        if not addresses:
            raise ConnectionException("getaddrinfo returned empty list for %s" % (self.host,))
        for (af, socktype, proto, canonname, sockaddr) in addresses:
            try:
                self._socket = self._socket_impl.socket(af, socktype, proto)
                if self.ssl_options:
                    if not self._ssl_impl:
                        raise RuntimeError("This version of Python was not compiled with SSL support")
                    self._socket = self._ssl_impl.wrap_socket(self._socket, **self.ssl_options)
                self._socket.settimeout(self.connect_timeout)
                self._socket.connect(sockaddr)
                self._socket.settimeout(None)
                if self._check_hostname:
                    ssl.match_hostname(self._socket.getpeercert(), self.host)
                sockerr = None
                break
            except socket.error as err:
                if self._socket:
                    self._socket.close()
                    self._socket = None
                sockerr = err

        if sockerr:
            raise socket.error(sockerr.errno, "Tried connecting to %s. Last error: %s" % ([a[4] for a in addresses], sockerr.strerror or sockerr))

        if self.sockopts:
            for args in self.sockopts:
                self._socket.setsockopt(*args)
policies.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def translate(self, addr):
        """
        Reverse DNS the public broadcast_address, then lookup that hostname to get the AWS-resolved IP, which
        will point to the private IP address within the same datacenter.
        """
        # get family of this address so we translate to the same
        family = socket.getaddrinfo(addr, 0, socket.AF_UNSPEC, socket.SOCK_STREAM)[0][0]
        host = socket.getfqdn(addr)
        for a in socket.getaddrinfo(host, 0, family, socket.SOCK_STREAM):
            try:
                return a[4][0]
            except Exception:
                pass
        return addr


问题


面经


文章

微信
公众号

扫码关注公众号