def test_echo(self):
    """
    Spawning a subprocess which echoes its stdin to its stdout via
    L{IReactorProcess.spawnProcess} results in that echoed output being
    delivered to C{outReceived}.
    """
    done = defer.Deferred()
    echoProtocol = EchoProtocol(done)
    argv = [pyExe, b'-u', b"-m", b"twisted.test.process_echoer"]
    reactor.spawnProcess(echoProtocol, pyExe, argv, env=properEnv)

    def verifyEcho(_):
        # No failure may have been recorded, and the buffered data must be
        # exactly as long as C{s} repeated C{n} times.
        self.assertFalse(echoProtocol.failure, echoProtocol.failure)
        self.assertTrue(hasattr(echoProtocol, 'buffer'))
        expectedLength = len(echoProtocol.s * echoProtocol.n)
        self.assertEqual(len(echoProtocol.buffer), expectedLength)

    def closeChildStdin(failure):
        # On error, close the child's stdin and let the failure propagate.
        echoProtocol.transport.closeStdin()
        return failure

    done.addCallback(verifyEcho)
    done.addErrback(closeChildStdin)
    return done
# Example source code using Python's spawnProcess()
def test_abnormalTermination(self):
    """
    A child which exits with system exit code 1 leaves a
    L{error.ProcessTerminated} reason on the protocol whose C{exitCode}
    is 1 and whose C{signal} is L{None}.
    """
    ended = defer.Deferred()
    protocol = TrivialProcessProtocol(ended)
    argv = [pyExe, b'-c', b'import sys; sys.exit(1)']
    reactor.spawnProcess(protocol, pyExe, argv,
                         env=None, usePTY=self.usePTY)

    def verifyReason(_):
        reason = protocol.reason
        # trap() re-raises unless the failure is a ProcessTerminated.
        reason.trap(error.ProcessTerminated)
        self.assertEqual(reason.value.exitCode, 1)
        self.assertIsNone(reason.value.signal)

    ended.addCallback(verifyReason)
    return ended
def test_executionError(self):
    """
    Raise an error during execvpe to check error management: both
    C{b"Upon execvpe"} and the raised message must show up in the data
    collected by the protocol.
    """
    cmdPath = self.getCommand('false')
    ended = defer.Deferred()
    protocol = TrivialProcessProtocol(ended)

    def failingExecvpe(command, args, environment):
        raise RuntimeError("Ouch")

    def verifyOutput(_):
        collected = b"".join(protocol.errData + protocol.outData)
        self.assertIn(b"Upon execvpe", collected)
        self.assertIn(b"Ouch", collected)

    # Swap in the failing execvpe only while spawnProcess runs, and always
    # put the real one back afterwards.
    realExecvpe = os.execvpe
    os.execvpe = failingExecvpe
    try:
        reactor.spawnProcess(protocol, cmdPath, [b'false'], env=None,
                             usePTY=self.usePTY)
        ended.addCallback(verifyOutput)
    finally:
        os.execvpe = realExecvpe
    return ended
def _mockForkInParentTest(self):
    """
    Spawn a process with the mocked C{os} acting as the parent side of the
    fork, then assert that the recorded actions are a fork followed by a
    waitpid and that the expected pipe file descriptors were closed.
    """
    self.mockos.child = False
    protocol = TrivialProcessProtocol(defer.Deferred())
    reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'], env=None,
                         usePTY=False)
    # It should close the first read pipe, and the 2 last write pipes
    closedFDs = set(self.mockos.closed)
    self.assertEqual(closedFDs, {-1, -4, -6})
    self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])
def test_mockForkErrorGivenFDs(self):
    """
    When C{os.fork} raises an exception and file descriptors have been
    specified with the C{childFDs} argument of L{reactor.spawnProcess},
    those descriptors are not closed.
    """
    self.mockos.raiseFork = OSError(errno.EAGAIN, None)
    protocol = TrivialProcessProtocol(None)

    # Every descriptor supplied by the caller: nothing may be closed.
    callerFDs = {0: -10, 1: -11, 2: -13}
    self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
                      childFDs=callerFDs)
    self.assertEqual(self.mockos.actions, [("fork", False)])
    self.assertEqual(self.mockos.closed, [])

    # We can also put "r" or "w" to let twisted create the pipes
    mixedFDs = {0: "r", 1: -11, 2: -13}
    self.assertRaises(OSError, reactor.spawnProcess, protocol, None,
                      childFDs=mixedFDs)
    self.assertEqual(set(self.mockos.closed), {-1, -2})
def test_mockPTYSetUid(self):
    """
    Try creating a PTY process with setting its uid: it's almost the same
    path as the standard path, but with a C{switchUID} call before the
    exec.
    """
    protocol = TrivialProcessProtocol(defer.Deferred())
    expectedActions = [
        ('fork', False), 'setsid', ('setuid', 0), ('setgid', 0),
        ('switchuid', 8081, 1234), 'exec', ('exit', 1)]
    try:
        reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'], env=None,
                             usePTY=True, uid=8081)
    except SystemError:
        # The spawn is expected to end in SystemError; check what the
        # mocked os recorded on the way there.
        self.assertTrue(self.mockos.exited)
        self.assertEqual(self.mockos.actions, expectedActions)
    else:
        self.fail("Should not be here")
def test_mockPTYSetUidInParent(self):
    """
    When spawning a child process with PTY and a UID different from the UID
    of the current process, the current process does not have its UID
    changed: the only recorded actions are the fork and the waitpid.
    """
    self.mockos.child = False
    protocol = TrivialProcessProtocol(defer.Deferred())

    # Substitute DumbPTYProcess for the duration of the spawn only.
    realPTYProcess = process.PTYProcess
    process.PTYProcess = DumbPTYProcess
    try:
        reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'], env=None,
                             usePTY=True, uid=8080)
    finally:
        process.PTYProcess = realPTYProcess

    self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid'])
def test_mockWithWaitError(self):
    """
    Test that reapProcess logs errors raised: exactly one L{OSError} is
    logged when the mocked waitpid is set up to raise one.
    """
    self.mockos.child = False
    self.mockos.waitChild = (0, 0)
    protocol = TrivialProcessProtocol(defer.Deferred())
    transport = reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                     env=None, usePTY=False)
    self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])

    # Arm the mock only after the spawn, so just the explicit reap fails.
    self.mockos.raiseWaitPid = OSError()
    transport.reapProcess()

    logged = self.flushLoggedErrors()
    self.assertEqual(len(logged), 1)
    logged[0].trap(OSError)
def test_mockErrorECHILDInReapProcess(self):
    """
    Test that reapProcess doesn't log anything when waitpid raises a
    C{OSError} with errno C{ECHILD}.
    """
    self.mockos.child = False
    self.mockos.waitChild = (0, 0)
    protocol = TrivialProcessProtocol(defer.Deferred())
    transport = reactor.spawnProcess(protocol, b'/mock/ouch', [b'ouch'],
                                     env=None, usePTY=False)
    self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"])

    noChildError = OSError()
    self.mockos.raiseWaitPid = noChildError
    noChildError.errno = errno.ECHILD
    # This should not produce any errors
    transport.reapProcess()
def test_stderr(self):
    """
    Bytes written to stderr by the spawned process are passed to the
    C{errReceived} callback on the C{ProcessProtocol} passed to
    C{spawnProcess}.
    """
    value = "42"
    accumulator = Accumulator()
    ended = defer.Deferred()
    accumulator.endedDeferred = ended

    # One-line child program which writes the value to its stderr.
    script = networkString("import sys; sys.stderr.write"
                           "('{0}')".format(value))
    reactor.spawnProcess(accumulator, pyExe, [pyExe, b"-c", script],
                         env=None, path="/tmp",
                         usePTY=self.usePTY)

    def verifyStderr(_):
        self.assertEqual(b"42", accumulator.errF.getvalue())

    return ended.addCallback(verifyStderr)
def test_openingTTY(self):
    """
    Spawn C{twisted.test.process_tty}, write one line to it and, once the
    protocol's C{endedDeferred} fires, check that signalling the process
    raises L{error.ProcessExitedAlready} and that the collected output is
    the expected pair of lines.
    """
    accumulator = Accumulator()
    ended = defer.Deferred()
    accumulator.endedDeferred = ended
    argv = [pyExe, b"-u", b"-m", b"twisted.test.process_tty"]
    reactor.spawnProcess(accumulator, pyExe, argv,
                         env=properEnv, usePTY=self.usePTY)
    accumulator.transport.write(b"hello world!\n")

    def verifyOutput(_):
        self.assertRaises(
            error.ProcessExitedAlready,
            accumulator.transport.signalProcess, 'HUP')
        output = accumulator.outF.getvalue()
        # The failure message carries whatever the child actually wrote.
        self.assertEqual(
            output,
            b"hello world!\r\nhello world!\r\n",
            ("Error message from process_tty "
             "follows:\n\n%s\n\n" % (output,)))

    return ended.addCallback(verifyOutput)
def test_encodableUnicodeEnvironment(self):
    """
    Test C{os.environ} (inherited by every subprocess on Windows) that
    contains an ascii-encodable Unicode string. This is different from
    passing Unicode environment explicitly to spawnProcess (which is not
    supported on Python 2).
    """
    os.environ[self.goodKey] = self.goodValue
    # Remove the key again when the test finishes.
    self.addCleanup(operator.delitem, os.environ, self.goodKey)

    def verifyEnvironment(environ):
        actual = environ[self.goodKey.encode('ascii')]
        self.assertEqual(actual, self.goodValue.encode('ascii'))

    runner = GetEnvironmentDictionary.run(reactor, [], properEnv)
    return runner.getResult().addCallback(verifyEnvironment)
def launchWorkerProcesses(self, spawner, protocols, arguments):
    """
    Spawn one worker process per process protocol.

    @param spawner: A C{IReactorProcess.spawnProcess} implementation.

    @param protocols: An iterable of C{ProcessProtocol} instances.

    @param arguments: Extra arguments passed to the processes.
    """
    workerModule = theSystemPath['twisted.trial._dist.workertrial']
    workertrialPath = workerModule.filePath.path
    childFDs = {
        0: 'w',
        1: 'r',
        2: 'r',
        _WORKER_AMP_STDIN: 'w',
        _WORKER_AMP_STDOUT: 'r',
    }
    # Add an environment variable containing the raw sys.path, to be used by
    # subprocesses to make sure it's identical to the parent. See
    # workertrial._setupPath.
    environ = os.environ.copy()
    environ['TRIAL_PYTHONPATH'] = os.pathsep.join(sys.path)

    for worker in protocols:
        # A fresh argument list is built for every worker.
        argv = [sys.executable, workertrialPath] + list(arguments)
        spawner(worker, sys.executable, args=argv, childFDs=childFDs,
                env=environ)
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """Spawn ``executable`` through the reactor and return a deferred.

    Builds a ``ProcessProtocol`` around a new deferred and the given
    timeout, then hands it to ``reactor.spawnProcess``.

    :param executable: Executable to run; also used as the first element
        of the child's argument vector.
    :type executable: str.
    :param args: Arguments placed after the executable.
    :type args: tuple.
    :param env: Environment dictionary, passed to ``spawnProcess``.
    :type env: dict.
    :param path: Passed to ``spawnProcess`` as its ``path`` argument.
    :param timeout: Handed to the process protocol; per the original
        documentation the child is killed once it is exceeded.
    :type timeout: int.
    :returns: The deferred given to the process protocol.
    """
    # NOTE: the shared ``{}`` default for ``env`` is only passed along; this
    # function never modifies it.
    result = defer.Deferred()
    protocol = ProcessProtocol(result, timeout)
    argv = (executable,) + tuple(args)
    reactor.spawnProcess(protocol, executable, argv, env, path)
    return result
def test_run_script(self):
    """
    Run the script named in the local options with the configured
    interpreter and return the process protocol's deferred.

    If the interpreter cannot be found an error is logged and nothing is
    spawned.
    """
    processProtocol = ScriptProcessProtocol(self)
    options = self.localOptions
    interpreter = options['interpreter']

    if not which(interpreter):
        log.err('Unable to find %s executable in PATH.' % interpreter)
        return

    # Only HOME is handed down to the child's environment.
    childEnv = {'HOME': os.environ['HOME']}
    argv = [interpreter, options['script']]
    reactor.spawnProcess(processProtocol, interpreter, args=argv,
                         env=childEnv, usePTY=True)

    # Start the reactor ourselves when nothing else is running it yet.
    if not reactor.running:
        reactor.run()

    return processProtocol.deferred
def spawnProcessAndNullifyStdout(protocol, args):
    """Utility function to spawn a process and redirect stdout to /dev/null.

    Spawns the process with the specified `protocol` in the reactor, with the
    specified list of binary `args`; `args[0]` is used as the executable.
    """
    with open(os.devnull, "r+b") as devnull:
        # The child's stdin and stdout both use the /dev/null descriptor,
        # while stderr is read asynchronously by the reactor.  The descriptor
        # is closed here before the spawned process finishes, but remains
        # open in the spawned process.
        null_fd = devnull.fileno()
        child_fds = {0: null_fd, 1: null_fd, 2: 'r'}
        reactor.spawnProcess(
            protocol, args[0], args, childFDs=child_fds,
            env=select_c_utf8_bytes_locale())
def test__protocol_logs_stderr(self):
    """
    Lines which the child process writes to stderr are logged with an
    ``observe-arp[<ifname>]`` prefix.

    NOTE(review): this is a generator; presumably it is driven by a
    decorator such as ``inlineCallbacks`` that is not visible here.
    """
    logger = self.useFixture(TwistedLoggerFixture())
    ifname = factory.make_name('eth')
    service = NeighbourDiscoveryService(ifname, lambda _: None)
    protocol = service.createProcessProtocol()

    # The child copies everything from its stdin to its stderr.
    command = (b"sh", b"-c", b"exec cat >&2")
    reactor.spawnProcess(protocol, b"sh", command)
    transport = protocol.transport
    transport.write(
        b"Lines written to stderr are logged\n"
        b"with a prefix, with no exceptions.\n")
    transport.closeStdin()
    yield protocol.done

    expected = (
        "observe-arp[%s]: Lines written to stderr are logged\n"
        "---\n"
        "observe-arp[%s]: with a prefix, with no exceptions."
        % (ifname, ifname))
    self.assertThat(logger.output, Equals(expected))
def test__protocol_logs_stderr(self):
    """
    Lines which the child process writes to stderr are logged with an
    ``observe-beacons[<ifname>]`` prefix.

    NOTE(review): this is a generator; presumably it is driven by a
    decorator such as ``inlineCallbacks`` that is not visible here.
    """
    logger = self.useFixture(TwistedLoggerFixture())
    ifname = factory.make_name('eth')
    service = BeaconingService(ifname, lambda _: None)
    protocol = service.createProcessProtocol()

    # The child copies everything from its stdin to its stderr.
    command = (b"sh", b"-c", b"exec cat >&2")
    reactor.spawnProcess(protocol, b"sh", command)
    transport = protocol.transport
    transport.write(
        b"Lines written to stderr are logged\n"
        b"with a prefix, with no exceptions.\n")
    transport.closeStdin()
    yield protocol.done

    expected = (
        "observe-beacons[%s]: Lines written to stderr are logged\n"
        "---\n"
        "observe-beacons[%s]: with a prefix, with no exceptions."
        % (ifname, ifname))
    self.assertThat(logger.output, Equals(expected))
def setUp(self):
    """
    Spawn the conch C{stdio} module in a PTY child process, wired to an
    in-memory terminal, and wait for it to show its prompt.

    The test terminal, process protocol and process transport are stored
    on C{self} as C{recvlineClient}/C{testTerminal}, C{processClient} and
    C{clientTransport}.

    @return: The result of L{defer.gatherResults} over the process
        protocol's C{onConnection} and the terminal's expectation of
        C{">>> "}, skipping whichever of the two is falsy.
    """
    # A memory-only terminal emulator, into which the server will
    # write things and make other state changes.  What ends up
    # here is basically what a user would have seen on their
    # screen.
    testTerminal = NotifyingExpectableBuffer()

    # An insults client protocol which will translate bytes
    # received from the child process into keystroke commands for
    # an ITerminalProtocol.
    insultsClient = insults.ClientProtocol(lambda: testTerminal)

    # A process protocol which will translate stdout and stderr
    # received from the child process to dataReceived calls and
    # error reporting on an insults client protocol.
    processClient = stdio.TerminalProcessProtocol(insultsClient)

    # Run twisted/conch/stdio.py with the name of a class
    # implementing ITerminalProtocol.  This class will be used to
    # handle bytes we send to the child process.
    exe = sys.executable
    module = stdio.__file__
    # "python2.3" is only the child's argv[0]; the program actually run
    # is ``exe``.
    args = ["python2.3", module, reflect.qual(self.serverProtocol)]
    env = {"PYTHONPATH": os.pathsep.join(sys.path)}

    from twisted.internet import reactor
    clientTransport = reactor.spawnProcess(processClient, exe, args,
                                           env=env, usePTY=True)

    self.recvlineClient = self.testTerminal = testTerminal
    self.processClient = processClient
    self.clientTransport = clientTransport

    # Wait for the process protocol and test terminal to become
    # connected before proceeding.  The former should always
    # happen first, but it doesn't hurt to be safe.
    candidates = [
        processClient.onConnection,
        testTerminal.expect(">>> ")]
    # Build a real list rather than a lazy ``filter`` object: on Python 3
    # ``filter`` returns an iterator, which has no ``len()`` for callers
    # that need a sequence.
    pending = [d for d in candidates if d]
    return defer.gatherResults(pending)
def serviceStarted(self):
    """
    Schedule the configured child process, if any, and mark this object
    as connected.

    When C{self.spawn} is truthy its items are used as the positional
    arguments of a C{reactor.spawnProcess} call scheduled with
    C{reactor.callLater(0, ...)}.  The child gets a copy of the current
    environment with C{PYTHONPATH} set from C{sys.path}.
    """
    if self.spawn:
        childEnv = os.environ.copy()
        childEnv['PYTHONPATH'] = os.pathsep.join(sys.path)
        reactor.callLater(0, reactor.spawnProcess, *self.spawn, env=childEnv)
    # NOTE(review): the original indentation was lost; this assignment is
    # assumed to run whether or not a process is spawned -- confirm.
    self.connected = 1