def _threadpoolTest(self, method):
    """
    Dispatch C{Synchronization.run} onto a started thread pool several
    times and fail if the actor recorded any failures.

    @param method: not used by this implementation.
    """
    # This test mixes two concerns: it appears to exercise both the
    # dispatch() behavior of the ThreadPool and the serialization
    # behavior of threadable.synchronize(). Two much simpler tests
    # would probably be clearer.
    runCount = 10
    pool = threadpool.ThreadPool()
    pool.start()
    try:
        gate = threading.Lock()
        gate.acquire()
        worker = Synchronization(runCount, gate)
        for _ in xrange(runCount):
            pool.dispatch(worker, worker.run)
        self._waitForLock(gate)
        message = "run() re-entered %d times" % (worker.failures,)
        self.failIf(worker.failures, message)
    finally:
        # Always shut the pool down, even when the checks above fail.
        pool.stop()
# Example source code for the Python class ThreadPool()
def _threadpoolTest(self, method):
    """
    Queue C{Synchronization.run} on a running thread pool a fixed number
    of times, wait for the shared lock, and fail if the actor reports
    any failures.

    @param method: not used by this implementation.
    """
    # This test does two jobs at once: it seems to check the dispatch()
    # behavior of the ThreadPool as well as the serialization behavior
    # of threadable.synchronize(). It would probably make more sense as
    # two much simpler tests.
    iterations = 10
    pool = threadpool.ThreadPool()
    pool.start()
    try:
        lock = threading.Lock()
        lock.acquire()
        synchronized = Synchronization(iterations, lock)
        for _ in xrange(iterations):
            pool.dispatch(synchronized, synchronized.run)
        self._waitForLock(lock)
        failureText = "run() re-entered %d times" % (synchronized.failures,)
        self.failIf(synchronized.failures, failureText)
    finally:
        # The pool must be stopped whether or not the test passed.
        pool.stop()
def setupService(self):
    """
    Prepare the collaborators of this service: settings, metrics
    bookkeeping, the MQTT adapter, the InfluxDB adapter and the thread
    pool used for message processing.
    """
    #self.log(log.info, u'Setting up')
    self.settings = self.parent.settings

    # Metrics are collected every ``interval`` seconds (default: 60).
    interval = int(self.channel.get('metrics_logger_interval', 60))
    self.metrics = Bunch(tx_count=0, starttime=time.time(), interval=interval)

    topics = read_list(self.channel.mqtt_topics)

    # Collect the adapter arguments first, then build and register it.
    adapter_options = dict(
        name=u'mqtt-' + self.channel.realm,
        broker_host=self.settings.mqtt.host,
        broker_port=int(self.settings.mqtt.port),
        broker_username=self.settings.mqtt.username,
        broker_password=self.settings.mqtt.password,
        callback=self.mqtt_receive,
        subscriptions=topics,
    )
    self.mqtt_service = MqttAdapter(**adapter_options)
    self.registerService(self.mqtt_service)

    self.influx = InfluxDBAdapter(settings=self.settings.influxdb)

    # MQTT message processing runs on its own, separate thread pool.
    self.threadpool = ThreadPool()
    self.thimble = Thimble(reactor, self.threadpool, self, ["process_message"])
def test_wsgi(self):
    """
    The I{--wsgi} option takes the fully-qualified Python name of a WSGI
    application object and creates a L{WSGIResource} at the root which
    serves that application.
    """
    options = Options()
    options.parseOptions(['--wsgi', __name__ + '.application'])
    root = options['root']
    # assertTrue(root, WSGIResource) would treat the class as the failure
    # message and pass for any truthy root, so check the type explicitly.
    self.assertTrue(isinstance(root, WSGIResource))
    self.assertIdentical(root._reactor, reactor)
    self.assertTrue(isinstance(root._threadpool, ThreadPool))
    self.assertIdentical(root._application, application)

    # The threadpool should start and stop with the reactor.
    self.assertFalse(root._threadpool.started)
    reactor.fireSystemEvent('startup')
    self.assertTrue(root._threadpool.started)
    self.assertFalse(root._threadpool.joined)
    reactor.fireSystemEvent('shutdown')
    self.assertTrue(root._threadpool.joined)
def test_callInThreadException(self):
    """
    An exception raised by the callable handed to
    L{ThreadPool.callInThread} is logged.
    """
    class NewError(Exception):
        pass

    def raiseError():
        raise NewError()

    pool = threadpool.ThreadPool(0, 1)
    # The work is queued before the pool is started.
    pool.callInThread(raiseError)
    pool.start()
    pool.stop()

    logged = self.flushLoggedErrors(NewError)
    self.assertEqual(len(logged), 1)
def test_callbackThread(self):
    """
    The function given to L{ThreadPool.callInThreadWithCallback} and its
    C{onResult} callback are both invoked in the same thread.
    """
    seenIds = []
    finished = threading.Event()

    def noteThread():
        # Remember the identifier of whichever thread is running us.
        seenIds.append(threading.currentThread().ident)

    def onResult(success, result):
        noteThread()
        finished.set()

    def func():
        noteThread()

    pool = threadpool.ThreadPool(0, 1)
    pool.callInThreadWithCallback(onResult, func)
    pool.start()
    self.addCleanup(pool.stop)

    finished.wait(self.getTimeout())
    self.assertEqual(len(seenIds), 2)
    self.assertEqual(seenIds[0], seenIds[1])
def test_existingWork(self):
    """
    Work queued on a threadpool before it is started runs once the pool
    starts. This is checked by queueing the release of a lock that was
    acquired beforehand and then waiting for that lock.
    """
    lock = threading.Lock()
    lock.acquire()

    pool = threadpool.ThreadPool(0, 1)
    # Queued before start() on purpose.
    pool.callInThread(lock.release)
    pool.start()

    try:
        self._waitForLock(lock)
    finally:
        pool.stop()
def __init__(self, coordinator, failTest, newWorker, *args, **kwargs):
    """
    Set up this L{MemoryPool} for use by a test case.

    @param coordinator: the worker that coordinates work in the L{Team}
        underlying this threadpool.
    @type coordinator: L{twisted._threads.IExclusiveWorker}

    @param failTest: a callable which accepts an exception and raises a
        test-failure exception.
    @type failTest: 1-argument callable taking (L{Failure}) and raising
        L{unittest.FailTest}.

    @param newWorker: a callable which, every time it is invoked, returns
        a fresh L{twisted._threads.IWorker} provider.
    @type newWorker: 0-argument callable returning
        L{twisted._threads.IWorker}.
    """
    # Store the collaborators before running the base-class initializer.
    (self._coordinator,
     self._failTest,
     self._newWorker) = (coordinator, failTest, newWorker)
    threadpool.ThreadPool.__init__(self, *args, **kwargs)
def __init__(self, testCase, *args, **kwargs):
    """
    Build a L{PoolHelper} around a L{MemoryPool}.

    @param testCase: the test case this helper belongs to; its C{fail}
        method is handed to the pool.
    @param args: positional arguments forwarded to the L{MemoryPool}.
    @param kwargs: keyword arguments forwarded to the L{MemoryPool}.
    """
    coordinator, performer = createMemoryWorker()
    self.performCoordination = performer
    self.workers = []

    def newWorker():
        # Keep the whole result of createMemoryWorker(), return its first
        # element.
        created = createMemoryWorker()
        self.workers.append(created)
        return created[0]

    self.threadpool = MemoryPool(
        coordinator, testCase.fail, newWorker, *args, **kwargs)
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and scheduled to stop
    after reactor shutdown. The reactor is started only if it is not
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    resource = WSGIResource(reactor, pool, handler)
    site = Site(resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def _initThreadPool(self):
    """
    Create this reactor's thread pool (0 to 10 threads), arrange for it to
    start once the reactor is running and to stop during shutdown.
    """
    from twisted.python.threadpool import ThreadPool

    pool = ThreadPool(0, 10, 'twisted.internet.reactor')
    self.threadpool = pool
    self.callWhenRunning(pool.start)
    self.addSystemEventTrigger('during', 'shutdown', pool.stop)
def testPersistence(self):
    """
    A started pool reports the thread counts it was configured with, and a
    pool restored from its pickle reports the same counts once started.

    Both pools are stopped even when an assertion fails, so a failing run
    does not leave worker threads behind.
    """
    tp = threadpool.ThreadPool(7, 20)
    tp.start()
    try:
        # XXX Sigh - race condition: start should return a Deferred
        # which fires when all the workers it started have fully
        # started up.
        time.sleep(0.1)

        self.assertEqual(len(tp.threads), 7)
        self.assertEqual(tp.min, 7)
        self.assertEqual(tp.max, 20)

        # check that unpickled threadpool has same number of threads
        # (the pickle is produced right here, it is not external data)
        s = pickle.dumps(tp)
        tp2 = pickle.loads(s)
        tp2.start()
        try:
            # XXX As above
            time.sleep(0.1)

            self.assertEqual(len(tp2.threads), 7)
            self.assertEqual(tp2.min, 7)
            self.assertEqual(tp2.max, 20)
        finally:
            tp2.stop()
    finally:
        tp.stop()
def testExistingWork(self):
    """
    A call queued on the pool before start() is executed after the pool
    has been started; the queued call releases a lock that the test then
    waits for.
    """
    lock = threading.Lock()
    lock.acquire()

    pool = threadpool.ThreadPool(0, 1)
    # Queued before start() on purpose.
    pool.callInThread(lock.release)
    pool.start()

    try:
        self._waitForLock(lock)
    finally:
        pool.stop()
def setUp(self):
    """
    Create the event and the started thread pool (0 to 10 threads) stored
    on this test case.
    """
    event = threading.Event()
    self.event = event

    pool = threadpool.ThreadPool(0, 10)
    self.threadpool = pool
    pool.start()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Publish the WSGI application ``handler`` through a Twisted web site
    listening on ``self.host``:``self.port``.

    The thread pool backing the WSGI resource is started here and stopped
    after reactor shutdown; the reactor itself is started unless it is
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    wsgi_resource = WSGIResource(reactor, pool, handler)
    site = Site(wsgi_resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and scheduled to stop
    after reactor shutdown. The reactor is started only if it is not
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    resource = WSGIResource(reactor, pool, handler)
    site = Site(resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def twisted(app, address, **options):
    """
    Serve the WSGI application ``app`` with Twisted.

    ``address`` is indexed as ``address[0]`` for the interface to bind and
    ``address[1]`` for the port. ``options`` is accepted but not used.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
    reactor.listenTCP(address[1], factory, interface=address[0])
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Publish the WSGI application ``handler`` through a Twisted web site
    listening on ``self.host``:``self.port``.

    The thread pool backing the WSGI resource is started here and stopped
    after reactor shutdown; the reactor itself is started unless it is
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    wsgi_resource = WSGIResource(reactor, pool, handler)
    site = Site(wsgi_resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and scheduled to stop
    after reactor shutdown. The reactor is started only if it is not
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    resource = WSGIResource(reactor, pool, handler)
    site = Site(resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def run(self, handler):
    """
    Publish the WSGI application ``handler`` through a Twisted web site
    listening on ``self.host``:``self.port``.

    The thread pool backing the WSGI resource is started here and stopped
    after reactor shutdown; the reactor itself is started unless it is
    already running.
    """
    from twisted.web.server import Site
    from twisted.web.wsgi import WSGIResource
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    wsgi_resource = WSGIResource(reactor, pool, handler)
    site = Site(wsgi_resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def twisted(app, address, **options):
    """
    Serve the WSGI application ``app`` with Twisted.

    ``address`` is indexed as ``address[0]`` for the interface to bind and
    ``address[1]`` for the port. ``options`` is accepted but not used.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
    reactor.listenTCP(address[1], factory, interface=address[0])
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def twisted(app, address, **options):
    """
    Serve the WSGI application ``app`` with Twisted.

    ``address`` is indexed as ``address[0]`` for the interface to bind and
    ``address[1]`` for the port. ``options`` is accepted but not used.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app))
    reactor.listenTCP(address[1], factory, interface=address[0])
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()
def run(self, handler):
    """
    Serve the WSGI application ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A thread pool is started for the WSGI resource and stopped after the
    reactor has shut down. The reactor is only started when it is not
    already running, so this can also be used in a process where the
    reactor was started elsewhere.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor
    thread_pool = ThreadPool()
    thread_pool.start()
    # Stop the worker threads once the reactor has shut down.
    reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop)
    factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler))
    reactor.listenTCP(self.port, factory, interface=self.host)
    # Running an already-running reactor is an error; only start it here
    # when nobody else has.
    if not reactor.running:
        reactor.run()