def _stop_reactor(self, _=None):
    """
    Stop the reactor.

    The second argument is accepted and ignored (presumably so that this
    method can double as a one-argument callback).
    """
    reactor.stop()
python类stop()的实例源码
def doItTkinterly(opt):
    """
    Run the requested unpack and/or compile steps behind a Tk progress bar.

    A Tk root window is created hidden, installed into the reactor through
    tksupport, and given a progress bar. The steps selected by C{opt} are
    chained onto one Deferred whose last callback stops the reactor and
    destroys the window; the reactor is then run.

    @param opt: mapping of options. C{opt['zipfile']} and
        C{opt['ziptargetdir']} are read for the unpacking step and
        C{opt['compiledir']} for the compile step. A step is skipped when
        its controlling option is false.
    """
    window = Tkinter.Tk()
    window.withdraw()
    window.title('One Moment.')
    # Closing the window through the window manager stops the reactor.
    window.protocol('WM_DELETE_WINDOW', reactor.stop)
    tksupport.install(window)

    progressBar = ProgressBar(window, value=0, labelColor="black", width=200)
    progressBar.pack()

    def showWindow(win):
        # Un-hide the window and pass it on down the callback chain.
        win.deiconify()
        return win

    def shutDown(ignored):
        # Final step: stop the reactor first, then tear the window down.
        reactor.stop()
        window.destroy()

    # callback immediately
    steps = defer.succeed(window).addErrback(log.err)
    steps.addCallback(showWindow)

    zipPath = opt['zipfile']
    if zipPath:
        unzipper = Progressor('Unpacking documentation...')
        chunkCount = zipstream.countZipFileChunks(zipPath, 4096)
        unzipper.setBar(progressBar, chunkCount)
        unzipper.setIterator(
            zipstream.unzipIterChunky(zipPath, opt['ziptargetdir']))
        steps.addCallback(unzipper.processAll)

    compileDir = opt['compiledir']
    if compileDir:
        pycBuilder = Progressor('Compiling to pyc...')
        pycBuilder.setBar(progressBar, countPysRecursive(compileDir))
        pycBuilder.setIterator(compiler(compileDir))
        steps.addCallback(pycBuilder.processAll)

    steps.addCallback(shutDown)

    reactor.run()
def handle_QUIT(self):
    """
    Run ColoredManhole's own QUIT handling, then stop the reactor.
    """
    ColoredManhole.handle_QUIT(self)
    reactor.stop()
def deferredAskFrame(question, echo):
    """
    Write a question to the module-level C{frame} and collect the typed
    answer one character at a time.

    While the question is pending, C{frame.callback} is set to an internal
    per-character handler:
      - carriage return finishes the answer, clears C{frame.callback} and
        fires the returned Deferred with the collected string;
      - characters with codes 32..126 are appended to the answer;
      - backspace (code 8) removes the last collected character, if any;
      - Ctrl-C (code 3) stops the reactor.

    @param question: text written to the frame before input is collected.
    @param echo: if true, accepted characters (and backspace erasures) are
        written back to the frame.

    @return: a Deferred that fires with the answer string.

    @raise ValueError: if C{frame.callback} is already set, i.e. another
        question is still pending.
    """
    if frame.callback:
        # A string is not a valid exception object; raise a real exception
        # carrying the same message.
        raise ValueError("can't ask 2 questions at once!")
    d = defer.Deferred()
    resp = []

    def gotChar(ch, resp=resp):
        if not ch:
            return
        if ch == '\x03':  # C-c
            reactor.stop()
        if ch == '\r':
            frame.write('\r\n')
            stresp = ''.join(resp)
            del resp
            # Allow the next question before handing the answer on.
            frame.callback = None
            d.callback(stresp)
            return
        elif 32 <= ord(ch) < 127:
            resp.append(ch)
            if echo:
                frame.write(ch)
        elif ord(ch) == 8 and resp:  # BS
            if echo:
                # Step back, overwrite with a space, step back again.
                frame.write('\x08 \x08')
            resp.pop()

    frame.callback = gotChar
    frame.write(question)
    frame.canvas.focus_force()
    return d
def handleError():
    """
    Record a failing exit status, log the active exception and stop the
    reactor.

    Sets the module-level C{exitStatus} to 2, logs a Failure built from the
    exception currently being handled, stops the reactor and then re-raises
    that exception. Because it ends with a bare C{raise}, it is meant to be
    called while an exception is being handled.
    """
    global exitStatus
    from twisted.python.failure import Failure
    exitStatus = 2
    currentFailure = Failure()
    log.err(currentFailure)
    reactor.stop()
    raise
def closed(self):
    """
    Log that this object was closed, and stop the reactor when
    C{self.conn.channels} holds exactly one entry.
    """
    log.msg('closed %s' % self)
    remaining = len(self.conn.channels)
    if remaining != 1:
        return
    # just us left
    reactor.stop()
def handleError():
    """
    Record a failing exit status, stop the reactor (best effort), log the
    active exception and re-raise it.

    Sets the module-level C{exitStatus} to 2. Because it ends with a bare
    C{raise}, it is meant to be called while an exception is being handled.
    """
    from twisted.python import failure
    global exitStatus
    exitStatus = 2
    # Capture the active exception before touching the reactor, so that an
    # error raised (and suppressed) by reactor.stop() cannot be logged in
    # its place.
    currentFailure = failure.Failure()
    try:
        reactor.stop()
    except Exception:
        # Best effort: a failure to stop the reactor must not hide the
        # error being reported. SystemExit and KeyboardInterrupt are
        # deliberately not suppressed.
        pass
    log.err(currentFailure)
    raise
def _ebExit(f):
    """
    Print a description of a failure and stop the reactor (best effort).

    Presumably used as an errback.

    @param f: object describing the failure. If C{f.value} has a C{value}
        attribute, that attribute is printed; otherwise C{str(f)} is
        printed.
    """
    if hasattr(f.value, 'value'):
        s = f.value.value
    else:
        s = str(f)
    # Call form with a single argument prints the same text whether print
    # is a statement or a function.
    print(s)
    try:
        reactor.stop()
    except Exception:
        # Best effort: an error from stopping the reactor is ignored.
        # SystemExit and KeyboardInterrupt are deliberately not suppressed.
        pass
def _stopReactor():
    """
    Stop the reactor, ignoring any ordinary error raised by doing so.
    """
    try:
        reactor.stop()
    except Exception:
        # Best effort: SystemExit and KeyboardInterrupt are deliberately
        # not suppressed.
        pass
def _guiUpdate(reactor, delay):
    """
    Draw and update pyui once, then either shut down or reschedule.

    When C{pyui.update()} returns 0, pyui is quit and the reactor stopped;
    otherwise this function is scheduled to run again after C{delay}.

    @param reactor: object providing C{stop()} and C{callLater()}.
    @param delay: delay passed to C{reactor.callLater} for the next update.
    """
    pyui.draw()
    shouldQuit = pyui.update() == 0
    if not shouldQuit:
        reactor.callLater(delay, _guiUpdate, reactor, delay)
        return
    pyui.quit()
    reactor.stop()
def connectionLost(self, reason):
    """
    Stop the reactor when the connection is lost.

    @param reason: ignored.
    """
    reactor.stop()
def connectionLost(self, reason):
    """
    Stop the reactor when the connection is lost.

    @param reason: ignored.
    """
    reactor.stop()
def testIterate(self):
    """
    Test that reactor.iterate(0) doesn't block
    """
    start = time.time()
    # twisted timers are distinct from the underlying event loop's
    # timers, so this fail-safe probably won't keep a failure from
    # hanging the test
    failSafe = reactor.callLater(10, reactor.crash)
    try:
        reactor.iterate(0)  # shouldn't block
        elapsed = time.time() - start
        self.assertTrue(elapsed < 8)
    finally:
        # Always remove the fail-safe so a failed assertion does not leave
        # a pending reactor.crash behind. Skip the cancel if the call has
        # already run, since cancelling it then would raise and mask the
        # assertion failure.
        if failSafe.active():
            failSafe.cancel()
def stopProducing(self):
    """
    Record that stopProducing was called by appending C{'stop'} to
    C{self.events}.
    """
    recorded = self.events
    recorded.append('stop')
def test_unconnectedFileDescriptor(self):
    """
    Verify that registering a producer when the connection has already
    been closed invokes its stopProducing() method.
    """
    fd = abstract.FileDescriptor()
    # Mark the descriptor as already disconnected before registering.
    fd.disconnected = 1
    dp = DummyProducer()
    fd.registerProducer(dp, 0)
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(dp.events, ['stop'])
def testShutdownFromDatagramReceived(self):
    """Test reactor shutdown while in a recvfrom() loop"""
    # udp.Port's doRead calls recvfrom() in a loop, as an optimization.
    # It is important this loop terminate under various conditions.
    # Previously, if datagramReceived synchronously invoked
    # reactor.stop(), under certain reactors, the Port's socket would
    # synchronously disappear, causing an AttributeError inside that
    # loop.  This was mishandled, causing the loop to spin forever.
    # This test is primarily to ensure that the loop never spins
    # forever.
    finished = defer.Deferred()
    packetSeen = defer.Deferred()
    self.server.packetReceived = packetSeen

    def simulateStop(ignored):
        # Simulate reactor.stop() behavior :(
        self.server.transport.connectionLost()
        # Then delay this Deferred chain until the protocol has been
        # disconnected, as the reactor should do in an error condition
        # such as we are inducing.  This is very much a whitebox test.
        reactor.callLater(0, finished.callback, None)

    def discardLoggedErrors(ignored):
        # We are breaking abstraction and calling private APIs, any
        # number of horrible errors might occur.  As long as the reactor
        # doesn't hang, this test is satisfied.  (There may be room for
        # another, stricter test.)
        self.flushLoggedErrors()

    packetSeen.addCallback(simulateStop)
    finished.addCallback(discardLoggedErrors)

    # Send one 64-byte datagram to the server's own port on localhost.
    transport = self.server.transport
    ownAddress = ('127.0.0.1', transport.getHost().port)
    transport.write('\0' * 64, ownAddress)
    return finished