def stop_reactor(exit_code=0):
'''
Stop the reactor and exit with exit_code.
If exit_code is None, don't exit, just return to the caller.
exit_code must be between 1 and 255.
'''
if exit_code is not None:
logging.warning("Exiting with code {}".format(exit_code))
else:
# Let's hope the calling code exits pretty soon after this
logging.warning("Stopping reactor")
try:
reactor.stop()
except ReactorNotRunning:
pass
# return to the caller and let it decide what to do
if exit_code == None:
return
# a graceful exit
if exit_code == 0:
sys.exit()
# a hard exit
assert exit_code >= 0
assert exit_code <= 127
os._exit(exit_code)
python类ReactorNotRunning()的实例源码
def test_peerBeat(self):
self.threshold = .1
print('IMPLEMENT tests/test_peerBeat')
queryBackground()
updateRead()
updateWrite()
peerSuccess()
fails = mock_traceback(b'wow')
try:
peerFailure(fails)
except ReactorNotRunning:
print('good')
peerBeat()
def goodbye(reason):
"""
Stop the process if stdin is closed.
"""
try:
reactor.stop()
except ReactorNotRunning:
pass
return origLost(reason)
def stop_reactor():
try:
reactor.stop()
logging.info("STOPPING REACTOR")
except error.ReactorNotRunning:
pass
def stop(self):
"""
See twisted.internet.interfaces.IReactorCore.stop.
"""
if self._stopped:
raise error.ReactorNotRunning(
"Can't stop reactor that isn't running.")
self._stopped = True
self._justStopped = True
self._startedBefore = True
def stop(self):
"""
Stop the reactor.
"""
if not self._running:
raise error.ReactorNotRunning()
self._running = False