python类ReactorNotRunning()的实例源码

log.py 文件源码 项目:privcount 作者: privcount 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def stop_reactor(exit_code=0):
    '''
    Stop the reactor and exit with exit_code.
    If exit_code is None, don't exit, just return to the caller.
    exit_code must be between 1 and 255.
    '''
    if exit_code is not None:
        logging.warning("Exiting with code {}".format(exit_code))
    else:
        # Let's hope the calling code exits pretty soon after this
        logging.warning("Stopping reactor")

    try:
        reactor.stop()
    except ReactorNotRunning:
        pass

    # return to the caller and let it decide what to do
    if exit_code == None:
        return

    # a graceful exit
    if exit_code == 0:
        sys.exit()

    # a hard exit
    assert exit_code >= 0
    assert exit_code <= 127
    os._exit(exit_code)
test_peerBeat.py 文件源码 项目:congredi 作者: toxik-io 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_peerBeat(self):
        self.threshold = .1
        print('IMPLEMENT tests/test_peerBeat')
        queryBackground()
        updateRead()
        updateWrite()
        peerSuccess()
        fails = mock_traceback(b'wow')
        try:
            peerFailure(fails)
        except ReactorNotRunning:
            print('good')
        peerBeat()
spawnsvc.py 文件源码 项目:ccs-twistedextensions 作者: apple 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def goodbye(reason):
        """
        Stop the process if stdin is closed.
        """
        try:
            reactor.stop()
        except ReactorNotRunning:
            pass
        return origLost(reason)
utils.py 文件源码 项目:checo 作者: kc1212 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def stop_reactor():
    try:
        reactor.stop()
        logging.info("STOPPING REACTOR")
    except error.ReactorNotRunning:
        pass
base.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop(self):
        """
        See twisted.internet.interfaces.IReactorCore.stop.
        """
        if self._stopped:
            raise error.ReactorNotRunning(
                "Can't stop reactor that isn't running.")
        self._stopped = True
        self._justStopped = True
        self._startedBefore = True
test_task.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop(self):
        """
        Stop the reactor.
        """
        if not self._running:
            raise error.ReactorNotRunning()
        self._running = False


问题


面经


文章

微信
公众号

扫码关注公众号