python类SOL_SOCKET的实例源码

UpnpPunch.py 文件源码 项目:zeronet-debian 作者: bashrc 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _get_local_ip():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    # not using <broadcast> because gevents getaddrinfo doesn't like that
    # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
    # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
    s.connect(('239.255.255.250', 1))
    return s.getsockname()[0]
socketrpc.py 文件源码 项目:aiolocust 作者: kpidata 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _listen(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((self.host, self.port))
        sock.listen(256)
        self.slave_index = 0
        slaves = []

        def dispatch_command(cmd):

            _send_obj(slaves[self.slave_index], cmd)
            self.slave_index += 1
            if self.slave_index == len(slaves):
                self.slave_index = 0

        def handle_slave(sock):
            try:
                while True:
                    self.event_queue.put_nowait(_recv_obj(sock))
            except Exception as e:
                logger.info("Slave disconnected")
                slaves.remove(sock)
                if self.slave_index == len(slaves) and len(slaves) > 0:
                    self.slave_index -= 1

                try:
                    sock.close()
                except:
                    pass

        def listener():
            while True:
                _socket, _addr = sock.accept()
                logger.info("Slave connected")
                slaves.append(_socket)
                gevent.spawn(lambda: handle_slave(_socket))

        gevent.spawn(listener)
        return dispatch_command
ports.py 文件源码 项目:dreamr-botnet 作者: YinAndYangSecurityAwareness 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _get_local_ip(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        # not using <broadcast> because gevents getaddrinfo doesn't like that
        # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
        # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
        s.connect(('239.255.255.250', 1))
        return s.getsockname()[0]
easy_session.py 文件源码 项目:ops_agent 作者: sjqzhang 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, p_socket=None):
        self.socket = p_socket
        if self.socket is not None:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.request_sequnce = 0
        self.buffer = ""
simple_wsgi_gevent.py 文件源码 项目:notebook 作者: archever 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((host, port))
        self.server.listen()
        self.app = app
        self.base_env = dict(os.environ.items())
upnp.py 文件源码 项目:server 作者: happypandax 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_local_ips():
    local_ips = []

    # get local ip using UDP and a  broadcast address
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    # Not using <broadcast> because gevents getaddrinfo doesn't like that
    # using port 1 as per hobbldygoop's comment about port 0 not working on osx:
    # https://github.com/sirMackk/ZeroNet/commit/fdcd15cf8df0008a2070647d4d28ffedb503fba2#commitcomment-9863928
    s.connect(('239.255.255.250', 1))
    local_ips.append(s.getsockname()[0])

    # Get ip by using UDP and a normal address (google dns ip)
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 0))
        local_ips.append(s.getsockname()[0])
    except BaseException:
        pass

    # Get ip by '' hostname . Not supported on all platforms.
    try:
        local_ips += socket.gethostbyname_ex('')[2]
    except BaseException:
        pass

    # Delete duplicates
    local_ips = list(set(local_ips))

    logging.debug("Found local ips: %s" % local_ips)
    return local_ips


问题


面经


文章

微信
公众号

扫码关注公众号