def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
python类fromfd()的实例源码
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf
self.cfg_addr = address
if fd is None:
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
else:
sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
self.sock = self.set_options(sock, bound=(fd is not None))
def is_socket(fd):
""" Determine whether the file descriptor is a socket.
:param fd: The file descriptor to interrogate.
:return: ``True`` iff the file descriptor is a socket; otherwise
``False``.
Query the socket type of `fd`. If there is no error, the file is a
socket.
"""
result = False
file_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_RAW)
try:
socket_type = file_socket.getsockopt(
socket.SOL_SOCKET, socket.SO_TYPE)
except socket.error as exc:
exc_errno = exc.args[0]
if exc_errno == errno.ENOTSOCK:
# Socket operation on non-socket.
pass
else:
# Some other socket error.
result = True
else:
# No error getting socket type.
result = True
return result
def fromfd(*args, **kwargs):
"""Like :func:`socket.fromfd`, but returns a trio socket object.
"""
return from_stdlib_socket(_stdlib_socket.fromfd(*args, **kwargs))
def test_from_fd():
sa, sb = stdlib_socket.socketpair()
ta = tsocket.fromfd(sa.fileno(), sa.family, sa.type, sa.proto)
with sa, sb, ta:
assert ta.fileno() != sa.fileno()
await ta.send(b"x")
assert sb.recv(3) == b"x"
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def test_peername(self):
response = self.http_get(PEERNAME_PATH)
sock = socket.fromfd(response.fileno(), socket.AF_INET, socket.SOCK_STREAM)
sockname = sock.getsockname()
self.assertEqual(response.read(),
(sockname[0] + ':' + str(sockname[1])).encode())
sock.close()
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def fromfd(fd, family, type_, proto=0):
s = socket.fromfd(fd, family, type_, proto)
if s.__class__ is not socket.socket:
s = socket.socket(_sock=s)
return s
def rebuild_socket(reduced_handle, family, type_, proto):
fd = rebuild_handle(reduced_handle)
_sock = fromfd(fd, family, type_, proto)
close(fd)
return _sock
def testFromFd(self):
# Testing fromfd()
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
self.assertIsInstance(sock, socket.socket)
msg = sock.recv(1024)
self.assertEqual(msg, MSG)
def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf
self.cfg_addr = address
if fd is None:
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
else:
sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
self.sock = self.set_options(sock, bound=(fd is not None))
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, address, conf, log, fd=None):
self.log = log
self.conf = conf
self.cfg_addr = address
if fd is None:
sock = socket.socket(self.FAMILY, socket.SOCK_STREAM)
bound = False
else:
sock = socket.fromfd(fd, self.FAMILY, socket.SOCK_STREAM)
os.close(fd)
bound = True
self.sock = self.set_options(sock, bound=bound)
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = self.socket.getsockname()[1]
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def __init__(self, host, port, app, handler=None,
passthrough_errors=False, ssl_context=None, fd=None):
if handler is None:
handler = WSGIRequestHandler
self.address_family = select_ip_version(host, port)
if fd is not None:
real_sock = socket.fromfd(fd, self.address_family,
socket.SOCK_STREAM)
port = 0
HTTPServer.__init__(self, (host, int(port)), handler)
self.app = app
self.passthrough_errors = passthrough_errors
self.shutdown_signal = False
self.host = host
self.port = port
# Patch in the original socket.
if fd is not None:
self.socket.close()
self.socket = real_sock
self.server_address = self.socket.getsockname()
if ssl_context is not None:
if isinstance(ssl_context, tuple):
ssl_context = load_ssl_context(*ssl_context)
if ssl_context == 'adhoc':
ssl_context = generate_adhoc_ssl_context()
# If we are on Python 2 the return value from socket.fromfd
# is an internal socket object but what we need for ssl wrap
# is the wrapper around it :(
sock = self.socket
if PY2 and not isinstance(sock, socket.socket):
sock = socket.socket(sock.family, sock.type, sock.proto, sock)
self.socket = ssl_context.wrap_socket(sock, server_side=True)
self.ssl_context = ssl_context
else:
self.ssl_context = None
def test_connection_fromfd(self):
with contextlib.closing(socket.socket()) as sock:
sock.bind(('localhost', 0))
sock.listen(1)
p = psutil.Process()
for conn in p.connections():
if conn.fd == sock.fileno():
break
else:
self.fail("couldn't find socket fd")
dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
with contextlib.closing(dupsock):
self.assertEqual(dupsock.getsockname(), conn.laddr)
self.assertNotEqual(sock.fileno(), dupsock.fileno())