def test_doubleHalfClose(self):
    """
    A connection on which both peers shut down is closed correctly.

    The client calls C{loseConnection} as soon as it is connected.  The
    listening protocol answers C{readConnectionLost} by calling
    C{loseWriteConnection}, and then calls C{loseConnection} from within
    C{writeConnectionLost}.

    This rather obscure sequence used to fail (see ticket #3037).
    """

    class Client(ConnectableProtocol):
        # Starts the sequence: drop the connection immediately.
        def connectionMade(self):
            self.transport.loseConnection()

    @implementer(IHalfCloseableProtocol)
    class ListenerProtocol(ConnectableProtocol):
        # First half-close our own write side ...
        def readConnectionLost(self):
            self.transport.loseWriteConnection()

        # ... then, once that is reported, close the connection entirely.
        def writeConnectionLost(self):
            self.transport.loseConnection()

    listener = ListenerProtocol()
    client = Client()
    creator = TCPCreator()

    # No explicit assertion is needed: if the connection is not closed
    # properly the reactor won't stop and the test hits its timeout.
    runProtocolsWithReactor(self, listener, client, creator)
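# (Web-page navigation residue, not part of the test: "Comment list")
# (Web-page navigation residue, not part of the test: "Table of contents")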