def __install():
log = logging.getLogger('tpython')
log.info('setting up twisted reactor in ipython loop')
from twisted.internet import _threadedselect
_threadedselect.install()
from twisted.internet import reactor
from collections import deque
from IPython.lib import inputhook
from IPython import InteractiveShell
q = deque()
def reactor_wake(twisted_loop_next, q=q):
q.append(twisted_loop_next)
def reactor_work(*_args):
if q:
while len(q):
q.popleft()()
return 0
def reactor_start(*_args):
log.info('starting twisted reactor in ipython')
reactor.interleave(reactor_wake) # @UndefinedVariable
inputhook.set_inputhook(reactor_work)
def reactor_stop():
if reactor.threadpool: # @UndefinedVariable
log.info('stopping twisted threads')
reactor.threadpool.stop() # @UndefinedVariable
log.info('shutting down twisted reactor')
reactor._mainLoopShutdown() # @UndefinedVariable
ip = InteractiveShell.instance()
ask_exit = ip.ask_exit
def ipython_exit():
reactor_stop()
return ask_exit()
ip.ask_exit = ipython_exit
reactor_start()
return reactor
评论列表
文章目录