python类ProcessTerminated()的实例源码

test_process.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def processEnded(self, reason):
        if not reason.check(error.ProcessTerminated):
            self.deferred.callback("wrong termination: %s" % reason)
            return
        v = reason.value
        if v.exitCode is not None:
            self.deferred.callback("SIG%s: exitCode is %s, not None" % 
                                   (self.signal, v.exitCode))
            return
        if v.signal != getattr(signal,'SIG'+self.signal):
            self.deferred.callback("SIG%s: .signal was %s, wanted %s" % 
                                   (self.signal, v.signal,
                                    getattr(signal,'SIG'+self.signal)))
            return
        if os.WTERMSIG(v.status) != getattr(signal,'SIG'+self.signal):
            self.deferred.callback('SIG%s: %s'
                                   % (self.signal, os.WTERMSIG(v.status)))
            return
        self.deferred.callback(None)
test_process.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testProcess(self):
        exe = sys.executable
        scriptPath = util.sibpath(__file__, "process_tester.py")
        d = defer.Deferred()
        p = TestProcessProtocol()
        p.deferred = d
        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
        def check(ignored):
            self.assertEquals(p.stages, [1, 2, 3, 4, 5])
            f = p.reason
            f.trap(error.ProcessTerminated)
            self.assertEquals(f.value.exitCode, 23)
            # would .signal be available on non-posix?
            # self.assertEquals(f.value.signal, None)
            try:
                import process_tester, glob
                for f in glob.glob(process_tester.test_file_match):
                    os.remove(f)
            except:
                pass
        d.addCallback(check)
        return d
test_process.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testManyProcesses(self):

        def _check(results, protocols):
            for p in protocols:
                self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
                # test status code
                f = p.reason
                f.trap(error.ProcessTerminated)
                self.assertEquals(f.value.exitCode, 23)

        exe = sys.executable
        scriptPath = util.sibpath(__file__, "process_tester.py")
        args = [exe, "-u", scriptPath]
        protocols = []
        deferreds = []

        for i in xrange(50):
            p = TestManyProcessProtocol()
            protocols.append(p)
            reactor.spawnProcess(p, exe, args, env=None)
            deferreds.append(p.deferred)

        deferredList = defer.DeferredList(deferreds, consumeErrors=True)
        deferredList.addCallback(_check, protocols)
        return deferredList
process.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def processExited(self, failure):
        err = failure.trap(
            internet_error.ProcessDone, internet_error.ProcessTerminated)

        if err == internet_error.ProcessDone:
            pass

        elif err == internet_error.ProcessTerminated:
            self.failed = True
            self.errmsg = failure.value.exitCode
            if self.errmsg:
                self.log.debug('Process Exited, status %d' % (self.errmsg,))
            else:
                self.log.warn('%r' % failure.value)
        if IS_MAC:
            # TODO: need to exit properly!
            self.errmsg = None
        self.proto = None
        self._turn_state_off()
test_shutdownmanager.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_restart_fails(self):
        """
        If an error occurs before the error checking timeout the activity will
        be failed.  Data printed by the process prior to the failure is
        included in the activity's result text.
        """
        message = {"type": "shutdown", "reboot": False, "operation-id": 100}
        self.plugin.perform_shutdown(message)

        def restart_failed(message_id):
            self.assertTrue(self.broker_service.exchanger.is_urgent())
            self.assertEqual(
                self.broker_service.message_store.get_pending_messages(),
                [{"type": "operation-result", "api": b"3.2",
                  "operation-id": 100, "timestamp": 0, "status": FAILED,
                  "result-text": u"Failure text is reported."}])

        [arguments] = self.process_factory.spawns
        protocol = arguments[0]
        protocol.result.addCallback(restart_failed)
        protocol.childDataReceived(0, "Failure text is reported.")
        protocol.processEnded(Failure(ProcessTerminated(exitCode=1)))
        return protocol.result
test_shutdownmanager.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_process_ends_after_timeout(self):
        """
        If the process ends after the error checking timeout has passed
        C{result} will not be re-fired.
        """
        message = {"type": "shutdown", "reboot": False, "operation-id": 100}
        self.plugin.perform_shutdown(message)

        stash = []

        def restart_performed(ignore):
            self.assertEqual(stash, [])
            stash.append(True)

        [arguments] = self.process_factory.spawns
        protocol = arguments[0]
        protocol.result.addCallback(restart_performed)
        self.manager.reactor.advance(10)
        protocol.processEnded(Failure(ProcessTerminated(exitCode=1)))
        return protocol.result
test_process.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def processEnded(self, reason):
        if not reason.check(error.ProcessTerminated):
            self.deferred.callback("wrong termination: %s" % reason)
            return
        v = reason.value
        if v.exitCode is not None:
            self.deferred.callback("SIG%s: exitCode is %s, not None" % 
                                   (self.signal, v.exitCode))
            return
        if v.signal != getattr(signal,'SIG'+self.signal):
            self.deferred.callback("SIG%s: .signal was %s, wanted %s" % 
                                   (self.signal, v.signal,
                                    getattr(signal,'SIG'+self.signal)))
            return
        if os.WTERMSIG(v.status) != getattr(signal,'SIG'+self.signal):
            self.deferred.callback('SIG%s: %s'
                                   % (self.signal, os.WTERMSIG(v.status)))
            return
        self.deferred.callback(None)
test_process.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testProcess(self):
        exe = sys.executable
        scriptPath = util.sibpath(__file__, "process_tester.py")
        d = defer.Deferred()
        p = TestProcessProtocol()
        p.deferred = d
        reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
        def check(ignored):
            self.assertEquals(p.stages, [1, 2, 3, 4, 5])
            f = p.reason
            f.trap(error.ProcessTerminated)
            self.assertEquals(f.value.exitCode, 23)
            # would .signal be available on non-posix?
            # self.assertEquals(f.value.signal, None)
            try:
                import process_tester, glob
                for f in glob.glob(process_tester.test_file_match):
                    os.remove(f)
            except:
                pass
        d.addCallback(check)
        return d
test_process.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def testManyProcesses(self):

        def _check(results, protocols):
            for p in protocols:
                self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
                # test status code
                f = p.reason
                f.trap(error.ProcessTerminated)
                self.assertEquals(f.value.exitCode, 23)

        exe = sys.executable
        scriptPath = util.sibpath(__file__, "process_tester.py")
        args = [exe, "-u", scriptPath]
        protocols = []
        deferreds = []

        for i in xrange(50):
            p = TestManyProcessProtocol()
            protocols.append(p)
            reactor.spawnProcess(p, exe, args, env=None)
            deferreds.append(p.deferred)

        deferredList = defer.DeferredList(deferreds, consumeErrors=True)
        deferredList.addCallback(_check, protocols)
        return deferredList
test_session.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_wrapProcessProtocol_Protocol(self):
        """
        L{wrapPRocessProtocol}, when passed a L{Protocol} should return
        something that follows the L{IProcessProtocol} interface, with
        connectionMade() mapping to connectionMade(), outReceived() mapping to
        dataReceived() and processEnded() mapping to connectionLost().
        """
        protocol = MockProtocol()
        protocol.transport = StubTransport()
        process_protocol = session.wrapProcessProtocol(protocol)
        process_protocol.connectionMade()
        process_protocol.outReceived(b'data')
        self.assertEqual(protocol.transport.buf, b'data~')
        process_protocol.processEnded(failure.Failure(
            error.ProcessTerminated(0, None, None)))
        protocol.reason.trap(error.ProcessTerminated)
test_session.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_processEndedWithExitSignalCoreDump(self):
        """
        When processEnded is called, if there is an exit signal in the reason
        it should be sent in an exit-signal message.  The connection should be
        closed.
        """
        self.pp.processEnded(
            Failure(ProcessTerminated(1,
                signal.SIGTERM, 1 << 7))) # 7th bit means core dumped
        self.assertRequestsEqual(
            [(b'exit-signal',
              common.NS(b'TERM') # signal name
              + b'\x01' # core dumped is true
              + common.NS(b'') # error message
              + common.NS(b''), # language tag
              False)])
        self.assertSessionClosed()
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_abnormalTermination(self):
        """
        When a process terminates with a system exit code set to 1,
        C{processEnded} is called with a L{error.ProcessTerminated} error,
        the C{exitCode} attribute reflecting the system exit code.
        """
        d = defer.Deferred()
        p = TrivialProcessProtocol(d)
        reactor.spawnProcess(p, pyExe,
                             [pyExe, b'-c', b'import sys; sys.exit(1)'],
                             env=None, usePTY=self.usePTY)

        def check(ignored):
            p.reason.trap(error.ProcessTerminated)
            self.assertEqual(p.reason.value.exitCode, 1)
            self.assertIsNone(p.reason.value.signal)
        d.addCallback(check)
        return d
test_mail.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_processAliasTimeout(self):
        """
        If the alias child process does not exit within a particular period of
        time, the L{Deferred} returned by L{MessageWrapper.eomReceived} should
        fail with L{ProcessAliasTimeout} and send the I{KILL} signal to the
        child process..
        """
        reactor = task.Clock()
        transport = StubProcess()
        proto = mail.alias.ProcessAliasProtocol()
        proto.makeConnection(transport)

        receiver = mail.alias.MessageWrapper(proto, None, reactor)
        d = receiver.eomReceived()
        reactor.advance(receiver.completionTimeout)
        def timedOut(ignored):
            self.assertEqual(transport.signals, ['KILL'])
            # Now that it has been killed, disconnect the protocol associated
            # with it.
            proto.processEnded(
                ProcessTerminated(self.signalStatus(signal.SIGKILL)))
        self.assertFailure(d, mail.alias.ProcessAliasTimeout)
        d.addCallback(timedOut)
        return d
test_recvline.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Kill the child process.  We're done with it.
        try:
            self.clientTransport.signalProcess("KILL")
        except OSError:
            pass
        def trap(failure):
            failure.trap(error.ProcessTerminated)
            self.assertEquals(failure.value.exitCode, None)
            self.assertEquals(failure.value.status, 9)
        return self.testTerminal.onDisconnection.addErrback(trap)
test_ssh.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, p):
        p.makeConnection(self)
        p.processEnded(failure.Failure(ProcessTerminated(255, None, None)))
test_ssh.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
test_ssh.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
test_ssh.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
_dumbwin32proc.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def checkWork(self):
        if win32event.WaitForSingleObject(self.proc.hProcess, 0) != win32event.WAIT_OBJECT_0:
            return 0
        exitCode = win32process.GetExitCodeProcess(self.proc.hProcess)
        if exitCode == 0:
            err = error.ProcessDone(exitCode)
        else:
            err = error.ProcessTerminated(exitCode)
        self.deactivate()
        self.proc.protocol.processEnded(failure.Failure(err))
        return 0
test_threads.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testCallBeforeStartupUnexecuted(self):
        progname = self.mktemp()
        progfile = file(progname, 'w')
        progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module__})
        progfile.close()

        def programFinished((out, err, reason)):
            if reason.check(error.ProcessTerminated):
                self.fail("Process did not exit cleanly (out: %s err: %s)" % (out, err))

            if err:
                log.msg("Unexpected output on standard error: %s" % (err,))
            self.failIf(out, "Expected no output, instead received:\n%s" % (out,))
test_process.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _endProcess(self, reason, p):
        self.failIf(reason.check(error.ProcessDone),
                    'Child should fail due to EPIPE.')
        reason.trap(error.ProcessTerminated)
        # child must not get past that write without raising
        self.failIfEqual(reason.value.exitCode, 42,
                         'process reason was %r' % reason)
        self.failUnlessEqual(p.output, '')
        return p.errput
test_changer.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_change_packages_with_failed_reboot(self):
        """
        When a C{reboot-if-necessary} flag is passed in the C{change-packages},
        A C{ShutdownProtocol} is created and the package result change is
        returned, even if the reboot fails.
        """
        self.store.add_task("changer",
                            {"type": "change-packages", "install": [2],
                             "binaries": [(HASH2, 2, PKGDEB2)],
                             "operation-id": 123,
                             "reboot-if-necessary": True})

        def return_good_result(self):
            return "Yeah, I did whatever you've asked for!"
        self.replace_perform_changes(return_good_result)

        result = self.changer.handle_tasks()

        def got_result(result):
            self.assertMessages(self.get_pending_messages(),
                                [{"operation-id": 123,
                                  "result-code": 1,
                                  "result-text": "Yeah, I did whatever you've "
                                                 "asked for!",
                                  "type": "change-packages-result"}])
            self.log_helper.ignore_errors(ShutdownFailedError)

        self.landscape_reactor.advance(5)
        [arguments] = self.process_factory.spawns
        protocol = arguments[0]
        protocol.processEnded(Failure(ProcessTerminated(exitCode=1)))
        self.landscape_reactor.advance(10)
        return result.addCallback(got_result)
test_recvline.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        # Kill the child process.  We're done with it.
        try:
            self.clientTransport.signalProcess("KILL")
        except OSError:
            pass
        def trap(failure):
            failure.trap(error.ProcessTerminated)
            self.assertEquals(failure.value.exitCode, None)
            self.assertEquals(failure.value.status, 9)
        return self.testTerminal.onDisconnection.addErrback(trap)
test_ssh.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, p):
        p.makeConnection(self)
        p.processEnded(failure.Failure(ProcessTerminated(255, None, None)))
test_ssh.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
test_ssh.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
test_ssh.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def loseConnection(self):
        if self.closed: return
        self.closed = 1
        self.proto.inConnectionLost()
        self.proto.outConnectionLost()
        self.proto.errConnectionLost()
        self.proto.processEnded(failure.Failure(ProcessTerminated(0, None, None)))
_dumbwin32proc.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def checkWork(self):
        if win32event.WaitForSingleObject(self.proc.hProcess, 0) != win32event.WAIT_OBJECT_0:
            return 0
        exitCode = win32process.GetExitCodeProcess(self.proc.hProcess)
        if exitCode == 0:
            err = error.ProcessDone(exitCode)
        else:
            err = error.ProcessTerminated(exitCode)
        self.deactivate()
        self.proc.protocol.processEnded(failure.Failure(err))
        return 0
test_threads.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testCallBeforeStartupUnexecuted(self):
        progname = self.mktemp()
        progfile = file(progname, 'w')
        progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module__})
        progfile.close()

        def programFinished((out, err, reason)):
            if reason.check(error.ProcessTerminated):
                self.fail("Process did not exit cleanly (out: %s err: %s)" % (out, err))

            if err:
                log.msg("Unexpected output on standard error: %s" % (err,))
            self.failIf(out, "Expected no output, instead received:\n%s" % (out,))
test_process.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _endProcess(self, reason, p):
        self.failIf(reason.check(error.ProcessDone),
                    'Child should fail due to EPIPE.')
        reason.trap(error.ProcessTerminated)
        # child must not get past that write without raising
        self.failIfEqual(reason.value.exitCode, 42,
                         'process reason was %r' % reason)
        self.failUnlessEqual(p.output, '')
        return p.errput


问题


面经


文章

微信
公众号

扫码关注公众号