def test_delayedCallCleanup(self):
    """
    Checking to make sure Sessions do not leave extra DelayedCalls.

    The reactor's delayed calls are captured (as a C{repr} string) before a
    session is made, touched and expired, and compared against the same
    capture taken afterwards.
    """
    from twisted.internet import reactor

    delayedCallsBeforeSession = repr(reactor.getDelayedCalls())

    session = self.site.makeSession()
    session.touch()
    session.expire()

    # assertEqual is used instead of the deprecated failUnlessEqual alias.
    self.assertEqual(delayedCallsBeforeSession,
                     repr(reactor.getDelayedCalls()))
# Conditional requests:
# If-None-Match, If-Modified-Since
# make conditional request:
# normal response if condition succeeds
# if condition fails:
# response code
# no body
# Example source code using Python's getDelayedCalls()
def testGetDelayedCalls(self):
    """
    Schedule a handful of timers, cancel one of them, and verify through
    C{checkTimers} that the reactor still reports every tracked timer.

    @return: C{self.deferred}, with one more C{checkTimers} pass chained
        onto it, or C{None} when the reactor has no C{getDelayedCalls}.
    """
    if not hasattr(reactor, "getDelayedCalls"):
        return

    # This is not a race because we don't do anything which might call
    # the reactor until we have all the timers set up. If we did, this
    # test might fail on slow systems.
    self.checkTimers()

    for delay, target in ((35, self.done),
                          (20, self.callback),
                          (30, self.callback)):
        self.addTimer(delay, target)

    # Counter value captured between the two batches of timers; it is used
    # below as the key of the timer to cancel.
    # NOTE(review): presumably this is the key the next addTimer call
    # stores its timer under -- confirm against addTimer.
    cancelKey = self.counter

    for delay, target in ((29, self.callback),
                          (25, self.addCallback),
                          (26, self.callback)):
        self.addTimer(delay, target)

    self.timers[cancelKey].cancel()
    del self.timers[cancelKey]
    self.checkTimers()

    def recheck(ignored):
        return self.checkTimers()

    self.deferred.addCallback(recheck)
    return self.deferred
def do_cleanPending(cls):
    """
    Cancel any delayed calls still pending on the reactor and report them.

    The reactor is iterated twice beforehand to flush short-range timers.

    @raise PendingTimedCallsError: if the reactor still reports delayed
        calls after the flush. The message is C{PENDING_TIMED_CALLS_MSG}
        followed by one line per pending call.
    """
    # don't import reactor when module is loaded
    from twisted.internet import reactor

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    pending = reactor.getDelayedCalls()
    if not pending:
        return

    # Collect the pieces in a list and join once at the end. Each call is
    # formatted before it is cancelled, as in the original ordering.
    parts = [PENDING_TIMED_CALLS_MSG]
    for p in pending:
        parts.append(" %s\n" % (p,))
        if p.active():
            p.cancel()  # delete the rest
        else:
            # Call form of print works on both Python 2 and Python 3.
            print("WEIRNESS! pending timed call not active+!")
    raise PendingTimedCallsError("".join(parts))
def test_delayedCallCleanup(self):
    """
    Checking to make sure Sessions do not leave extra DelayedCalls.

    The reactor's delayed calls are captured (as a C{repr} string) before a
    session is made, touched and expired, and compared against the same
    capture taken afterwards.
    """
    from twisted.internet import reactor

    delayedCallsBeforeSession = repr(reactor.getDelayedCalls())

    session = self.site.makeSession()
    session.touch()
    session.expire()

    # assertEqual is used instead of the deprecated failUnlessEqual alias.
    self.assertEqual(delayedCallsBeforeSession,
                     repr(reactor.getDelayedCalls()))
# Conditional requests:
# If-None-Match, If-Modified-Since
# make conditional request:
# normal response if condition succeeds
# if condition fails:
# response code
# no body
def testGetDelayedCalls(self):
    """
    Schedule a handful of timers, cancel one of them, and verify through
    C{checkTimers} that the reactor still reports every tracked timer.

    @return: C{self.deferred}, with one more C{checkTimers} pass chained
        onto it, or C{None} when the reactor has no C{getDelayedCalls}.
    """
    if not hasattr(reactor, "getDelayedCalls"):
        return

    # This is not a race because we don't do anything which might call
    # the reactor until we have all the timers set up. If we did, this
    # test might fail on slow systems.
    self.checkTimers()

    for delay, target in ((35, self.done),
                          (20, self.callback),
                          (30, self.callback)):
        self.addTimer(delay, target)

    # Counter value captured between the two batches of timers; it is used
    # below as the key of the timer to cancel.
    # NOTE(review): presumably this is the key the next addTimer call
    # stores its timer under -- confirm against addTimer.
    cancelKey = self.counter

    for delay, target in ((29, self.callback),
                          (25, self.addCallback),
                          (26, self.callback)):
        self.addTimer(delay, target)

    self.timers[cancelKey].cancel()
    del self.timers[cancelKey]
    self.checkTimers()

    def recheck(ignored):
        return self.checkTimers()

    self.deferred.addCallback(recheck)
    return self.deferred
def do_cleanPending(cls):
    """
    Cancel any delayed calls still pending on the reactor and report them.

    The reactor is iterated twice beforehand to flush short-range timers.

    @raise PendingTimedCallsError: if the reactor still reports delayed
        calls after the flush. The message is C{PENDING_TIMED_CALLS_MSG}
        followed by one line per pending call.
    """
    # don't import reactor when module is loaded
    from twisted.internet import reactor

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    pending = reactor.getDelayedCalls()
    if not pending:
        return

    # Collect the pieces in a list and join once at the end. Each call is
    # formatted before it is cancelled, as in the original ordering.
    parts = [PENDING_TIMED_CALLS_MSG]
    for p in pending:
        parts.append(" %s\n" % (p,))
        if p.active():
            p.cancel()  # delete the rest
        else:
            # Call form of print works on both Python 2 and Python 3.
            print("WEIRNESS! pending timed call not active+!")
    raise PendingTimedCallsError("".join(parts))
def test_initiallySchedulesOneDataCall(self):
    """
    When an L{H2Connection} is established it schedules one call to be run
    as soon as the reactor has time.
    """
    clock = task.Clock()
    connection = H2Connection(clock)

    pending = clock.getDelayedCalls()
    self.assertEqual(len(pending), 1)
    scheduled = pending[0]

    # The single call must still be pending, be due at time zero, and
    # target the connection's _sendPrioritisedData with no arguments.
    self.assertTrue(scheduled.active())
    self.assertEqual(scheduled.time, 0)
    self.assertEqual(scheduled.func, connection._sendPrioritisedData)
    self.assertEqual(scheduled.args, ())
    self.assertEqual(scheduled.kw, {})
def checkTimers(self):
    """
    Assert that every timer held in C{self.timers} is among the reactor's
    delayed calls.

    When any tracked timer is missing, C{self.finished} is set to 1 before
    the assertion fails.
    """
    tracked = self.timers.values()
    scheduled = list(reactor.getDelayedCalls())

    # There should be at least the calls we put in. There may be other
    # calls that are none of our business and that we should ignore,
    # though.
    missing = [dc for dc in tracked if dc not in scheduled]
    if missing:
        self.finished = 1

    self.assertFalse(missing, "Should have been missing no calls, instead "
                     + "was missing " + repr(missing))
def testGetDelayedCalls(self):
    """
    Schedule a handful of timers, cancel one of them, and verify through
    C{checkTimers} that the reactor still reports every tracked timer.

    @return: C{self.deferred}, with one more C{checkTimers} pass chained
        onto it, or C{None} when the reactor has no C{getDelayedCalls}.
    """
    if not hasattr(reactor, "getDelayedCalls"):
        return

    # This is not a race because we don't do anything which might call
    # the reactor until we have all the timers set up. If we did, this
    # test might fail on slow systems.
    self.checkTimers()

    for delay, target in ((35, self.done),
                          (20, self.callback),
                          (30, self.callback)):
        self.addTimer(delay, target)

    # Counter value captured between the two batches of timers; it is used
    # below as the key of the timer to cancel.
    # NOTE(review): presumably this is the key the next addTimer call
    # stores its timer under -- confirm against addTimer.
    cancelKey = self.counter

    for delay, target in ((29, self.callback),
                          (25, self.addCallback),
                          (26, self.callback)):
        self.addTimer(delay, target)

    self.timers[cancelKey].cancel()
    del self.timers[cancelKey]
    self.checkTimers()

    def recheck(ignored):
        return self.checkTimers()

    self.deferred.addCallback(recheck)
    return self.deferred
def _cleanPending(self):
    """
    Cancel all pending calls and return their string representations.

    The reactor obtained from C{self._getReactor()} is iterated twice to
    flush short-range timers before the remaining delayed calls are
    inspected.

    @return: a list with one string per delayed call reported by the
        reactor, in the order the reactor returned them.
    """
    reactor = self._getReactor()

    # flush short-range timers
    reactor.iterate(0)
    reactor.iterate(0)

    delayedCallStrings = []
    for p in reactor.getDelayedCalls():
        # Take the description before branching so that an inactive call is
        # reported under its own string. Previously the name was only bound
        # in the active branch, so an inactive call either raised
        # UnboundLocalError or repeated the previous call's string.
        delayedString = str(p)
        if p.active():
            p.cancel()
        else:
            print("WEIRDNESS! pending timed call not active!")
        delayedCallStrings.append(delayedString)
    return delayedCallStrings
def setUp(self):
    """
    Reset the per-test bookkeeping attributes and cancel whatever delayed
    calls the reactor already holds.
    """
    self.finished = self.counter = 0
    self.timers = {}
    self.deferred = defer.Deferred()

    # ick. Sometimes there are magic timers already running:
    # popsicle.Freezer.tick . Kill off all such timers now so they won't
    # interfere with the test. Of course, this kind of requires that
    # getDelayedCalls already works, so certain failure modes won't be
    # noticed.
    if hasattr(reactor, "getDelayedCalls"):
        for leftover in reactor.getDelayedCalls():
            leftover.cancel()
        reactor.iterate()  # flush timers
def checkTimers(self):
    """
    Assert that every timer held in C{self.timers} is among the reactor's
    delayed calls.

    When any tracked timer is missing, C{self.finished} is set to 1 before
    the assertion fails.
    """
    tracked = self.timers.values()
    scheduled = list(reactor.getDelayedCalls())

    # There should be at least the calls we put in. There may be other
    # calls that are none of our business and that we should ignore,
    # though.
    missing = [dc for dc in tracked if dc not in scheduled]
    if missing:
        self.finished = 1

    # assertFalse is used instead of the deprecated failIf alias.
    self.assertFalse(
        missing,
        "Should have been missing no calls, instead was missing " + repr(missing))
def setUp(self):
    """
    Reset the per-test bookkeeping attributes and cancel whatever delayed
    calls the reactor already holds.
    """
    self.finished = self.counter = 0
    self.timers = {}
    self.deferred = defer.Deferred()

    # ick. Sometimes there are magic timers already running:
    # popsicle.Freezer.tick . Kill off all such timers now so they won't
    # interfere with the test. Of course, this kind of requires that
    # getDelayedCalls already works, so certain failure modes won't be
    # noticed.
    if hasattr(reactor, "getDelayedCalls"):
        for leftover in reactor.getDelayedCalls():
            leftover.cancel()
        reactor.iterate()  # flush timers
def checkTimers(self):
    """
    Assert that every timer held in C{self.timers} is among the reactor's
    delayed calls.

    When any tracked timer is missing, C{self.finished} is set to 1 before
    the assertion fails.
    """
    tracked = self.timers.values()
    scheduled = list(reactor.getDelayedCalls())

    # There should be at least the calls we put in. There may be other
    # calls that are none of our business and that we should ignore,
    # though.
    missing = [dc for dc in tracked if dc not in scheduled]
    if missing:
        self.finished = 1

    # assertFalse is used instead of the deprecated failIf alias.
    self.assertFalse(
        missing,
        "Should have been missing no calls, instead was missing " + repr(missing))
def test_noTimeoutIfConnectionLost(self):
    """
    Losing its connection makes an L{H2Connection} cancel its timeout.
    """
    factory = FrameFactory()
    requestFrames = buildRequestFrames(self.getRequestHeaders, [], factory)
    preface = factory.clientConnectionPreface()
    payload = preface + b''.join(
        frame.serialize() for frame in requestFrames)

    clock, connection, transport = self.initiateH2Connection(
        payload, requestFactory=DummyProducerHandler,
    )

    # Baseline taken while the connection is still up.
    bytesBeforeLoss = transport.value()
    callsBeforeLoss = len(clock.getDelayedCalls())

    # Now lose the connection.
    connection.connectionLost("reason")

    # There should be one fewer call than there was.
    callsAfterLoss = len(clock.getDelayedCalls())
    self.assertEqual(callsBeforeLoss - 1, callsAfterLoss)

    # Advancing the clock should do nothing.
    clock.advance(101)
    self.assertEqual(transport.value(), bytesBeforeLoss)
def process_metrics(self):
    """
    Log a one-line summary of measurement and transaction frequencies and
    reset the counters in ``self.metrics`` for the next round.

    The measurement frequency is only reported when ``self.metrics`` holds
    a non-None ``packet_time``.
    """
    stats = self.metrics
    report = []

    # Compute frequency of measurements
    if 'packet_time' in stats and stats['packet_time'] is not None:
        stats.setdefault('packet_starttime', stats.packet_time)

        # Convert nanos to seconds
        elapsed = (stats.packet_time - stats.packet_starttime) / 1000.0 / 1000.0 / 1000.0
        # A zero duration is replaced by the stored start time.
        elapsed = elapsed or stats.starttime
        packet_frequency = stats.tx_count / float(elapsed) if elapsed != 0 else 0.0
        report.append('measurements: %.02f Hz' % packet_frequency)

        # Reset for next round
        stats.packet_starttime = stats.packet_time

    # Compute frequency of transactions
    now = time.time()
    window = now - stats.starttime
    transaction_frequency = stats.tx_count / float(window) if window != 0 else 0.0
    report.append('transactions: %.02f tps' % transaction_frequency)

    # Reset for next round
    stats.tx_count = 0
    stats.starttime = now

    # Add information from the Twisted reactor.
    # The pending-call count is computed but not currently reported.
    pending_count = len(reactor.getDelayedCalls())
    #report.append('pending: %d' % pending_count)

    log.info('[{realm:12s}] {metrics_info}',
             realm=self.channel.realm, metrics_info=', '.join(report))