def deliver_messages(self, timeout=.1):
    """
    Give peers time to communicate by polling the reactor threadpool.

    The procedure is:
     1. Wait 10 milliseconds (``self.sleep(.01)``) and add that to the
        running wait time
     2. Sample the number of working threads in the reactor threadpool
     3. Stop once two consecutive samples report 0 working threads, or once
        the accumulated wait time is no longer below ``timeout``

    This is a generator: every polling step yields the result of
    ``self.sleep(.01)`` (presumably driven by a coroutine-style decorator
    or caller outside this view -- confirm).

    :param timeout: the maximum time to wait for messages to be delivered
    """
    waited = 0
    idle_last_round = False
    while waited < timeout:
        yield self.sleep(.01)
        waited += .01
        pool_is_busy = len(reactor.getThreadPool().working) != 0
        if pool_is_busy:
            # Activity seen: the "idle twice in a row" streak starts over.
            idle_last_round = False
            continue
        if idle_last_round:
            # Second idle sample in a row: assume delivery has finished.
            break
        idle_last_round = True
# Example source code using Python's getThreadPool()
def boot_frontend(config, debug=False):
    """
    Boot a Pyramid WSGI application as Twisted component.

    Reads ``http_port`` from the ``config-web`` section and ``listen`` from
    the ``wamp`` section of ``config``, loads the ``main`` application from
    the ``development.ini`` resource of ``kotori.frontend`` and serves it
    over TCP through a ``WSGIResource`` backed by the reactor threadpool.

    :param config: settings object queried via ``config.get(section, option)``
    :param debug: accepted but not used inside this function
    """
    listen_port = int(config.get('config-web', 'http_port'))

    # Read and converted, but not used any further within this function.
    wamp_listen_uri = unicode(config.get('wamp', 'listen'))

    # https://stackoverflow.com/questions/13122519/serving-pyramid-application-using-twistd/13138610#13138610
    ini_path = resource_filename('kotori.frontend', 'development.ini')
    wsgi_app = get_app(ini_path, 'main')

    # https://twistedmatrix.com/documents/13.1.0/web/howto/web-in-60/wsgi.html
    worker_pool = reactor.getThreadPool()
    wsgi_resource = WSGIResource(reactor, worker_pool, wsgi_app)
    reactor.listenTCP(listen_port, Site(wsgi_resource))
def test_configures_thread_pool(self):
    """
    After ``makeService()`` the reactor must hand out a ``ThreadPool``.

    ``reactor.threadpool`` and ``reactor.threadpoolForDatabase`` are set to
    ``None`` for the duration of the test and restored afterwards.
    """
    # Patching a running reactor is potentially fairly harmful, so the
    # patch is applied and reverted right here where it is visible.
    reactor_patches = monkey.MonkeyPatcher()
    for attribute in ("threadpool", "threadpoolForDatabase"):
        reactor_patches.add_patch(reactor, attribute, None)
    reactor_patches.patch()
    try:
        maker = RegionServiceMaker("Harry", "Hill")
        # _ensureConnection() is not allowed in the reactor; stub it out.
        self.patch_autospec(maker, "_ensureConnection")
        maker.makeService(Options())
        self.assertThat(reactor.getThreadPool(), IsInstance(ThreadPool))
    finally:
        reactor_patches.restore()
def start(self):
    """
    Start the Web Server.

    Wraps ``self.app`` in a ``WSGIResource`` that runs on the reactor
    threadpool, stores the resulting ``Site`` on ``self.site`` and the
    listening port (on ``self.server.config.webport``) on ``self.port``.
    """
    wsgi_resource = WSGIResource(reactor, reactor.getThreadPool(), self.app)
    self.site = Site(wsgi_resource)
    self.port = reactor.listenTCP(self.server.config.webport, self.site)
def makeService(self, options):
    """
    Build the service tree: an RPC TCP server plus a Django WSGI UI server.

    Both servers bind to ``config["host"]``; the RPC server listens on
    ``config["slaveport"]`` and the UI server on ``config["uiport"]``,
    where ``config`` is whatever ``valid_config()`` returns.

    :param options: accepted but not used inside this method
    :return: the ``MultiService`` that parents both TCP servers
    """
    settings = valid_config()
    root = MultiService()

    # RPC endpoint; the import is kept local, as in the original layout.
    from crondeamon.slave import service as subrpc
    rpc_site = server.Site(subrpc.MainRpc())
    TCPServer(
        int(settings["slaveport"]),
        rpc_site,
        interface=settings["host"],
    ).setServiceParent(root)

    # The settings module default is put in place before the Django WSGI
    # handler is imported and instantiated.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cap.settings")
    from django.core.handlers.wsgi import WSGIHandler
    django_app = WSGIHandler()
    wsgi_resource = WSGIResource(reactor, reactor.getThreadPool(), django_app)
    TCPServer(
        int(settings["uiport"]),
        server.Site(wsgi_resource),
        interface=settings["host"],
    ).setServiceParent(root)

    return root
def deferToThread(f, *args, **kwargs):
    """
    Call C{f} in a thread taken from the reactor's thread pool and hand the
    outcome back as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
        or an errback with a L{twisted.python.failure.Failure} if f throws
        an exception.
    """
    # The reactor is imported at call time rather than at module level.
    from twisted.internet import reactor
    pool = reactor.getThreadPool()
    return deferToThreadPool(reactor, pool, f, *args, **kwargs)