python类AF_INET的实例源码

inet.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def inet_pton(family, text):
    """Convert the textual form of a network address into its binary form.

    @param family: the address family
    @type family: int
    @param text: the textual address
    @type text: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """

    if family == AF_INET:
        return dns.ipv4.inet_aton(text)
    elif family == AF_INET6:
        return dns.ipv6.inet_aton(text)
    else:
        raise NotImplementedError
inet.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def inet_ntop(family, address):
    """Convert the binary form of a network address into its textual form.

    @param family: the address family
    @type family: int
    @param address: the binary address
    @type address: string
    @raises NotImplementedError: the address family specified is not
    implemented.
    @rtype: string
    """
    if family == AF_INET:
        return dns.ipv4.inet_ntoa(address)
    elif family == AF_INET6:
        return dns.ipv6.inet_ntoa(address)
    else:
        raise NotImplementedError
inet.py 文件源码 项目:llk 作者: Tycx2ry 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def af_for_address(text):
    """Determine the address family of a textual-form network address.

    @param text: the textual address
    @type text: string
    @raises ValueError: the address family cannot be determined from the input.
    @rtype: int
    """
    try:
        junk = dns.ipv4.inet_aton(text)
        return AF_INET
    except:
        try:
            junk = dns.ipv6.inet_aton(text)
            return AF_INET6
        except:
            raise ValueError
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def discover_peers(self, port=None):
        """This method can be invoked (periodically?) to broadcast message to
        discover peers, if there is a chance initial broadcast message may be
        lost (as these messages are sent over UDP).
        """
        ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

        def _discover(addrinfo, port, task=None):
            ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except:
                pass
            ping_sock.close()

        for addrinfo in self._addrinfos:
            SysTask(_discover, addrinfo, port)
netpycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def discover_peers(self, port=None):
        """This method can be invoked (periodically?) to broadcast message to
        discover peers, if there is a chance initial broadcast message may be
        lost (as these messages are sent over UDP).
        """
        ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

        def _discover(addrinfo, port, task=None):
            ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
            ping_sock.settimeout(2)
            if addrinfo.family == socket.AF_INET:
                ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            else:  # addrinfo.family == socket.AF_INET6
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                     struct.pack('@i', 1))
                ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
            ping_sock.bind((addrinfo.ip, 0))
            if not port:
                port = addrinfo.udp_sock.getsockname()[1]
            ping_msg['location'] = addrinfo.location
            try:
                yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                       (addrinfo.broadcast, port))
            except:
                pass
            ping_sock.close()

        for addrinfo in self._addrinfos:
            SysTask(_discover, addrinfo, port)
OSC3.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _ensureConnected(self, address):
        """Make sure client has a socket connected to address"""
        if not self.socket:
            if len(address) == 4:
                address_family = socket.AF_INET6
            else:
                address_family = socket.AF_INET
            self._setSocket(socket.socket(address_family, socket.SOCK_DGRAM))
        self.socket.connect(address)
OSC3.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self):
        self._txMutex = threading.Lock()
        OSCAddressSpace.__init__(self)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.sndbuf_size)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.rcvbuf_size)
        self.socket.settimeout(1.0)
        self._running = False
OSC2.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _ensureConnected(self, address):
        """Make sure client has a socket connected to address"""
        if not self.socket:
            if len(address) == 4:
                address_family = socket.AF_INET6
            else:
                address_family = socket.AF_INET
            self._setSocket(socket.socket(address_family, socket.SOCK_DGRAM))
        self.socket.connect(address)
OSC2.py 文件源码 项目:pyOSC3 作者: Qirky 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
        self._txMutex = threading.Lock()
        OSCAddressSpace.__init__(self)
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.sndbuf_size)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self.rcvbuf_size)
        self.socket.settimeout(1.0)
        self._running = False
connection.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
tsproxy.py 文件源码 项目:tsproxy 作者: WPO-Foundation 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, host, port):
    asyncore.dispatcher.__init__(self)
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      #self.set_reuse_addr()
      self.bind((host, port))
      self.listen(socket.SOMAXCONN)
      self.ipaddr, self.port = self.getsockname()
      self.current_client_id = 0
    except:
      PrintMessage("Unable to listen on {0}:{1}. Is the port already in use?".format(host, port))
      exit(1)
connection.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
connection.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def allowed_gai_family():
    """This function is designed to work in the context of
    getaddrinfo, where family=socket.AF_UNSPEC is the default and
    will perform a DNS search for both IPv6 and IPv4 records."""

    family = socket.AF_INET
    if HAS_IPV6:
        family = socket.AF_UNSPEC
    return family
bottle.py 文件源码 项目:dabdabrevolution 作者: harryparkdotio 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run(self, app):  # pragma: no cover
        from wsgiref.simple_server import make_server
        from wsgiref.simple_server import WSGIRequestHandler, WSGIServer
        import socket

        class FixedHandler(WSGIRequestHandler):
            def address_string(self):  # Prevent reverse DNS lookups please.
                return self.client_address[0]

            def log_request(*args, **kw):
                if not self.quiet:
                    return WSGIRequestHandler.log_request(*args, **kw)

        handler_cls = self.options.get('handler_class', FixedHandler)
        server_cls = self.options.get('server_class', WSGIServer)

        if ':' in self.host:  # Fix wsgiref for IPv6 addresses.
            if getattr(server_cls, 'address_family') == socket.AF_INET:

                class server_cls(server_cls):
                    address_family = socket.AF_INET6

        self.srv = make_server(self.host, self.port, app, server_cls,
                               handler_cls)
        self.port = self.srv.server_port  # update port actual port (0 means random)
        try:
            self.srv.serve_forever()
        except KeyboardInterrupt:
            self.srv.server_close()  # Prevent ResourceWarning: unclosed socket
            raise
tcp_client.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _recreate_socket(self):
        if self._proxy is None:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            import socks
            self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
            if type(self._proxy) is dict:
                self._socket.set_proxy(**self._proxy)
            else:  # tuple, list, etc.
                self._socket.set_proxy(*self._proxy)
threaded_tcp_client.py 文件源码 项目:BitBot 作者: crack00r 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def _recreate_socket(self):
        if self._proxy is None:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            import socks
            self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
            if type(self._proxy) is dict:
                self._socket.set_proxy(**self._proxy)
            else:  # tuple, list, etc.
                self._socket.set_proxy(*self._proxy)
ftplib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sendeprt(self, host, port):
        '''Send a EPRT command with the current host and the given port number.'''
        af = 0
        if self.af == socket.AF_INET:
            af = 1
        if self.af == socket.AF_INET6:
            af = 2
        if af == 0:
            raise error_proto, 'unsupported address family'
        fields = ['', repr(af), host, repr(port), '']
        cmd = 'EPRT ' + '|'.join(fields)
        return self.voidcmd(cmd)
ftplib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def makeport(self):
        '''Create a new socket and send a PORT command for it.'''
        msg = "getaddrinfo returns an empty list"
        sock = None
        for res in socket.getaddrinfo(None, 0, self.af, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
            af, socktype, proto, canonname, sa = res
            try:
                sock = socket.socket(af, socktype, proto)
                sock.bind(sa)
            except socket.error, msg:
                if sock:
                    sock.close()
                sock = None
                continue
            break
        if not sock:
            raise socket.error, msg
        sock.listen(1)
        port = sock.getsockname()[1] # Get proper port
        host = self.sock.getsockname()[0] # Get proper host
        if self.af == socket.AF_INET:
            resp = self.sendport(host, port)
        else:
            resp = self.sendeprt(host, port)
        if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
            sock.settimeout(self.timeout)
        return sock
ftplib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def makepasv(self):
        if self.af == socket.AF_INET:
            host, port = parse227(self.sendcmd('PASV'))
        else:
            host, port = parse229(self.sendcmd('EPSV'), self.sock.getpeername())
        return host, port
socks.py 文件源码 项目:LFISuite 作者: D35m0nd142 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def _write_SOCKS5_address(self, addr, file):
        """
        Return the host and port packed for the SOCKS5 protocol,
        and the resolved address as a tuple object.
        """
        host, port = addr
        proxy_type, _, _, rdns, username, password = self.proxy
        family_to_byte = {socket.AF_INET: b"\x01", socket.AF_INET6: b"\x04"}

        # If the given destination address is an IP address, we'll
        # use the IP address request even if remote resolving was specified.
        # Detect whether the address is IPv4/6 directly.
        for family in (socket.AF_INET, socket.AF_INET6):
            try:
                addr_bytes = socket.inet_pton(family, host)
                file.write(family_to_byte[family] + addr_bytes)
                host = socket.inet_ntop(family, addr_bytes)
                file.write(struct.pack(">H", port))
                return host, port
            except socket.error:
                continue

        # Well it's not an IP number, so it's probably a DNS name.
        if rdns:
            # Resolve remotely
            host_bytes = host.encode("idna")
            file.write(b"\x03" + chr(len(host_bytes)).encode() + host_bytes)
        else:
            # Resolve locally
            addresses = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_ADDRCONFIG)
            # We can't really work out what IP is reachable, so just pick the
            # first.
            target_addr = addresses[0]
            family = target_addr[0]
            host = target_addr[4][0]

            addr_bytes = socket.inet_pton(family, host)
            file.write(family_to_byte[family] + addr_bytes)
            host = socket.inet_ntop(family, addr_bytes)
        file.write(struct.pack(">H", port))
        return host, port


问题


面经


文章

微信
公众号

扫码关注公众号