def do_cleanThreads(cls):
    """
    Stop the global reactor's thread pool and replace it with a new,
    started one.

    Nothing happens when the reactor does not provide
    L{interfaces.IReactorThreads}; when it does, the suggested pool size
    is first set to 0, and the pool is only swapped if the reactor has a
    truthy C{threadpool} attribute.
    """
    from twisted.internet import reactor

    if not interfaces.IReactorThreads.providedBy(reactor):
        return

    reactor.suggestThreadPoolSize(0)
    if not (hasattr(reactor, 'threadpool') and reactor.threadpool):
        return

    reactor.threadpool.stop()
    reactor.threadpool = None
    # The thread pool is private to the reactor, so it must not be left
    # torn down: install a replacement pool and start it again.
    reactor.threadpool = threadpool.ThreadPool(0, 10)
    reactor.threadpool.start()
# Example source code for the Python class threadpool()
def do_cleanThreads(cls):
    """
    Stop the global reactor's thread pool and replace it with a new,
    started one.

    Nothing happens when the reactor does not provide
    L{interfaces.IReactorThreads}; when it does, the suggested pool size
    is first set to 0, and the pool is only swapped if the reactor has a
    truthy C{threadpool} attribute.
    """
    from twisted.internet import reactor

    if not interfaces.IReactorThreads.providedBy(reactor):
        return

    reactor.suggestThreadPoolSize(0)
    if not (hasattr(reactor, 'threadpool') and reactor.threadpool):
        return

    reactor.threadpool.stop()
    reactor.threadpool = None
    # The thread pool is private to the reactor, so it must not be left
    # torn down: install a replacement pool and start it again.
    reactor.threadpool = threadpool.ThreadPool(0, 10)
    reactor.threadpool.start()
def postClassCleanup(self):
    """
    Clean the reactor once the last test of a C{TestCase} subclass has
    run; called by L{unittest.TestCase}.

    Leftover selectables and pending
    L{DelayedCall<twisted.internet.base.DelayedCall>}s are collected and,
    if there are any, reported to C{self.result} as a single
    L{DirtyReactorAggregateError} against C{self.test}. The thread pool
    is cleaned up last, whether or not an error was reported.
    """
    leftoverSelectables = self._cleanReactor()
    leftoverCalls = self._cleanPending()

    if leftoverSelectables or leftoverCalls:
        dirty = DirtyReactorAggregateError(leftoverCalls, leftoverSelectables)
        self.result.addError(self.test, Failure(dirty))

    self._cleanThreads()
def _cleanThreads(self):
    """
    Stop the thread pool of the reactor returned by C{self._getReactor()}.

    Does nothing unless that reactor provides
    L{interfaces.IReactorThreads} and its C{threadpool} is not C{None}.
    """
    reactor = self._getReactor()
    hasLivePool = (
        interfaces.IReactorThreads.providedBy(reactor)
        and reactor.threadpool is not None
    )
    if hasLivePool:
        # Stopping the pool here means a new one gets created later,
        # which gives somewhat better test isolation. This runs as a
        # post-class cleanup hook, so it only isolates test classes from
        # one another, not the individual test methods.
        reactor._stopThreadPool()
def test__default_pool_is_disconnected_pool(self):
    # The reactor's default thread pool must be a ThreadPool ...
    default_pool = reactor.threadpool
    self.assertThat(default_pool, IsInstance(ThreadPool))
    # ... whose context factory is orm.TotallyDisconnected ...
    context_factory = default_pool.context.contextFactory
    self.assertThat(context_factory, Is(orm.TotallyDisconnected))
    # ... and whose `min` is zero.
    self.assertThat(default_pool.min, Equals(0))
def __install():
    """
    Install Twisted's threaded-select reactor and run it interleaved with
    IPython's input hook.

    Work handed over by ``reactor.interleave`` is queued and executed
    whenever IPython invokes the input hook. The shell's ``ask_exit`` is
    wrapped so that the reactor (and its thread pool, if any) is shut
    down before the original ``ask_exit`` runs.

    Returns the installed reactor.
    """
    log = logging.getLogger('tpython')
    log.info('setting up twisted reactor in ipython loop')

    # Order matters: the reactor is installed before
    # ``twisted.internet.reactor`` is imported.
    from twisted.internet import _threadedselect
    _threadedselect.install()
    from twisted.internet import reactor

    from collections import deque
    from IPython.lib import inputhook
    from IPython import InteractiveShell

    # Callables handed over by the reactor, waiting for the input hook.
    pending = deque()

    def enqueue(twisted_loop_next, pending=pending):
        # Passed to reactor.interleave(); just remember the callable.
        pending.append(twisted_loop_next)

    def drain(*_args):
        # Input hook: run everything queued so far, oldest first.
        while pending:
            run_next = pending.popleft()
            run_next()
        return 0

    def start(*_args):
        log.info('starting twisted reactor in ipython')
        reactor.interleave(enqueue)  # @UndefinedVariable
        inputhook.set_inputhook(drain)

    def stop():
        if reactor.threadpool:  # @UndefinedVariable
            log.info('stopping twisted threads')
            reactor.threadpool.stop()  # @UndefinedVariable
        log.info('shutting down twisted reactor')
        reactor._mainLoopShutdown()  # @UndefinedVariable

    shell = InteractiveShell.instance()
    original_ask_exit = shell.ask_exit

    def ipython_exit():
        # Shut Twisted down first, then defer to IPython's own exit.
        stop()
        return original_ask_exit()

    shell.ask_exit = ipython_exit

    start()
    return reactor