def test_lost_connection_cancels_timeout(self):
    """
    Losing the connection must cancel the pending timeout, so advancing the
    clock to the timeout afterwards has no effect on the transport.
    """
    timeout_seconds = 5
    fake_reactor = Clock()
    proto = make_protocol(timeout=timeout_seconds, reactor=fake_reactor)

    proto.connectionMade()
    assert not proto.transport.disconnecting

    # Drop the connection before the timeout has elapsed.
    proto.connectionLost()

    # Move the fake clock forward by the full timeout period.
    fake_reactor.advance(timeout_seconds)

    # The transport must still not be marked as disconnecting.
    assert not proto.transport.disconnecting
# Python: example source code for the Clock() class
def testDelayedStart(self):
    """
    Start a looping call with C{now=False} on a fake clock, pump the clock
    through a fixed list of time increments and check that exactly two
    iterations ran. After stopping the loop, the deferred returned by
    C{start} must have fired with the looping call itself and no calls may
    remain scheduled on the clock.
    """
    timings = [0.05, 0.1, 0.1]
    clock = task.Clock()

    L = []
    lc = TestableLoopingCall(clock, L.append, None)
    d = lc.start(0.1, now=False)

    # Record whatever the start() deferred fires with.
    theResult = []
    d.addCallback(theResult.append)

    clock.pump(timings)

    # assertEqual/assertFalse replace the deprecated assertEquals/failIf
    # aliases, which newer unittest releases no longer provide.
    self.assertEqual(len(L), 2,
                     "got %d iterations, not 2" % (len(L),))
    lc.stop()
    self.assertIdentical(theResult[0], lc)
    self.assertFalse(clock.calls)
def testWaitDeferred(self):
    """
    Check that the callable isn't scheduled again before the deferred it
    returned has fired.
    """
    timings = [0.2, 0.8]
    clock = task.Clock()

    def foo():
        # Return a deferred that fires one second later and stops the loop
        # from its callback.
        d = defer.Deferred()
        d.addCallback(lambda _: lc.stop())
        clock.callLater(1, d.callback, None)
        return d

    lc = TestableLoopingCall(clock, foo)
    lc.start(0.2)
    clock.pump(timings)
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(clock.calls)
def testFailurePropagation(self):
    """
    Check that a failure delivered to the deferred returned by the callable
    is propagated to the deferred returned by the looping call's C{start}.

    To make sure this test does not hang trial when LoopingCall does not
    wait for the callable's deferred, it also checks there are no calls
    left in the clock's callLater queue.

    @return: the deferred returned by C{start}.
    """
    timings = [0.3]
    clock = task.Clock()

    def foo():
        # Return a deferred that errbacks with TestException 0.3s later.
        d = defer.Deferred()
        clock.callLater(0.3, d.errback, TestException())
        return d

    lc = TestableLoopingCall(clock, foo)
    d = lc.start(1)
    self.assertFailure(d, TestException)

    clock.pump(timings)
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(clock.calls)
    return d
def testPausing(self):
    """
    Test pause inside data receiving. It uses a fake clock to see if
    pausing/resuming work.

    The pause buffer is delivered in chunks of every size from 1 to 9; the
    received lines are compared with C{pause_output1} before the clock is
    advanced and with C{pause_output2} afterwards.
    """
    for packet_size in range(1, 10):
        t = StringIOWithoutClosing()
        clock = task.Clock()
        a = LineTester(clock)
        a.makeConnection(protocol.FileWrapper(t))
        # Floor division keeps the chunk count an integer; on Python 3 a
        # plain "/" yields a float, which range() rejects.
        for i in range(len(self.pause_buf) // packet_size + 1):
            s = self.pause_buf[i * packet_size:(i + 1) * packet_size]
            a.dataReceived(s)
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(self.pause_output1, a.received)
        clock.advance(0)
        self.assertEqual(self.pause_output2, a.received)
def test_manual_removal(self):
    """
    Entries can be removed from the bind cache one at a time, and removing
    an entry that is no longer cached leaves the cache unchanged.
    """
    fake_clock = task.Clock()
    bind_cache = BindCache()
    bind_cache.callLater = fake_clock.callLater

    first = (DN, APP, PASSWORD)
    second = (DN_OTHER, APP_OTHER, PASSWORD_OTHER)

    bind_cache.add_to_cache(*first)
    bind_cache.add_to_cache(*second)
    self.assertTrue(bind_cache.is_cached(*first))
    self.assertTrue(bind_cache.is_cached(*second))

    # Remove (DN, APP, PASSWORD): only the other entry stays cached.
    bind_cache.remove_from_cache(*first)
    self.assertFalse(bind_cache.is_cached(*first))
    self.assertTrue(bind_cache.is_cached(*second))

    # Remove (DN_OTHER, APP_OTHER, PASSWORD_OTHER): nothing stays cached.
    bind_cache.remove_from_cache(*second)
    self.assertFalse(bind_cache.is_cached(*first))
    self.assertFalse(bind_cache.is_cached(*second))

    # Remove (DN_OTHER, APP_OTHER, PASSWORD_OTHER) again: still nothing
    # cached.
    bind_cache.remove_from_cache(*second)
    self.assertFalse(bind_cache.is_cached(*first))
    self.assertFalse(bind_cache.is_cached(*second))
def test_manual_removal(self):
    """
    Removing an entry from the app cache only takes effect when both the
    DN and the marker match what is currently cached.
    """
    fake_clock = task.Clock()
    app_cache = AppCache(TIMEOUT)
    app_cache.callLater = fake_clock.callLater

    app_cache.add_to_cache(DN1, MARKER1)
    app_cache.add_to_cache(DN2, MARKER2)
    self.assertEqual(app_cache.get_cached_marker(DN1), MARKER1)
    self.assertEqual(app_cache.get_cached_marker(DN2), MARKER2)

    # Remove (DN2, MARKER2): DN1 is untouched, DN2 is gone.
    app_cache.remove_from_cache(DN2, MARKER2)
    self.assertEqual(app_cache.get_cached_marker(DN1), MARKER1)
    self.assertEqual(app_cache.get_cached_marker(DN2), None)

    # Overwrite (DN1, MARKER1) with MARKER2.
    app_cache.add_to_cache(DN1, MARKER2)
    self.assertEqual(app_cache.get_cached_marker(DN1), MARKER2)

    # Removing with the stale marker (DN1, MARKER1) and removing
    # (DN1_OTHERCASE, MARKER2) must both have no effect.
    for stale_dn, stale_marker in ((DN1, MARKER1), (DN1_OTHERCASE, MARKER2)):
        app_cache.remove_from_cache(stale_dn, stale_marker)
        self.assertEqual(app_cache.get_cached_marker(DN1), MARKER2)

    # Remove (DN1, MARKER2): removed!
    app_cache.remove_from_cache(DN1, MARKER2)
    self.assertEqual(app_cache.get_cached_marker(DN1), None)
    self.assertEqual(app_cache.get_cached_marker(DN2), None)
def testDelayedStart(self):
    """
    Start a looping call with C{now=False} on a fake clock, pump the clock
    through a fixed list of time increments and check that exactly two
    iterations ran. After stopping the loop, the deferred returned by
    C{start} must have fired with the looping call itself and no calls may
    remain scheduled on the clock.
    """
    timings = [0.05, 0.1, 0.1]
    clock = task.Clock()

    L = []
    lc = TestableLoopingCall(clock, L.append, None)
    d = lc.start(0.1, now=False)

    # Record whatever the start() deferred fires with.
    theResult = []
    d.addCallback(theResult.append)

    clock.pump(timings)

    # assertEqual/assertFalse replace the deprecated assertEquals/failIf
    # aliases, which newer unittest releases no longer provide.
    self.assertEqual(len(L), 2,
                     "got %d iterations, not 2" % (len(L),))
    lc.stop()
    self.assertIdentical(theResult[0], lc)
    self.assertFalse(clock.calls)
def testWaitDeferred(self):
    """
    Check that the callable isn't scheduled again before the deferred it
    returned has fired.
    """
    timings = [0.2, 0.8]
    clock = task.Clock()

    def foo():
        # Return a deferred that fires one second later and stops the loop
        # from its callback.
        d = defer.Deferred()
        d.addCallback(lambda _: lc.stop())
        clock.callLater(1, d.callback, None)
        return d

    lc = TestableLoopingCall(clock, foo)
    lc.start(0.2)
    clock.pump(timings)
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(clock.calls)
def testFailurePropagation(self):
    """
    Check that a failure delivered to the deferred returned by the callable
    is propagated to the deferred returned by the looping call's C{start}.

    To make sure this test does not hang trial when LoopingCall does not
    wait for the callable's deferred, it also checks there are no calls
    left in the clock's callLater queue.

    @return: the deferred returned by C{start}.
    """
    timings = [0.3]
    clock = task.Clock()

    def foo():
        # Return a deferred that errbacks with TestException 0.3s later.
        d = defer.Deferred()
        clock.callLater(0.3, d.errback, TestException())
        return d

    lc = TestableLoopingCall(clock, foo)
    d = lc.start(1)
    self.assertFailure(d, TestException)

    clock.pump(timings)
    # assertFalse replaces the deprecated failIf alias.
    self.assertFalse(clock.calls)
    return d
def testPausing(self):
    """
    Test pause inside data receiving. It uses a fake clock to see if
    pausing/resuming work.

    The pause buffer is delivered in chunks of every size from 1 to 9; the
    received lines are compared with C{pause_output1} before the clock is
    advanced and with C{pause_output2} afterwards.
    """
    for packet_size in range(1, 10):
        t = StringIOWithoutClosing()
        clock = task.Clock()
        a = LineTester(clock)
        a.makeConnection(protocol.FileWrapper(t))
        # Floor division keeps the chunk count an integer; on Python 3 a
        # plain "/" yields a float, which range() rejects.
        for i in range(len(self.pause_buf) // packet_size + 1):
            s = self.pause_buf[i * packet_size:(i + 1) * packet_size]
            a.dataReceived(s)
        # assertEqual replaces the deprecated failUnlessEqual alias.
        self.assertEqual(self.pause_output1, a.received)
        clock.advance(0)
        self.assertEqual(self.pause_output2, a.received)
def replace_loop(new_loop):
    """
    This is a context-manager that sets the txaio event-loop to the
    one supplied temporarily. It's up to you to ensure you pass an
    event_loop or a reactor instance depending upon asyncio/Twisted.

    The previous loop is restored when the managed block exits, including
    when it exits because of an exception.

    NOTE(review): no decorator is visible on this definition; the ``with``
    usage below relies on it being wrapped by
    ``contextlib.contextmanager`` -- confirm.

    Use like so:

    .. sourcecode:: python

        from twisted.internet import task
        with replace_loop(task.Clock()) as fake_reactor:
            f = txaio.call_later(5, foo)
            fake_reactor.advance(10)
            # ...etc

    :param new_loop: the loop object to install as ``txaio.config.loop``.
    :returns: yields ``new_loop`` to the managed block.
    """
    # setup
    orig = txaio.config.loop
    txaio.config.loop = new_loop
    try:
        yield new_loop
    finally:
        # cleanup: always put the original loop back so a failing block
        # cannot leak the temporary loop into later code.
        txaio.config.loop = orig
def test_call_later_tx(framework_tx):
    '''
    A call scheduled with txaio.call_later is not run immediately, and is
    run once with its positional and keyword arguments after the fake
    clock has been advanced past the delay.
    '''
    from twisted.internet.task import Clock

    recorded = []

    def foo(*args, **kw):
        recorded.append((args, kw))

    with replace_loop(Clock()) as fake_loop:
        delayed = txaio.call_later(1, foo, 5, 6, 7, foo="bar")

        # Nothing has run yet, and the returned object can be cancelled.
        assert len(recorded) == 0
        assert hasattr(delayed, 'cancel')

        fake_loop.advance(2)

        assert len(recorded) == 1
        positional, keywords = recorded[0]
        assert positional == (5, 6, 7)
        assert keywords == dict(foo="bar")
def test_client_make_real_stream():
    """
    Each call to the client's make_real_stream writes one entry to the
    transport and reschedules itself on the reactor; when the output
    buffer is non-empty, the buffered packet is what gets sent to the
    client's provider.
    """
    alice = env.clients[0]
    alice.transport.written = []
    alice.reactor = task.Clock()

    alice.make_real_stream()
    assert len(alice.transport.written) == 1

    # Advancing the fake clock lets the rescheduled call run once more.
    alice.reactor.advance(100)
    assert len(alice.transport.written) == 2

    for call in alice.reactor.getDelayedCalls():
        assert call.func == alice.make_real_stream

    # Check whether a real message is sent if buffer non-empty
    test_packet = 'ABCDefgHIJKlmnOPRstUWxyz'
    alice.output_buffer.put(test_packet)
    alice.make_real_stream()
    packet, addr = alice.transport.written[-1]
    assert petlib.pack.decode(packet) == test_packet
    assert addr == (alice.provider.host, alice.provider.port)
def test_clientConnectionFailed(self):
    """
    When a client connection attempt fails, the service makes another
    attempt once the retry timeout has passed on the clock.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(clock=fakeClock,
                                       fireImmediately=False)
    self.assertEqual(len(cq.connectQueue), 1)

    firstAttempt = cq.connectQueue[0]
    firstAttempt.errback(Failure(Exception()))

    pending = service.whenConnected()
    self.assertNoResult(pending)
    # Service shutdown during test tear-down fails all waiting
    # connections; swallow that cancellation so the test does not fail.
    pending.addErrback(lambda ignored: ignored.trap(CancelledError))

    fakeClock.advance(AT_LEAST_ONE_ATTEMPT)
    self.assertEqual(len(cq.connectQueue), 2)
def test_clientConnectionLost(self):
    """
    When an established client connection is lost, the service retries
    and L{ClientService.whenConnected} then yields the protocol from the
    new connection.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(fireImmediately=False,
                                       clock=fakeClock)
    self.assertEqual(len(cq.connectQueue), 1)

    # First connection attempt succeeds.
    cq.connectQueue[0].callback(None)
    self.assertEqual(len(cq.connectQueue), 1)
    firstConnected = self.successResultOf(service.whenConnected())
    self.assertIdentical(firstConnected, cq.applicationProtocols[0])

    # Lose that connection and let the retry delay elapse.
    cq.constructedProtocols[0].connectionLost(Failure(Exception()))
    fakeClock.advance(AT_LEAST_ONE_ATTEMPT)
    self.assertEqual(len(cq.connectQueue), 2)

    # Second connection attempt succeeds.
    cq.connectQueue[1].callback(None)
    secondConnected = self.successResultOf(service.whenConnected())
    self.assertIdentical(secondConnected, cq.applicationProtocols[1])
def test_whenConnectedLater(self):
    """
    Each L{Deferred} returned by L{ClientService.whenConnected} before a
    connection exists fires with the same protocol once the connection is
    established.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(clock=fakeClock,
                                       fireImmediately=False)

    waiters = [service.whenConnected(), service.whenConnected()]
    for waiter in waiters:
        self.assertNoResult(waiter)

    cq.connectQueue[0].callback(None)

    resultA, resultB = [self.successResultOf(waiter) for waiter in waiters]
    self.assertIdentical(resultA, resultB)
    self.assertIdentical(resultA, cq.applicationProtocols[0])
def test_whenConnectedErrbacksOnStopService(self):
    """
    If L{ClientService.stopService} is called between connection
    attempts, L{Deferred}s obtained from L{ClientService.whenConnected}
    both before and after the stop errback with L{CancelledError}.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(clock=fakeClock,
                                       fireImmediately=False)
    requestedBefore = service.whenConnected()

    # The connection attempt fails, leaving the service waiting to
    # reconnect; it is then stopped.
    cq.connectQueue[0].errback(Exception("no connection"))
    service.stopService()

    requestedAfter = service.whenConnected()

    for waiter in (requestedBefore, requestedAfter):
        self.assertIsInstance(self.failureResultOf(waiter).value,
                              CancelledError)
def test_stopServiceWhileDisconnecting(self):
    """
    Calling L{ClientService.stopService} a second time while the first
    stop is still disconnecting returns a L{Deferred} each time, and both
    fire once the disconnection has completed.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(clock=fakeClock,
                                       fireImmediately=False)

    # Establish the connection.
    cq.connectQueue[0].callback(None)

    # Ask the service to stop twice before the connection is gone.
    stopDeferreds = [service.stopService(), service.stopService()]

    # Now the connection actually goes away.
    cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))

    for stopped in stopDeferreds:
        self.successResultOf(stopped)
def test_stopServiceWhileRestarting(self):
    """
    Stopping, restarting and stopping the service again while the first
    disconnection is still pending returns a L{Deferred} from each stop,
    and both fire once the disconnection has completed.
    """
    fakeClock = Clock()
    cq, service = self.makeReconnector(clock=fakeClock,
                                       fireImmediately=False)

    # Establish the connection.
    cq.connectQueue[0].callback(None)

    # Stop, start again, then stop once more before the connection is
    # gone.
    firstStop = service.stopService()
    service.startService()
    secondStop = service.stopService()

    # Now the connection actually goes away.
    cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))

    self.successResultOf(firstStop)
    self.successResultOf(secondStop)
def test_printProgressBarNoProgress(self):
    """
    With no bytes transferred and no time elapsed,
    L{StdioClient._printProgressBar} writes a progress line reporting 0%
    and 0 bytes.
    """
    self.setKnownConsoleSize(10, 34)

    fakeClock = Clock()
    self.client.reactor = fakeClock

    underlying = BytesIO(b"x")
    underlying.name = b"sample"
    fileWrapper = cftp.FileWrapper(underlying)

    # Use the clock's current time as the start time, so no time has
    # passed when the bar is printed.
    self.client._printProgressBar(fileWrapper, fakeClock.seconds())

    expected = (b"\rb'sample' 0% 0.0B 0.0Bps 00:00 " if _PY3
                else "\rsample 0% 0.0B 0.0Bps 00:00 ")
    self.assertEqual(self.client.transport.value(), expected)
def test_failedPasswordAuthentication(self):
    """
    When provided with invalid authentication details, the server should
    respond by sending a MSG_USERAUTH_FAILURE message which states whether
    the authentication was partially successful, and provides other, open
    options for authentication.

    See RFC 4252, Section 5.1.

    @return: the deferred from the authentication request, with
        C{self._checkFailed} added as a callback.
    """
    # packet = username, next_service, authentication type, FALSE, password
    # The FALSE flag is written as a bytes literal: the built-in chr()
    # returns text on Python 3, which cannot be joined with bytes.
    packet = b''.join([NS(b'foo'), NS(b'none'), NS(b'password'), b'\x00',
                       NS(b'bar')])
    self.authServer.clock = task.Clock()
    d = self.authServer.ssh_USERAUTH_REQUEST(packet)
    # No packet is sent until the fake clock has been advanced.
    self.assertEqual(self.authServer.transport.packets, [])
    self.authServer.clock.advance(2)
    return d.addCallback(self._checkFailed)
def test_loginTimeout(self):
    """
    Test that the login times out: after the fake clock is advanced by 11
    hours, the transport has been sent a single MSG_DISCONNECT packet
    carrying the "you took too long" reason and has lost its connection.
    """
    timeoutAuthServer = userauth.SSHUserAuthServer()
    timeoutAuthServer.clock = task.Clock()
    timeoutAuthServer.transport = FakeTransport(self.portal)
    timeoutAuthServer.serviceStarted()
    timeoutAuthServer.clock.advance(11 * 60 * 60)
    timeoutAuthServer.serviceStopped()

    # Encode the reason code as one byte explicitly: the built-in chr()
    # returns text on Python 3, which cannot be concatenated with bytes.
    reasonCode = bytes(bytearray(
        [transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE]))
    self.assertEqual(timeoutAuthServer.transport.packets,
                     [(transport.MSG_DISCONNECT,
                       b'\x00' * 3 +
                       reasonCode +
                       NS(b"you took too long") + NS(b''))])
    self.assertTrue(timeoutAuthServer.transport.lostConnection)
def test_requestBodyTimeout(self):
    """
    Delivering request body data to an L{HTTPChannel} resets its timeout,
    so the connection is not dropped while body bytes keep arriving.
    """
    fakeClock = Clock()
    stringTransport = StringTransport()

    channel = http.HTTPChannel()
    channel.timeOut = 100
    channel.callLater = fakeClock.callLater
    channel.makeConnection(stringTransport)

    channel.dataReceived(b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

    # Deliver the two body bytes one at a time, each just before the
    # timeout would have expired.
    for _ in range(2):
        fakeClock.advance(99)
        self.assertFalse(stringTransport.disconnecting)
        channel.dataReceived(b'x')

    self.assertEqual(len(channel.requests), 1)
def test_requestBodyDefaultTimeout(self):
    """
    A channel built by a default L{HTTPFactory} is not disconnecting after
    59 seconds without data and is disconnecting after 60.
    """
    fakeClock = Clock()
    stringTransport = StringTransport()
    channel = http.HTTPFactory().buildProtocol(None)

    # Reaching into _genericHTTPChannelProtocol like this violates its
    # abstraction layer, but policies.TimeoutMixin doesn't accept a
    # reactor on the object, so the fake clock has to be patched in.
    # See https://twistedmatrix.com/trac/ticket/8488
    channel._channel.callLater = fakeClock.callLater

    channel.makeConnection(stringTransport)
    channel.dataReceived(b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

    fakeClock.advance(59)
    self.assertFalse(stringTransport.disconnecting)

    fakeClock.advance(1)
    self.assertTrue(stringTransport.disconnecting)
def test_requestBodyTimeoutFromFactory(self):
    """
    A channel built from an L{HTTPFactory} with an explicit timeout times
    out when request body data is not delivered to it in time.
    """
    fakeClock = Clock()
    stringTransport = StringTransport()

    httpFactory = http.HTTPFactory(timeout=100, reactor=fakeClock)
    httpFactory.startFactory()
    channel = httpFactory.buildProtocol(None)

    # The factory's timeout must have been passed on to the protocol.
    self.assertEqual(channel.timeOut, 100)

    # Reaching into _genericHTTPChannelProtocol like this violates its
    # abstraction layer, but policies.TimeoutMixin doesn't accept a
    # reactor on the object, so the fake clock has to be patched in.
    # See https://twistedmatrix.com/trac/ticket/8488
    channel._channel.callLater = fakeClock.callLater

    channel.makeConnection(stringTransport)
    channel.dataReceived(b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

    fakeClock.advance(99)
    self.assertFalse(stringTransport.disconnecting)

    fakeClock.advance(2)
    self.assertTrue(stringTransport.disconnecting)
def test_HTTPChannelUnregistersSelfWhenTimingOut(self):
    """
    An L{HTTPChannel} is registered as its transport's streaming producer
    until it times the connection out, at which point it unregisters
    itself.
    """
    fakeClock = Clock()
    stringTransport = StringTransport()

    httpChannel = http.HTTPChannel()
    # Route the channel's timeout scheduling through the fake clock.
    httpChannel.callLater = fakeClock.callLater
    httpChannel.timeOut = 100
    httpChannel.makeConnection(stringTransport)

    # One second short of the timeout: still registered.
    fakeClock.advance(99)
    self.assertIs(stringTransport.producer, httpChannel)
    self.assertIs(stringTransport.streaming, True)

    # The timeout fires: the producer registration is cleared.
    fakeClock.advance(1)
    self.assertIs(stringTransport.producer, None)
    self.assertIs(stringTransport.streaming, None)
def test_dontRetryIfRetryAutomaticallyFalse(self):
    """
    With L{HTTPConnectionPool.retryAutomatically} set to C{False}, a
    cached connection is handed back as-is rather than wrapped with
    retrying logic.
    """
    connectionPool = client.HTTPConnectionPool(Clock())
    connectionPool.retryAutomatically = False

    # Put a connected protocol into the pool's cache.
    cachedProtocol = StubHTTPProtocol()
    cachedProtocol.makeConnection(StringTransport())
    connectionPool._putConnection(123, cachedProtocol)

    def checkUnwrapped(connection):
        self.assertIdentical(connection, cachedProtocol)

    # Fetching it again must yield the very same, unwrapped object.
    fetching = connectionPool.getConnection(123, DummyEndpoint())
    return fetching.addCallback(checkUnwrapped)
def _xforwardedforTest(self, header):
    """
    Assert that L{proxiedLogFormatter} logs a request carrying the given
    I{X-Forwarded-For} header exactly as L{combinedLogFormatter} would
    have logged it, except that the client address is 172.16.1.2 instead
    of the normal value.

    @param header: An I{X-Forwarded-For} header with left-most address of
        172.16.1.2.
    """
    fakeReactor = Clock()
    fakeReactor.advance(1234567890)
    logTime = http.datetimeToLogString(fakeReactor.seconds())

    logFactory = http.HTTPFactory(reactor=fakeReactor)
    request = DummyRequestForLogTest(logFactory)

    # Compute the expected line before the header is set, substituting
    # the forwarded address for the request's own.
    combinedLine = http.combinedLogFormatter(logTime, request)
    expected = combinedLine.replace(u"1.2.3.4", u"172.16.1.2")

    request.requestHeaders.setRawHeaders(b"x-forwarded-for", [header])
    self.assertEqual(expected, http.proxiedLogFormatter(logTime, request))
def test_initiallySchedulesOneDataCall(self):
    """
    Creating an H2Connection schedules exactly one delayed call on the
    reactor, due immediately, targeting its C{_sendPrioritisedData}
    method with no arguments.
    """
    fakeReactor = task.Clock()
    connection = H2Connection(fakeReactor)

    delayedCalls = fakeReactor.getDelayedCalls()
    self.assertEqual(len(delayedCalls), 1)
    scheduled = delayedCalls[0]

    # The call is due at time zero but has not run yet.
    self.assertTrue(scheduled.active())
    self.assertEqual(scheduled.time, 0)

    # It targets the right method and carries no arguments.
    self.assertEqual(scheduled.func, connection._sendPrioritisedData)
    self.assertEqual(scheduled.args, ())
    self.assertEqual(scheduled.kw, {})