def test_same_port_allocation(self):
    """Check that all sockets from bind_sockets(None, 'localhost') share one port.

    The test is skipped when the ``TRAVIS`` environment variable is set.
    Every socket returned by ``bind_sockets`` is closed before returning,
    whether or not the assertion passes.
    """
    running_on_travis = 'TRAVIS' in os.environ
    if running_on_travis:
        self.skipTest("dual-stack servers often have port conflicts on travis")

    bound = bind_sockets(None, 'localhost')
    try:
        # The port of the first socket is the reference value.
        expected_port = bound[0].getsockname()[1]
        ports_match = all(
            other.getsockname()[1] == expected_port for other in bound[1:]
        )
        self.assertTrue(ports_match)
    finally:
        for listener in bound:
            listener.close()
# Example source code for Python close() usage
def tearDown(self):
    """Close this test's resolver, then run the parent class's tearDown()."""
    resolver = self.resolver
    resolver.close()
    super(ThreadedResolverTest, self).tearDown()
def test_same_port_allocation(self):
    """Verify that bind_sockets(None, 'localhost') uses one port for every socket.

    Skipped when ``TRAVIS`` is present in the environment. All sockets
    returned by ``bind_sockets`` are closed on the way out.
    """
    if 'TRAVIS' in os.environ:
        self.skipTest("dual-stack servers often have port conflicts on travis")

    listeners = bind_sockets(None, 'localhost')
    try:
        reference_port = listeners[0].getsockname()[1]
        # Compare each remaining socket's port with the first socket's port.
        same_port = all(
            listener.getsockname()[1] == reference_port
            for listener in listeners[1:]
        )
        self.assertTrue(same_port)
    finally:
        for listener in listeners:
            listener.close()
def test_reuse_port(self):
    """Check that bind_sockets() can bind a port already held with reuse_port.

    A socket is first obtained from ``bind_unused_port(reuse_port=True)``;
    ``bind_sockets`` is then asked to bind the same port on 'localhost' with
    ``reuse_port=True``, and every resulting socket must report that port.
    All sockets are closed before returning.
    """
    # Start with an empty list so the ``finally`` clause can always iterate
    # it: if bind_sockets() raises, the name would otherwise be unbound and
    # an UnboundLocalError would mask the real failure.
    sockets = []
    first_socket, port = bind_unused_port(reuse_port=True)
    try:
        sockets = bind_sockets(port, 'localhost', reuse_port=True)
        self.assertTrue(all(s.getsockname()[1] == port for s in sockets))
    finally:
        first_socket.close()
        for sock in sockets:
            sock.close()
def shutdown_request(self, request):
    """Shut down and close one request.

    This version simply hands the request to ``self.close_request``.
    """
    close = self.close_request
    close(request)
def server_close(self):
    """Clean up the server by closing ``self.socket``.

    May be overridden.
    """
    listening_socket = self.socket
    listening_socket.close()
def shutdown_request(self, request):
    """Shut down the write side of one request, then close it.

    A ``socket.error`` raised by the shutdown call is ignored; the request
    is then passed to ``self.close_request``.
    """
    try:
        # Shut down explicitly: socket.close() merely releases the socket
        # and waits for GC to perform the actual close.
        how = socket.SHUT_WR
        request.shutdown(how)
    except socket.error:
        # Some platforms may raise ENOTCONN here.
        pass
    self.close_request(request)
def close_request(self, request):
    """Clean up one request by calling its ``close()`` method."""
    request.close()
def close_request(self, request):
    """Do nothing: this variant has no per-request resource to close."""
    return None
def finish(self):
    """Flush the write file if it is still open, then close both files.

    A ``socket.error`` raised while flushing is ignored. ``self.wfile`` is
    closed first, then ``self.rfile``.
    """
    writer = self.wfile
    if not writer.closed:
        try:
            writer.flush()
        except socket.error:
            # A final socket error may have occurred here, such as
            # the local error ECONNABORTED.
            pass
    writer.close()
    self.rfile.close()
def _closeSocket(self):
    """Shut down and then close ``self.socket``, ignoring socket errors.

    The shutdown method is looked up on the socket by the name stored in
    ``self._socketShutdownMethod`` and called with the argument ``2``.
    """
    # socket.close() doesn't *really* close if there's another reference
    # to it in the TCP/IP stack, e.g. if it was inherited by a
    # subprocess. And we really do want to close the connection. So we
    # use shutdown() instead, and then close() in order to release the
    # file descriptor.
    sock = self.socket
    try:
        shutdown = getattr(sock, self._socketShutdownMethod)
        shutdown(2)
    except socket.error:
        pass
    try:
        sock.close()
    except socket.error:
        pass
def handle_tcp_https(socket, dstport):
    """Hand an HTTPS connection to the HTTP handler via switchtossl().

    ``switchtossl(socket)`` is called first. If its result is falsy the
    original socket is closed; otherwise the result is passed to
    ``handle_tcp_http`` together with ``dstport``.
    """
    plaintext_socket = switchtossl(socket)
    if not plaintext_socket:
        socket.close()
        return
    handle_tcp_http(plaintext_socket, dstport)
def handle_tcp(socket, dstport):
    """Dispatch a TCP connection to the handler registered for ``dstport``.

    The handler is looked up in ``tcp_handlers``, falling back to
    ``handle_tcp_default``. Any ``Exception`` it raises is printed as a
    traceback rather than propagated, and the socket is closed afterwards.
    """
    # NOTE(review): the source's indentation was lost; this assumes the
    # close runs after the try/except on both paths. Confirm against upstream.
    dispatch = tcp_handlers.get(dstport, handle_tcp_default)
    try:
        dispatch(socket, dstport)
    except Exception:
        print(traceback.format_exc())
    socket.close()
def broadcast(server_socket, sock, message):
    """Send ``message`` to every socket in SOCKET_LIST except two.

    Args:
        server_socket: Socket to skip (compared by equality).
        sock: Second socket to skip (compared by equality).
        message: Payload passed unchanged to each peer's ``send()``.

    A peer whose ``send()`` raises is closed and removed from SOCKET_LIST;
    the remaining peers are still attempted.
    """
    # Iterate over a snapshot: removing entries from SOCKET_LIST while
    # looping over it directly makes the loop skip the entry that follows
    # each removed socket.
    for peer in list(SOCKET_LIST):
        if peer == server_socket or peer == sock:
            continue
        try:
            peer.send(message)
        except Exception:
            # Best-effort delivery: drop the failed peer. Catching Exception
            # rather than using a bare except lets KeyboardInterrupt and
            # SystemExit propagate.
            peer.close()
            if peer in SOCKET_LIST:
                SOCKET_LIST.remove(peer)
def disconnect(self):
    """Disconnect the library and release every per-channel socket.

    Order: ``self._lib.disconnect()``, close all sockets in
    ``_chan_to_rsocket`` and then in ``_chan_to_wsocket``, clear both
    mappings, and finally call ``self._lib.unregister_sockets()``.
    """
    self._lib.disconnect()

    channel_maps = (self._chan_to_rsocket, self._chan_to_wsocket)
    for channel_map in channel_maps:
        for channel_socket in channel_map.values():
            channel_socket.close()
    for channel_map in channel_maps:
        channel_map.clear()

    self._lib.unregister_sockets()
def shutdown_request(self, request):
    """Shut down and close an individual request.

    Delegates directly to ``self.close_request``.
    """
    closer = self.close_request
    closer(request)
def server_close(self):
    """Clean up the server: close the socket stored on ``self.socket``.

    May be overridden.
    """
    server_socket = self.socket
    server_socket.close()
def shutdown_request(self, request):
    """Explicitly shut down the request's write side, then close it.

    ``socket.error`` from the shutdown call is swallowed; afterwards the
    request is handed to ``self.close_request``.
    """
    try:
        # An explicit shutdown is needed because socket.close() merely
        # releases the socket and waits for GC to perform the actual close.
        write_side = socket.SHUT_WR
        request.shutdown(write_side)
    except socket.error:
        # Some platforms may raise ENOTCONN here.
        pass
    self.close_request(request)
def close_request(self, request):
    """Clean up an individual request by closing it."""
    request.close()
def close_request(self, request):
    """Intentionally a no-op: there is nothing to close for this request."""
    return None