python类threadpool()的实例源码

util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def do_cleanThreads(cls):
        """
        Stop the global reactor's thread pool and replace it with a new one.

        Nothing is done when the reactor does not provide
        C{IReactorThreads}.  Otherwise the suggested pool size is set to
        zero and, if the reactor currently holds a thread pool, that pool is
        stopped and a freshly started C{ThreadPool(0, 10)} takes its place.
        """
        from twisted.internet import reactor

        if not interfaces.IReactorThreads.providedBy(reactor):
            return

        reactor.suggestThreadPoolSize(0)

        if not hasattr(reactor, 'threadpool') or not reactor.threadpool:
            return

        reactor.threadpool.stop()
        reactor.threadpool = None
        # The thread pool is a private detail of the reactor, so it must not
        # be left torn down: install a replacement and start it right away.
        reactor.threadpool = threadpool.ThreadPool(0, 10)
        reactor.threadpool.start()
util.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def do_cleanThreads(cls):
        """
        Stop the global reactor's thread pool and replace it with a new one.

        Nothing is done when the reactor does not provide
        C{IReactorThreads}.  Otherwise the suggested pool size is set to
        zero and, if the reactor currently holds a thread pool, that pool is
        stopped and a freshly started C{ThreadPool(0, 10)} takes its place.
        """
        from twisted.internet import reactor

        if not interfaces.IReactorThreads.providedBy(reactor):
            return

        reactor.suggestThreadPoolSize(0)

        if not hasattr(reactor, 'threadpool') or not reactor.threadpool:
            return

        reactor.threadpool.stop()
        reactor.threadpool = None
        # The thread pool is a private detail of the reactor, so it must not
        # be left torn down: install a replacement and start it right away.
        reactor.threadpool = threadpool.ThreadPool(0, 10)
        reactor.threadpool.start()
util.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def postClassCleanup(self):
        """
        Tidy up the reactor once every test of a C{TestCase} subclass has run.

        Called by L{unittest.TestCase} after the last test in the class.
        Leftover selectables and pending
        L{DelayedCall<twisted.internet.base.DelayedCall>}s are collected; if
        either collection is non-empty a L{DirtyReactorAggregateError} is
        reported against the test as an error.  The thread pool is cleaned
        up last, whether or not anything was left behind.
        """
        leftoverSelectables = self._cleanReactor()
        pendingCalls = self._cleanPending()

        if leftoverSelectables or pendingCalls:
            dirtyError = DirtyReactorAggregateError(
                pendingCalls, leftoverSelectables)
            self.result.addError(self.test, Failure(dirtyError))

        self._cleanThreads()
util.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _cleanThreads(self):
        """
        Stop the thread pool of the reactor returned by C{self._getReactor()}.

        Nothing happens when that reactor does not provide
        C{IReactorThreads} or when its C{threadpool} attribute is C{None}.
        """
        currentReactor = self._getReactor()

        if not interfaces.IReactorThreads.providedBy(currentReactor):
            return
        if currentReactor.threadpool is None:
            return

        # Stopping the pool here means a new one gets created afterwards,
        # which helps test isolation a little.  Because this runs as a
        # post-class cleanup hook it only isolates test classes from one
        # another, not the individual test methods within a class.
        currentReactor._stopThreadPool()
test_threads.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test__default_pool_is_disconnected_pool(self):
        """
        The reactor's default thread pool is a C{ThreadPool} whose context
        factory is C{orm.TotallyDisconnected} and whose minimum size is 0.
        """
        default_pool = reactor.threadpool
        self.assertThat(default_pool, IsInstance(ThreadPool))

        context_factory = default_pool.context.contextFactory
        self.assertThat(context_factory, Is(orm.TotallyDisconnected))

        minimum_size = default_pool.min
        self.assertThat(minimum_size, Equals(0))
twist.py 文件源码 项目:bwscanner 作者: TheTorProject 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __install():
    """
    Install the threaded-select Twisted reactor and run it from IPython.

    The reactor is interleaved with IPython's input hook: callables handed
    over by the reactor are queued, and the input hook drains that queue.
    The shell's C{ask_exit} is wrapped so that the reactor's thread pool
    (if any) and main loop are shut down before IPython exits.

    @return: the installed Twisted reactor.
    """
    logger = logging.getLogger('tpython')
    logger.info('setting up twisted reactor in ipython loop')

    # Keep this order: the reactor implementation is installed first and
    # only then is twisted.internet.reactor imported.
    from twisted.internet import _threadedselect
    _threadedselect.install()

    from twisted.internet import reactor
    from collections import deque
    from IPython.lib import inputhook
    from IPython import InteractiveShell

    # Callables queued by the reactor, waiting to be run by the input hook.
    pending = deque()

    def reactor_wake(twisted_loop_next, q=pending):
        # Handed to reactor.interleave(); just queue the work for later.
        q.append(twisted_loop_next)

    def reactor_work(*_args):
        # Input hook: run everything queued so far, then report 0.
        while pending:
            pending.popleft()()
        return 0

    def reactor_start(*_args):
        logger.info('starting twisted reactor in ipython')
        reactor.interleave(reactor_wake)  # @UndefinedVariable
        inputhook.set_inputhook(reactor_work)

    def reactor_stop():
        if reactor.threadpool:  # @UndefinedVariable
            logger.info('stopping twisted threads')
            reactor.threadpool.stop()  # @UndefinedVariable
        logger.info('shutting down twisted reactor')
        reactor._mainLoopShutdown()  # @UndefinedVariable

    shell = InteractiveShell.instance()

    # Remember the shell's own exit handler before replacing it below.
    original_ask_exit = shell.ask_exit

    def ipython_exit():
        reactor_stop()
        return original_ask_exit()

    shell.ask_exit = ipython_exit

    reactor_start()

    return reactor


问题


面经


文章

微信
公众号

扫码关注公众号