def assertReading(testCase, reactor, transport):
    """
    Assert, via C{testCase}, that C{transport} is currently registered for
    reading with C{reactor}.

    For a reactor providing L{IReactorFDSet} the transport must appear in
    the result of C{reactor.getReaders()}.  For any other reactor (treated
    as IOCP) the transport must appear in C{reactor.handles} and its
    C{reading} attribute must be true.

    @note: Maintainers; this is deliberately a free function instead of a
        method on a test case.  The reasoning is described in U{the
        document on how we structure test tools
        <http://twistedmatrix.com/trac/wiki/Design/KeepTestToolsOutOfFixtures>}

    @param testCase: the test case whose assertion methods are used.
    @type testCase: L{TestCase}

    @param reactor: A reactor, possibly one providing L{IReactorFDSet}, or
        an IOCP reactor.

    @param transport: An L{ITCPTransport}
    """
    if not IReactorFDSet.providedBy(reactor):
        # IOCP: there is no reader set to query, so check the reactor's
        # handle collection and the transport's own reading flag instead.
        testCase.assertIn(transport, reactor.handles)
        testCase.assertTrue(transport.reading)
        return

    # FD-set based reactor: the transport must be among the active readers.
    activeReaders = reactor.getReaders()
    testCase.assertIn(transport, activeReaders)
# (Web-page residue, not part of this module: "Comment list" /
# "Table of contents".)