def do_cleanReactor(cls):
s = []
from twisted.internet import reactor
removedSelectables = reactor.removeAll()
if removedSelectables:
s.append(DIRTY_REACTOR_MSG)
for sel in removedSelectables:
if interfaces.IProcessTransport.providedBy(sel):
sel.signalProcess('KILL')
s.append(repr(sel))
if s:
raise DirtyReactorError(' '.join(s))
python类removeAll()的实例源码
def do_cleanReactor(cls):
s = []
from twisted.internet import reactor
removedSelectables = reactor.removeAll()
if removedSelectables:
s.append(DIRTY_REACTOR_MSG)
for sel in removedSelectables:
if interfaces.IProcessTransport.providedBy(sel):
sel.signalProcess('KILL')
s.append(repr(sel))
if s:
raise DirtyReactorError(' '.join(s))
def tearDown(self):
del self.channel_layer
reactor.removeAll()
super(TwistedTests, self).tearDown()
def tearDown(self):
reactor.removeAll()
super(RabbitmqChannelLayerTwistedTest, self).tearDown()
def _cleanReactor(self):
"""
Remove all selectables from the reactor, kill any of them that were
processes, and return their string representation.
"""
reactor = self._getReactor()
selectableStrings = []
for sel in reactor.removeAll():
if interfaces.IProcessTransport.providedBy(sel):
sel.signalProcess('KILL')
selectableStrings.append(repr(sel))
return selectableStrings