def threaded_reactor():
    """
    Start the Twisted reactor in a separate thread, if not already done.

    Returns a ``(reactor, thread)`` tuple, or ``(None, None)`` when Twisted
    cannot be imported. The thread is a daemon thread, so it does not keep
    the interpreter alive once the main thread (e.g. the test run) finishes.
    """
    global _twisted_thread
    try:
        from twisted.internet import reactor
    except ImportError:
        # Twisted is optional: report its absence instead of raising.
        return None, None
    if not _twisted_thread:
        from threading import Thread
        # The reactor runs outside the main thread, where Python does not
        # allow installing signal handlers, hence installSignalHandlers=False.
        _twisted_thread = Thread(
            target=reactor.run,
            kwargs={"installSignalHandlers": False},
        )
        # Use the ``daemon`` attribute; setDaemon() is deprecated since 3.10.
        _twisted_thread.daemon = True
        _twisted_thread.start()
    return reactor, _twisted_thread
# Export global reactor variable, as Twisted does
# (Web-page navigation residue, not part of the module: "Comment list", "Article table of contents".)