python类fromfd()的实例源码

serving.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
serving.py 文件源码 项目:isni-reconcile 作者: cmh2166 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = port

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
sock.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, address, conf, log, fd=None):
        self.log = log
        self.conf = conf

        self.cfg_addr = address
        if fd is None:
            sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
        else:
            sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)

        self.sock = self.set_options(sock, bound=(fd is not None))
daemon.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def is_socket(fd):
    """ Determine whether the file descriptor is a socket.

        :param fd: The file descriptor to interrogate.
        :return: ``True`` iff the file descriptor is a socket; otherwise
            ``False``.

        Query the socket type of `fd`. If there is no error, the file is a
        socket.

        """
    result = False

    file_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_RAW)

    try:
        socket_type = file_socket.getsockopt(
            socket.SOL_SOCKET, socket.SO_TYPE)
    except socket.error as exc:
        exc_errno = exc.args[0]
        if exc_errno == errno.ENOTSOCK:
            # Socket operation on non-socket.
            pass
        else:
            # Some other socket error.
            result = True
    else:
        # No error getting socket type.
        result = True

    return result
_socket.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fromfd(*args, **kwargs):
    """Like :func:`socket.fromfd`, but returns a trio socket object.

    """
    return from_stdlib_socket(_stdlib_socket.fromfd(*args, **kwargs))
test_socket.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_from_fd():
    sa, sb = stdlib_socket.socketpair()
    ta = tsocket.fromfd(sa.fileno(), sa.family, sa.type, sa.proto)
    with sa, sb, ta:
        assert ta.fileno() != sa.fileno()
        await ta.send(b"x")
        assert sb.recv(3) == b"x"
serving.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = port

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
test_e2e.py 文件源码 项目:pyh2o 作者: iceb0y 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_peername(self):
        response = self.http_get(PEERNAME_PATH)
        sock = socket.fromfd(response.fileno(), socket.AF_INET, socket.SOCK_STREAM)
        sockname = sock.getsockname()
        self.assertEqual(response.read(),
                         (sockname[0] + ':' + str(sockname[1])).encode())
        sock.close()
serving.py 文件源码 项目:Indushell 作者: SecarmaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
serving.py 文件源码 项目:Liljimbo-Chatbot 作者: chrisjim316 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
serving.py 文件源码 项目:flask_system 作者: prashasy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
reduction.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fromfd(fd, family, type_, proto=0):
    s = socket.fromfd(fd, family, type_, proto)
    if s.__class__ is not socket.socket:
        s = socket.socket(_sock=s)
    return s
reduction.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def rebuild_socket(reduced_handle, family, type_, proto):
    fd = rebuild_handle(reduced_handle)
    _sock = fromfd(fd, family, type_, proto)
    close(fd)
    return _sock
test_socket.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testFromFd(self):
        # Testing fromfd()
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        self.addCleanup(sock.close)
        self.assertIsInstance(sock, socket.socket)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)
sock.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, address, conf, log, fd=None):
        self.log = log
        self.conf = conf

        self.cfg_addr = address
        if fd is None:
            sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
        else:
            sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)

        self.sock = self.set_options(sock, bound=(fd is not None))
serving.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = port

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
sock.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, address, conf, log, fd=None):
        self.log = log
        self.conf = conf

        self.cfg_addr = address
        if fd is None:
            sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
            bound = False
        else:
            sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
            os.close(fd)
            bound = True

        self.sock = self.set_options(sock, bound=bound)
serving.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = self.socket.getsockname()[1]

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
serving.py 文件源码 项目:pyetje 作者: rorlika 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, host, port, app, handler=None,
                 passthrough_errors=False, ssl_context=None, fd=None):
        if handler is None:
            handler = WSGIRequestHandler

        self.address_family = select_ip_version(host, port)

        if fd is not None:
            real_sock = socket.fromfd(fd, self.address_family,
                                      socket.SOCK_STREAM)
            port = 0
        HTTPServer.__init__(self, (host, int(port)), handler)
        self.app = app
        self.passthrough_errors = passthrough_errors
        self.shutdown_signal = False
        self.host = host
        self.port = port

        # Patch in the original socket.
        if fd is not None:
            self.socket.close()
            self.socket = real_sock
            self.server_address = self.socket.getsockname()

        if ssl_context is not None:
            if isinstance(ssl_context, tuple):
                ssl_context = load_ssl_context(*ssl_context)
            if ssl_context == 'adhoc':
                ssl_context = generate_adhoc_ssl_context()
            # If we are on Python 2 the return value from socket.fromfd
            # is an internal socket object but what we need for ssl wrap
            # is the wrapper around it :(
            sock = self.socket
            if PY2 and not isinstance(sock, socket.socket):
                sock = socket.socket(sock.family, sock.type, sock.proto, sock)
            self.socket = ssl_context.wrap_socket(sock, server_side=True)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = None
test_process.py 文件源码 项目:respeaker_virtualenv 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_connection_fromfd(self):
        with contextlib.closing(socket.socket()) as sock:
            sock.bind(('localhost', 0))
            sock.listen(1)
            p = psutil.Process()
            for conn in p.connections():
                if conn.fd == sock.fileno():
                    break
            else:
                self.fail("couldn't find socket fd")
            dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
            with contextlib.closing(dupsock):
                self.assertEqual(dupsock.getsockname(), conn.laddr)
                self.assertNotEqual(sock.fileno(), dupsock.fileno())


问题


面经


文章

微信
公众号

扫码关注公众号