python类spawnProcess()的实例源码

test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_echo(self):
        """
        Spawn the C{twisted.test.process_echoer} module, which echoes its
        stdin to its stdout, through L{IReactorProcess.spawnProcess} and
        check that the echoed output was delivered to the protocol.

        @return: the L{defer.Deferred} handed to the L{EchoProtocol}, with
            the verification callback and a cleanup errback attached.
        """
        done = defer.Deferred()
        protocol = EchoProtocol(done)

        # -u keeps the child's stdio unbuffered; -m runs the echoer module.
        argv = [pyExe, b'-u', b"-m", b"twisted.test.process_echoer"]
        reactor.spawnProcess(protocol, pyExe, argv, env=properEnv)

        def verifyEcho(ignored):
            # No failure may have been recorded, and the amount of data
            # buffered must match the length of C{s * n} on the protocol.
            self.assertFalse(protocol.failure, protocol.failure)
            self.assertTrue(hasattr(protocol, 'buffer'))
            expectedLength = len(protocol.s * protocol.n)
            self.assertEqual(len(protocol.buffer), expectedLength)

        def closeChildStdin(err):
            # On failure, close the child's stdin and then let the original
            # failure propagate.
            protocol.transport.closeStdin()
            return err

        verified = done.addCallback(verifyEcho)
        return verified.addErrback(closeChildStdin)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_abnormalTermination(self):
        """
        A child which exits with system exit code 1 causes C{processEnded}
        to be called with an L{error.ProcessTerminated} failure whose
        C{exitCode} is 1 and whose C{signal} is L{None}.

        @return: the L{defer.Deferred} handed to the protocol, with the
            verification callback attached.
        """
        ended = defer.Deferred()
        protocol = TrivialProcessProtocol(ended)

        argv = [pyExe, b'-c', b'import sys; sys.exit(1)']
        reactor.spawnProcess(protocol, pyExe, argv,
                             env=None, usePTY=self.usePTY)

        def verifyReason(ignored):
            reason = protocol.reason
            # trap() re-raises unless the failure is a ProcessTerminated.
            reason.trap(error.ProcessTerminated)
            self.assertEqual(reason.value.exitCode, 1)
            self.assertIsNone(reason.value.signal)

        ended.addCallback(verifyReason)
        return ended
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_executionError(self):
        """
        Make C{os.execvpe} raise while spawning, to check how the error is
        handled: the combined stderr/stdout data collected by the protocol
        must mention both C{"Upon execvpe"} and the raised message.

        @return: the L{defer.Deferred} handed to the protocol, with the
            verification callback attached.
        """
        cmd = self.getCommand('false')

        ended = defer.Deferred()
        protocol = TrivialProcessProtocol(ended)

        def buggyexecvpe(command, args, environment):
            raise RuntimeError("Ouch")

        def verifyOutput(ignored):
            combined = b"".join(protocol.errData + protocol.outData)
            self.assertIn(b"Upon execvpe", combined)
            self.assertIn(b"Ouch", combined)

        # Swap in the failing execvpe only for the duration of the spawn
        # call, and always restore the real one afterwards.
        realExecvpe = os.execvpe
        os.execvpe = buggyexecvpe
        try:
            reactor.spawnProcess(protocol, cmd, [b'false'], env=None,
                                 usePTY=self.usePTY)
            ended.addCallback(verifyOutput)
        finally:
            os.execvpe = realExecvpe
        return ended
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _mockForkInParentTest(self):
        """
        Check the parent-side behaviour of C{spawnProcess} against the
        mocked C{os}: it disables the garbage collector, forks, closes the
        pipe file descriptors it created for the child, and calls waitpid.

        This helper verifies the closed descriptors and the recorded
        C{fork}/C{waitpid} actions.
        """
        # Pretend to be the parent side of the fork.
        self.mockos.child = False

        ended = defer.Deferred()
        protocol = TrivialProcessProtocol(ended)
        reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                             env=None, usePTY=False)

        # The first read pipe and the last two write pipes should be closed.
        expectedClosed = {-1, -4, -6}
        self.assertEqual(set(self.mockos.closed), expectedClosed)

        expectedActions = [("fork", False), "waitpid"]
        self.assertEqual(self.mockos.actions, expectedActions)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_mockForkErrorGivenFDs(self):
        """
        When C{os.fork} raises an exception and file descriptors were given
        through the C{childFDs} argument of L{reactor.spawnProcess}, those
        descriptors are not closed.
        """
        self.mockos.raiseFork = OSError(errno.EAGAIN, None)
        protocol = TrivialProcessProtocol(None)

        # Every descriptor is supplied by the caller: nothing gets closed.
        callerFDs = {0: -10, 1: -11, 2: -13}
        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
                          childFDs=callerFDs)
        self.assertEqual(self.mockos.actions, [("fork", False)])
        self.assertEqual(self.mockos.closed, [])

        # "r" or "w" asks twisted to create the pipe itself; those are the
        # descriptors expected in the closed set afterwards.
        mixedFDs = {0: "r", 1: -11, 2: -13}
        self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
                          childFDs=mixedFDs)
        self.assertEqual(set(self.mockos.closed), {-1, -2})
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_mockPTYSetUid(self):
        """
        Spawn a PTY process with an explicit uid.  The path is nearly the
        standard one, except that a C{switchUID} call happens before the
        exec.
        """
        protocol = TrivialProcessProtocol(defer.Deferred())

        # The sequence of mocked os calls expected to be recorded by the
        # time SystemError is raised.
        expectedActions = [
            ('fork', False), 'setsid', ('setuid', 0), ('setgid', 0),
            ('switchuid', 8081, 1234), 'exec', ('exit', 1)]

        try:
            reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                 env=None, usePTY=True, uid=8081)
        except SystemError:
            self.assertTrue(self.mockos.exited)
            self.assertEqual(self.mockos.actions, expectedActions)
        else:
            self.fail("Should not be here")
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_mockPTYSetUidInParent(self):
        """
        Spawning a PTY child with a UID different from that of the current
        process does not change the UID of the current process: only the
        C{fork} and C{waitpid} actions are recorded on the parent side.
        """
        # Pretend to be the parent side of the fork.
        self.mockos.child = False

        protocol = TrivialProcessProtocol(defer.Deferred())

        # Substitute DumbPTYProcess for the real PTYProcess only while
        # spawning, and always put the real class back.
        realPTYProcess = process.PTYProcess
        try:
            process.PTYProcess = DumbPTYProcess
            reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                 env=None, usePTY=True, uid=8080)
        finally:
            process.PTYProcess = realPTYProcess

        expectedActions = [('fork', False), 'waitpid']
        self.assertEqual(self.mockos.actions, expectedActions)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_mockWithWaitError(self):
        """
        C{reapProcess} logs the error raised by waitpid: exactly one
        L{OSError} is found among the logged errors.
        """
        # Parent side of the fork, with the mocked wait result (0, 0).
        self.mockos.child = False
        self.mockos.waitChild = (0, 0)

        protocol = TrivialProcessProtocol(defer.Deferred())
        transport = reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                         env=None, usePTY=False)
        self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])

        # Make the next waitpid fail, then reap explicitly.
        self.mockos.raiseWaitPid = OSError()
        transport.reapProcess()

        logged = self.flushLoggedErrors()
        self.assertEqual(len(logged), 1)
        logged[0].trap(OSError)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_mockErrorECHILDInReapProcess(self):
        """
        C{reapProcess} logs nothing when waitpid raises an L{OSError} whose
        errno is C{ECHILD}.
        """
        # Parent side of the fork, with the mocked wait result (0, 0).
        self.mockos.child = False
        self.mockos.waitChild = (0, 0)

        protocol = TrivialProcessProtocol(defer.Deferred())
        transport = reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                         env=None, usePTY=False)
        self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])

        # Make the next waitpid fail with ECHILD.
        self.mockos.raiseWaitPid = OSError()
        self.mockos.raiseWaitPid.errno = errno.ECHILD

        # Reaping must not produce any logged errors.
        transport.reapProcess()
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_stderr(self):
        """
        Bytes the spawned process writes to stderr are handed to the
        C{errReceived} callback of the C{ProcessProtocol} given to
        C{spawnProcess}.

        @return: the protocol's C{endedDeferred}, with the verification
            callback attached.
        """
        value = "42"

        accumulator = Accumulator()
        ended = defer.Deferred()
        accumulator.endedDeferred = ended

        # One-liner executed by the child: write the value to stderr.
        source = ("import sys; sys.stderr.write"
                  "('{0}')").format(value)
        argv = [pyExe, b"-c", networkString(source)]
        reactor.spawnProcess(accumulator, pyExe, argv,
                             env=None, path="/tmp",
                             usePTY=self.usePTY)

        def verifyStderr(ign):
            self.assertEqual(b"42", accumulator.errF.getvalue())

        return ended.addCallback(verifyStderr)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_openingTTY(self):
        """
        Spawn the C{twisted.test.process_tty} module, write one line to it,
        and once the process has ended check that signalling it raises
        L{error.ProcessExitedAlready} and that the collected output is the
        line twice, each terminated by C{\\r\\n}.

        @return: the protocol's C{endedDeferred}, with the verification
            callback attached.
        """
        accumulator = Accumulator()
        ended = defer.Deferred()
        accumulator.endedDeferred = ended

        argv = [pyExe, b"-u", b"-m", b"twisted.test.process_tty"]
        reactor.spawnProcess(accumulator, pyExe, argv,
                             env=properEnv, usePTY=self.usePTY)
        accumulator.transport.write(b"hello world!\n")

        def verifyOutput(ign):
            # The process is gone, so it can no longer be signalled.
            self.assertRaises(
                error.ProcessExitedAlready,
                accumulator.transport.signalProcess, 'HUP')
            received = accumulator.outF.getvalue()
            self.assertEqual(
                received,
                b"hello world!\r\nhello world!\r\n",
                ("Error message from process_tty "
                 "follows:\n\n%s\n\n" % (received,)))

        return ended.addCallback(verifyOutput)
test_process.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_encodableUnicodeEnvironment(self):
        """
        Exercise an C{os.environ} (inherited by every subprocess on Windows)
        containing an ascii-encodable Unicode string.  This differs from
        passing a Unicode environment explicitly to spawnProcess, which is
        not supported on Python 2.

        @return: the Deferred from C{getResult()}, with the verification
            callback attached.
        """
        key, value = self.goodKey, self.goodValue

        # Set the variable for this test only; remove it again on cleanup.
        os.environ[key] = value
        self.addCleanup(operator.delitem, os.environ, key)

        getter = GetEnvironmentDictionary.run(reactor, [], properEnv)

        def verifyEnvironment(environ):
            # The child reports its environment with ascii-encoded names
            # and values.
            self.assertEqual(
                environ[key.encode('ascii')],
                value.encode('ascii'))

        return getter.getResult().addCallback(verifyEnvironment)
disttrial.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def launchWorkerProcesses(self, spawner, protocols, arguments):
        """
        Start one worker subprocess per process protocol.

        Each worker is launched as C{sys.executable} running the
        C{twisted.trial._dist.workertrial} script, followed by the extra
        C{arguments}.

        @param spawner: A C{IReactorProcess.spawnProcess} implementation.

        @param protocols: An iterable of C{ProcessProtocol} instances; one
            process is spawned for each.

        @param arguments: Extra arguments passed to the processes.
        """
        workerScript = theSystemPath[
            'twisted.trial._dist.workertrial'].filePath.path

        # Descriptor layout shared by every worker.
        fdMap = {
            0: 'w',
            1: 'r',
            2: 'r',
            _WORKER_AMP_STDIN: 'w',
            _WORKER_AMP_STDOUT: 'r',
        }

        # Hand the raw sys.path to the subprocesses through an environment
        # variable so that they can make theirs identical to the parent's.
        # See workertrial._setupPath.
        childEnv = os.environ.copy()
        childEnv['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)

        for protocol in protocols:
            argv = [sys.executable, workerScript] + list(arguments)
            spawner(protocol, sys.executable, args=argv, childFDs=fdMap,
                    env=childEnv)
utils.py 文件源码 项目:duct 作者: ducted 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """Spawn ``executable`` and return a deferred tied to the child process.

    The deferred and ``timeout`` are handed to a ``ProcessProtocol``, which
    is then passed to ``reactor.spawnProcess``.

    :param executable: Executable to run; also used as the first element of
        the argument vector.
    :type executable: str.
    :param args: Tuple of arguments appended after the executable.
    :type args: tuple.
    :param env: Environment dictionary. The default dict is only forwarded
        to ``spawnProcess``; this function never modifies it.
    :type env: dict.
    :param path: Forwarded to ``spawnProcess`` as its ``path`` argument.
    :param timeout: Kill the child process if timeout is exceeded
    :type timeout: int.
    :returns: The deferred given to the process protocol.
    """
    result = defer.Deferred()
    protocol = ProcessProtocol(result, timeout)
    argv = (executable,) + tuple(args)
    reactor.spawnProcess(protocol, executable, argv, env, path)
    return result
script.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_run_script(self):
        """
        Run the script named in the usage options with the configured
        interpreter; whatever it prints to stdout is taken as the results
        of the test.

        Logs an error and returns None when the interpreter cannot be found
        in PATH; otherwise returns the process protocol's deferred.
        """
        protocol = ScriptProcessProtocol(self)

        interpreter = self.localOptions['interpreter']
        if not which(interpreter):
            log.err('Unable to find %s executable in PATH.' % interpreter)
            return

        argv = [interpreter, self.localOptions['script']]
        # Only HOME is passed on to the child's environment.
        childEnv = {'HOME': os.environ['HOME']}
        reactor.spawnProcess(protocol, interpreter,
                             args=argv, env=childEnv, usePTY=True)

        # Start the reactor ourselves if nothing else has done so yet.
        if not reactor.running:
            reactor.run()
        return protocol.deferred
clusterservice.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def spawnProcessAndNullifyStdout(protocol, args):
    """Utility function to spawn a process and redirect stdout to /dev/null.

    Spawns the process with the specified `protocol` in the reactor, with the
    specified list of binary `args`; `args[0]` is used as the executable.
    The child's stdin is connected to /dev/null as well.
    """
    with open(os.devnull, "r+b") as sink:
        # The descriptor for /dev/null is closed in this process when the
        # `with` block exits -- before the child finishes -- but the child
        # keeps its own open copy.
        sinkFD = sink.fileno()
        # stdin and stdout go to /dev/null; stderr is read asynchronously
        # by the reactor.
        fdMap = {0: sinkFD, 1: sinkFD, 2: 'r'}
        reactor.spawnProcess(
            protocol, args[0], args, childFDs=fdMap,
            env=select_c_utf8_bytes_locale())
test_services.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test__protocol_logs_stderr(self):
        """Lines the child writes to stderr are logged with a prefix.

        The child is `sh -c "exec cat >&2"`, so whatever is written to its
        stdin comes back on stderr.  This is a generator; presumably it is
        wrapped by an inlineCallbacks-style decorator outside this view.
        """
        logger = self.useFixture(TwistedLoggerFixture())
        ifname = factory.make_name('eth')
        service = NeighbourDiscoveryService(ifname, lambda _: None)
        protocol = service.createProcessProtocol()

        argv = (b"sh", b"-c", b"exec cat >&2")
        reactor.spawnProcess(protocol, b"sh", argv)

        transport = protocol.transport
        transport.write(
            b"Lines written to stderr are logged\n"
            b"with a prefix, with no exceptions.\n")
        transport.closeStdin()
        yield protocol.done

        expected = (
            "observe-arp[%s]: Lines written to stderr are logged\n"
            "---\n"
            "observe-arp[%s]: with a prefix, with no exceptions."
        ) % (ifname, ifname)
        self.assertThat(logger.output, Equals(expected))
test_services.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test__protocol_logs_stderr(self):
        """Lines the child writes to stderr are logged with a prefix.

        The child is `sh -c "exec cat >&2"`, so whatever is written to its
        stdin comes back on stderr.  This is a generator; presumably it is
        wrapped by an inlineCallbacks-style decorator outside this view.
        """
        logger = self.useFixture(TwistedLoggerFixture())
        ifname = factory.make_name('eth')
        service = BeaconingService(ifname, lambda _: None)
        protocol = service.createProcessProtocol()

        argv = (b"sh", b"-c", b"exec cat >&2")
        reactor.spawnProcess(protocol, b"sh", argv)

        transport = protocol.transport
        transport.write(
            b"Lines written to stderr are logged\n"
            b"with a prefix, with no exceptions.\n")
        transport.closeStdin()
        yield protocol.done

        expected = (
            "observe-beacons[%s]: Lines written to stderr are logged\n"
            "---\n"
            "observe-beacons[%s]: with a prefix, with no exceptions."
        ) % (ifname, ifname)
        self.assertThat(logger.output, Equals(expected))
test_recvline.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def setUp(self):
        """
        Spawn C{twisted/conch/stdio.py} in a PTY child process serving
        C{self.serverProtocol}, and connect it to an in-memory terminal.

        @return: a Deferred gathering the process protocol's connection
            notification and the terminal's first C{">>> "} prompt.
        """
        # In-memory terminal emulator that the server writes into; its
        # contents are basically what a user would see on screen.
        terminal = NotifyingExpectableBuffer()

        # Insults client protocol turning bytes from the child into
        # keystroke commands for an ITerminalProtocol.
        client = insults.ClientProtocol(lambda: terminal)

        # Process protocol forwarding the child's stdout and stderr to the
        # insults client as dataReceived calls and error reports.
        procProtocol = stdio.TerminalProcessProtocol(client)

        # The child runs the stdio module with the fully qualified name of
        # the ITerminalProtocol class that handles the bytes we send it.
        interpreter = sys.executable
        argv = ["python2.3", stdio.__file__,
                reflect.qual(self.serverProtocol)]
        childEnv = {"PYTHONPATH": os.pathsep.join(sys.path)}

        from twisted.internet import reactor
        transport = reactor.spawnProcess(procProtocol, interpreter, argv,
                                         env=childEnv, usePTY=True)

        self.recvlineClient = self.testTerminal = terminal
        self.processClient = procProtocol
        self.clientTransport = transport

        # Wait until both the process protocol and the terminal are
        # connected.  The former should always happen first, but waiting
        # for both is harmless.
        pending = [
            procProtocol.onConnection,
            terminal.expect(">>> ")]
        return defer.gatherResults(filter(None, pending))
test_conch.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serviceStarted(self):
            """
            Schedule the configured child process, if any, and mark this
            object as connected.
            """
            spawnArgs = self.spawn
            if spawnArgs:
                # The child gets a copy of our environment, with PYTHONPATH
                # set to the current sys.path.
                childEnv = os.environ.copy()
                childEnv['PYTHONPATH'] = os.pathsep.join(sys.path)
                # Defer the spawn to the next reactor iteration.
                reactor.callLater(
                    0, reactor.spawnProcess, *spawnArgs, env=childEnv)
            self.connected = 1


问题


面经


文章

微信
公众号

扫码关注公众号