test_process.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码
def test_errorInProcessEnded(self):
        """
        An exception raised from a protocol's C{processEnded} must not stop
        the reap handler from being cleaned up: after the child has ended,
        its pid is no longer in C{process.reapProcessHandlers} and exactly
        one C{RuntimeError} has been logged.
        """
        # This module keeps running until its transport is disconnected.
        echoerModule = b"twisted.test.process_echoer"
        argv = [pyExe, b"-u", b"-m", echoerModule]

        whenConnected = defer.Deferred()
        whenEnded = defer.Deferred()

        # A list is used as a mutable cell so the callbacks below can share
        # the child's pid.
        seenPids = []

        class ErrorInProcessEnded(protocol.ProcessProtocol):
            """
            Process protocol whose C{processEnded} always raises.
            """
            def makeConnection(self, transport):
                # Hand the transport over to the test body.
                whenConnected.callback(transport)

            def processEnded(self, reason):
                # Queue the final checks with a zero-delay call before
                # failing on purpose.
                reactor.callLater(0, whenEnded.callback, None)
                raise RuntimeError("Deliberate error")

        def disconnectOnceRegistered(transport):
            childPid = transport.pid
            seenPids.append(childPid)
            # At this point a reap handler must be registered for the child.
            self.assertIn(childPid, process.reapProcessHandlers)

            # Shut the process down cleanly; this is what leads to the
            # protocol's failing processEnded being invoked.
            transport.loseConnection()

        def verifyCleanup(ignored):
            # The deliberate error must have been logged exactly once.
            loggedErrors = self.flushLoggedErrors(RuntimeError)
            self.assertEqual(len(loggedErrors), 1)
            # And the child must no longer be awaiting reaping.
            self.assertNotIn(seenPids[0], process.reapProcessHandlers)

        # Start the child process.
        reactor.spawnProcess(
            ErrorInProcessEnded(), pyExe, argv, env=properEnv, path=None)

        # Callbacks are attached only after the spawn call, as before.
        whenConnected.addCallback(disconnectOnceRegistered)
        whenEnded.addCallback(verifyCleanup)

        return whenEnded
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号