def test_requestExec(self):
    """
    An C{exec} request makes the SSHSession obtain an ISession adapter for
    the avatar and call C{execCommand} on it with a ProcessProtocol to
    attach and the requested command line.

    The C{b'failure'} command is expected to be refused and leave no client
    attached; the C{b'success'} command is expected to be accepted and to
    record both the protocol and the command line on the stub session.
    """
    # Rejected request: nothing gets attached to the session.
    rejected = self.session.requestReceived(b'exec', common.NS(b'failure'))
    self.assertFalse(rejected)
    self.assertRequestRaisedRuntimeError()
    self.assertIsNone(self.session.client)

    # Accepted request: a process protocol becomes the session's client.
    accepted = self.session.requestReceived(b'exec', common.NS(b'success'))
    self.assertTrue(accepted)
    self.assertSessionIsStubSession()
    self.assertIsInstance(
        self.session.client, session.SSHSessionProcessProtocol)
    stub = self.session.session
    self.assertIs(stub.execProtocol, self.session.client)
    self.assertEqual(stub.execCommandLine, b'success')
# Example source code using the Python class ProcessProtocol()
def test_stderr(self):
    """
    Bytes the spawned process writes to stderr are delivered to the
    C{errReceived} callback of the C{ProcessProtocol} that was passed to
    C{spawnProcess}.
    """
    value = "42"
    accumulator = Accumulator()
    finished = accumulator.endedDeferred = defer.Deferred()
    # The child simply writes the value to its stderr and exits.
    script = "import sys; sys.stderr.write('{0}')".format(value)
    reactor.spawnProcess(
        accumulator, pyExe,
        [pyExe, b"-c", networkString(script)],
        env=None, path="/tmp", usePTY=self.usePTY)

    def processEnded(ign):
        self.assertEqual(b"42", accumulator.errF.getvalue())

    return finished.addCallback(processEnded)
def test_stdout(self):
"""
ProcessProtocol.transport.closeStdout actually closes the pipe.
"""
d = self.doit(1)
def _check(errput):
if _PY3:
if runtime.platform.isWindows():
self.assertIn(b"OSError", errput)
self.assertIn(b"22", errput)
else:
self.assertIn(b'BrokenPipeError', errput)
else:
self.assertIn(b'OSError', errput)
if runtime.platform.getType() != 'win32':
self.assertIn(b'Broken pipe', errput)
d.addCallback(_check)
return d
def test_launchWorkerProcesses(self):
    """
    Given a C{spawnProcess} function, C{launchWorkerProcess} launches a
    python process with an existing path as its argument.
    """
    protocols = [ProcessProtocol() for _ in range(4)]
    recordedArgs = []
    recordedEnv = {}

    def fakeSpawnProcess(processProtocol, executable, args=(), env={},
                         path=None, uid=None, gid=None, usePTY=0,
                         childFDs=None):
        # Record the executable followed by its argv, and merge the
        # environment of every launch into one dict.
        recordedArgs.extend([executable] + list(args))
        recordedEnv.update(env)

    self.runner.launchWorkerProcesses(fakeSpawnProcess, protocols, ["foo"])

    # argv[0] repeats the executable, argv[1] is an existing path and the
    # extra argument follows.
    self.assertEqual(recordedArgs[0], recordedArgs[1])
    self.assertTrue(os.path.exists(recordedArgs[2]))
    self.assertEqual("foo", recordedArgs[3])
    self.assertEqual(os.pathsep.join(sys.path),
                     recordedEnv["TRIAL_PYTHONPATH"])
def fork(executable, args=(), env={}, path=None, timeout=3600):
    """Spawn a child process and return a Deferred tied to it.

    The Deferred and the timeout are handed to a ``ProcessProtocol``
    instance, which is then attached to the process started through
    ``reactor.spawnProcess``.

    :param executable: Executable to run; also used as ``argv[0]``
    :type executable: str.
    :param args: Tuple of arguments appended after the executable
    :type args: tuple.
    :param env: Environment dictionary passed to ``spawnProcess``
    :type env: dict.
    :param path: Passed to ``spawnProcess`` as its ``path`` argument
    :param timeout: Passed to ``ProcessProtocol``; per the original
        documentation the child is killed once the timeout is exceeded
    :type timeout: int.
    :returns: the Deferred given to the ``ProcessProtocol``
    """
    finished = defer.Deferred()
    handler = ProcessProtocol(finished, timeout)
    argv = (executable,) + tuple(args)
    reactor.spawnProcess(handler, executable, argv, env, path)
    return finished
def test_stdout(self):
"""ProcessProtocol.transport.closeStdout actually closes the pipe."""
d = self.doit(1)
def _check(errput):
self.failIfEqual(errput.find('OSError'), -1)
if runtime.platform.getType() != 'win32':
self.failIfEqual(errput.find('Broken pipe'), -1)
d.addCallback(_check)
return d
def test_stderr(self):
"""ProcessProtocol.transport.closeStderr actually closes the pipe."""
d = self.doit(2)
def _check(errput):
# there should be no stderr open, so nothing for it to
# write the error to.
self.failUnlessEqual(errput, '')
d.addCallback(_check)
return d
def test_stdout(self):
"""ProcessProtocol.transport.closeStdout actually closes the pipe."""
d = self.doit(1)
def _check(errput):
self.failIfEqual(errput.find('OSError'), -1)
if runtime.platform.getType() != 'win32':
self.failIfEqual(errput.find('Broken pipe'), -1)
d.addCallback(_check)
return d
def test_stderr(self):
"""ProcessProtocol.transport.closeStderr actually closes the pipe."""
d = self.doit(2)
def _check(errput):
# there should be no stderr open, so nothing for it to
# write the error to.
self.failUnlessEqual(errput, '')
d.addCallback(_check)
return d
def childDataReceived(self, childFD, data):
    """
    Publish child output to the session monitoring this process, then let
    the base C{ProcessProtocol} handle the data as usual.

    Every C{(pid, sid)} pair in C{self.sptl.monitor_ps} is printed; for the
    pair whose pid equals C{self.pid}, C{data} is published on the
    C{"psmonitor"} topic with C{eligible=[sid]}.

    @param childFD: the child file descriptor the data arrived on.
    @param data: the data received from the child.
    """
    try:
        # ``items()`` and the ``%``-formatted print work on both Python 2
        # and Python 3 and produce the same "pid sid" output as before.
        for pid, sid in self.sptl.monitor_ps.items():
            print("%s %s" % (pid, sid))
            if pid == self.pid:
                self.sptl.publish("psmonitor", data, eligible=[sid])
    except Exception as e:
        # Best effort: a monitoring failure is reported but must not stop
        # the data from reaching the base class below.
        print(e)
    protocol.ProcessProtocol.childDataReceived(self, childFD, data)
def connectionMade(self):
    """ProcessProtocol override: fire C{self.launched} once with ourself."""
    if self.launched.called:
        # Already fired; nothing more to do.
        return
    self.launched.callback(self)
def outReceived(self, data):
    """
    ProcessProtocol override: buffer stdout data and echo complete lines.

    C{data} is decoded as UTF-8 and appended to C{self._out}. Every
    complete line (terminated by a newline) is written to C{sys.stdout} as
    C{prefix + color + line + Fore.RESET}; any trailing partial line stays
    buffered in C{self._out}.

    @param data: bytes received from the child's stdout.
    """
    import codecs

    # Chunks may end in the middle of a multi-byte UTF-8 sequence, so a
    # plain per-chunk decode() could raise UnicodeDecodeError. A
    # per-instance incremental decoder keeps the dangling bytes until the
    # rest of the character arrives.
    decoder = getattr(self, '_outDecoder', None)
    if decoder is None:
        decoder = self._outDecoder = codecs.getincrementaldecoder('utf8')()
    self._out += decoder.decode(data)

    # Everything before the last newline is complete; the remainder (maybe
    # empty) is kept for the next call.
    lines = self._out.split('\n')
    self._out = lines.pop()
    for line in lines:
        sys.stdout.write(self.prefix + self.color + line + Fore.RESET + '\n')
def errReceived(self, data):
    """
    ProcessProtocol override: buffer stderr data and echo complete lines.

    C{data} is decoded as UTF-8 and appended to C{self._err}. Every
    complete line (terminated by a newline) is written to C{sys.stderr} as
    C{prefix + color + line + Fore.RESET}; any trailing partial line stays
    buffered in C{self._err}.

    @param data: bytes received from the child's stderr.
    """
    import codecs

    # Chunks may end in the middle of a multi-byte UTF-8 sequence, so a
    # plain per-chunk decode() could raise UnicodeDecodeError. A
    # per-instance incremental decoder keeps the dangling bytes until the
    # rest of the character arrives.
    decoder = getattr(self, '_errDecoder', None)
    if decoder is None:
        decoder = self._errDecoder = codecs.getincrementaldecoder('utf8')()
    self._err += decoder.decode(data)

    # Everything before the last newline is complete; the remainder (maybe
    # empty) is kept for the next call.
    lines = self._err.split('\n')
    self._err = lines.pop()
    for line in lines:
        sys.stderr.write(self.prefix + self.color + line + Fore.RESET + '\n')
def __init__(self, protocol, data):
"""
@type protocol: L{ConchTestForwardingProcess}
@param protocol: The L{ProcessProtocol} which made this connection.
@type data: str
@param data: The data to be sent to the third-party server.
"""
self.protocol = protocol
self.data = data
def __init__(self, processProtocol):
"""
Initialize our instance variables.
@param processProtocol: a C{ProcessProtocol} to connect to ourself.
"""
self.proto = processProtocol
self.closed = False
self.data = b''
processProtocol.makeConnection(self)
def write(self, data):
    """
    Record the data and hand it straight back to our C{ProcessProtocol},
    followed by a CRLF. A null byte anywhere in the data makes us
    disconnect.
    """
    self.data += data
    for chunk in (data, b'\r\n'):
        self.proto.outReceived(chunk)
    # mimic 'exit' for the shell test
    if data.find(b'\x00') != -1:
        self.loseConnection()
def loseConnection(self):
    """
    If we're asked to disconnect (and we haven't already) shut down
    the C{ProcessProtocol} with a 0 exit code.

    The first call marks the transport closed, reports all three standard
    streams as lost and then delivers C{processEnded} with a
    C{ProcessTerminated(0, None, None)} failure. Later calls do nothing.
    """
    if self.closed:
        return
    # Keep the flag a real boolean, matching its ``False`` initial value
    # (previously the integer 1 was stored here).
    self.closed = True
    self.proto.inConnectionLost()
    self.proto.outConnectionLost()
    self.proto.errConnectionLost()
    self.proto.processEnded(failure.Failure(
        error.ProcessTerminated(0, None, None)))
def test_lookupSubsystem(self):
    """
    A C{subsystem} request makes the SSHSession look the subsystem up via
    avatar.lookupSubsystem and attach it as the client.
    """
    request = common.NS(b'TestSubsystem') + b'data'
    accepted = self.session.requestReceived(b'subsystem', request)
    self.assertTrue(accepted)

    client = self.session.client
    self.assertIsInstance(client, protocol.ProcessProtocol)
    # The client's transport wraps the very subsystem the avatar returned.
    self.assertIs(client.transport.proto, self.session.avatar.subsystem)
def test_noCompatibilityLayer(self):
    """
    If no compatibility layer is present, imports of gobject and friends
    are disallowed.

    We do this by running a process where we make sure gi.pygtkcompat
    isn't present.

    The test is skipped on Python 3 and when the reactor offers no process
    support. Otherwise it returns a Deferred that fires once the child has
    exited and its collected stdout has been compared to C{b"success"}.
    """
    if _PY3:
        raise SkipTest("Python3 always has the compatibility layer.")
    import os
    from twisted.internet import reactor
    if not IReactorProcess.providedBy(reactor):
        raise SkipTest("No process support available in this reactor.")

    result = Deferred()

    class Stdout(ProcessProtocol):
        # Accumulates everything the child writes to stdout.
        data = b""

        def errReceived(self, err):
            print(err)

        def outReceived(self, data):
            self.data += data

        def processExited(self, reason):
            result.callback(self.data)

    path = FilePath(__file__).sibling(b"process_gireactornocompat.py").path
    pyExe = FilePath(sys.executable)._asBytesPath()
    # Pass in a PYTHONPATH that is the test runner's sys.path, to make sure
    # we're running from a checkout. os.pathsep (not a hard-coded ":")
    # keeps the value valid on platforms with a different separator.
    reactor.spawnProcess(Stdout(), pyExe, [pyExe, path],
                         env={"PYTHONPATH": os.pathsep.join(sys.path)})
    result.addCallback(self.assertEqual, b"success")
    return result
def _spawnProcess(self, proto, sibling, *args, **kw):
    """
    Start a child Python process running a C{twisted.test} module and
    attach the given ProcessProtocol to it.

    @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

    @param sibling: The basename of a file containing the Python program
        to run in the child process.

    @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

    @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

    @return: The L{IProcessTransport} provider for the spawned process.
    """
    # argv: interpreter, "-m <module>", the reactor's module name, then
    # whatever extra arguments the caller supplied.
    argv = [sys.executable, b"-m", b"twisted.test." + sibling,
            reactor.__class__.__module__]
    argv.extend(args)
    return reactor.spawnProcess(
        proto, sys.executable, argv, env=properEnv, **kw)
def test_interface(self):
    """
    A L{ProcessProtocol} instance provides L{IProcessProtocol}.
    """
    iface = interfaces.IProcessProtocol
    proto = protocol.ProcessProtocol()
    verifyObject(iface, proto)
def test_outReceived(self):
    """
    Data delivered to L{ProcessProtocol.childDataReceived} for file
    descriptor 1 (stdout) is forwarded to L{ProcessProtocol.outReceived}.
    """
    seen = []

    class OutProtocol(StubProcessProtocol):
        def outReceived(self, data):
            seen.append(data)

    payload = b"bytes"
    OutProtocol().childDataReceived(1, payload)
    self.assertEqual(seen, [payload])
def makeDeferredWithProcessProtocol():
    """Returns a (`Deferred`, `ProcessProtocol`) tuple.

    The protocol's `processEnded` is replaced so that the Deferred's
    `callback()` is called (with None) when the reason is falsy or is a
    `ProcessDone`, and its `errback()` is called with the reason otherwise.
    """
    done = Deferred()
    proto = ProcessProtocol()

    def processEnded(reason):
        # Anything other than a clean ProcessDone counts as a failure.
        if reason and not reason.check(ProcessDone):
            return done.errback(reason)
        return done.callback(None)

    proto.processEnded = processEnded
    return done, proto
def test_wrongArguments(self):
    """
    spawnProcess rejects invalid arguments with C{TypeError}: arguments and
    environment must only contain string or unicode, and no null bytes.
    """
    executable = sys.executable
    proto = protocol.ProcessProtocol()

    # Non-string key/value, or a null byte in key/value.
    invalidEnvs = [
        {"foo": 2},
        {"foo": "egg\0a"},
        {3: "bar"},
        {"bar\0foo": "bar"},
    ]
    # Non-string element, non-list argv, or a null byte in an element.
    invalidArgs = [
        [executable, 2],
        "spam",
        [executable, "foo\0bar"],
    ]

    # Sanity check - this will fail for people who have mucked with
    # their site configuration in a stupid way, but there's nothing we
    # can do about that.
    snowman = u'\N{SNOWMAN}'
    try:
        snowman.encode(sys.getdefaultencoding())
    except UnicodeEncodeError:
        # The default encoding cannot represent it, so it is a bad key, a
        # bad value and a bad argument.
        invalidEnvs.append({snowman: 'value for bad unicode key'})
        invalidEnvs.append({'key for bad unicode value': snowman})
        invalidArgs.append([executable, snowman])
    # If it _did_ encode (most likely Gtk2 is being used and the default
    # system encoding is UTF-8, which can encode anything), implicit
    # unicode -> str conversion works for that string, so a TypeError
    # cannot be expected and the unicode cases are left out.

    goodArgs = [executable, "-c", ""]
    for env in invalidEnvs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, executable, goodArgs, env=env)
    for args in invalidArgs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, executable, args, env=None)
# Use upper-case so that the environment key test uses an upper case
# name: some versions of Windows only support upper case environment
# variable names, and I think Python (as of 2.5) doesn't use the right
# syscall for lowercase or mixed case names to work anyway.
def test_wrongArguments(self):
    """
    spawnProcess rejects invalid arguments with C{TypeError}: arguments and
    environment must only contain string or unicode, and no null bytes.
    """
    executable = sys.executable
    proto = protocol.ProcessProtocol()

    # Non-string key/value, or a null byte in key/value.
    invalidEnvs = [
        {"foo": 2},
        {"foo": "egg\0a"},
        {3: "bar"},
        {"bar\0foo": "bar"},
    ]
    # Non-string element, non-list argv, or a null byte in an element.
    invalidArgs = [
        [executable, 2],
        "spam",
        [executable, "foo\0bar"],
    ]

    # Sanity check - this will fail for people who have mucked with
    # their site configuration in a stupid way, but there's nothing we
    # can do about that.
    snowman = u'\N{SNOWMAN}'
    try:
        snowman.encode(sys.getdefaultencoding())
    except UnicodeEncodeError:
        # The default encoding cannot represent it, so it is a bad key, a
        # bad value and a bad argument.
        invalidEnvs.append({snowman: 'value for bad unicode key'})
        invalidEnvs.append({'key for bad unicode value': snowman})
        invalidArgs.append([executable, snowman])
    # If it _did_ encode (most likely Gtk2 is being used and the default
    # system encoding is UTF-8, which can encode anything), implicit
    # unicode -> str conversion works for that string, so a TypeError
    # cannot be expected and the unicode cases are left out.

    goodArgs = [executable, "-c", ""]
    for env in invalidEnvs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, executable, goodArgs, env=env)
    for args in invalidArgs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, executable, args, env=None)
# Use upper-case so that the environment key test uses an upper case
# name: some versions of Windows only support upper case environment
# variable names, and I think Python (as of 2.5) doesn't use the right
# syscall for lowercase or mixed case names to work anyway.
def test_wrongArguments(self):
    """
    spawnProcess rejects invalid arguments with C{TypeError}: arguments and
    environment must only contain string or unicode, and no null bytes.
    """
    proto = protocol.ProcessProtocol()

    # Non-string key/value, or a null byte in key/value.
    invalidEnvs = [
        {b"foo": 2},
        {b"foo": b"egg\0a"},
        {3: b"bar"},
        {b"bar\0foo": b"bar"},
    ]
    # Non-string element, non-list argv, or a null byte in an element.
    invalidArgs = [
        [pyExe, 2],
        b"spam",
        [pyExe, b"foo\0bar"],
    ]

    # Sanity check - this will fail for people who have mucked with
    # their site configuration in a stupid way, but there's nothing we
    # can do about that.
    snowman = u'\N{SNOWMAN}'
    try:
        snowman.encode(sys.getfilesystemencoding())
    except UnicodeEncodeError:
        # The filesystem encoding cannot represent it, so it is a bad key,
        # a bad value and a bad argument.
        invalidEnvs.append({snowman: 'value for bad unicode key'})
        invalidEnvs.append({'key for bad unicode value': snowman})
        invalidArgs.append([pyExe, snowman])
    # If it _did_ encode (most likely Gtk2 is being used and the default
    # system encoding is UTF-8, which can encode anything), implicit
    # unicode -> str conversion works for that string, so a TypeError
    # cannot be expected and the unicode cases are left out.

    goodArgs = [pyExe, b"-c", b""]
    for env in invalidEnvs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, pyExe, goodArgs, env=env)
    for args in invalidArgs:
        self.assertRaises(
            TypeError,
            reactor.spawnProcess, proto, pyExe, args, env=None)
def test_errorInProcessEnded(self):
    """
    The handler which reaps a process is removed when the process is
    reaped, even if the protocol's C{processEnded} method raises an
    exception.
    """
    whenConnected = defer.Deferred()
    whenEnded = defer.Deferred()

    # This script runs until we disconnect its transport.
    echoerModule = b"twisted.test.process_echoer"

    class ErrorInProcessEnded(protocol.ProcessProtocol):
        """
        A protocol that raises an error in C{processEnded}.
        """
        def makeConnection(self, transport):
            whenConnected.callback(transport)

        def processEnded(self, reason):
            # Fire the Deferred on a later reactor turn, then fail.
            reactor.callLater(0, whenEnded.callback, None)
            raise RuntimeError("Deliberate error")

    # Launch the process.
    argv = [pyExe, b"-u", b"-m", echoerModule]
    reactor.spawnProcess(
        ErrorInProcessEnded(), pyExe, argv, env=properEnv, path=None)

    # Filled in by cbConnected so checkTerminated can see the child's pid.
    seenPids = []

    def cbConnected(transport):
        seenPids.append(transport.pid)
        # There's now a reap process handler registered.
        self.assertIn(transport.pid, process.reapProcessHandlers)
        # Kill the process cleanly, triggering an error in the protocol.
        transport.loseConnection()
    whenConnected.addCallback(cbConnected)

    def checkTerminated(ignored):
        # The exception was logged.
        logged = self.flushLoggedErrors(RuntimeError)
        self.assertEqual(len(logged), 1)
        # The process is no longer scheduled for reaping.
        self.assertNotIn(seenPids[0], process.reapProcessHandlers)
    whenEnded.addCallback(checkTerminated)

    return whenEnded
def test_closeHandles(self):
    """
    The win32 handles should be properly closed when the process exits.
    """
    import win32api

    whenConnected = defer.Deferred()
    whenEnded = defer.Deferred()

    class SimpleProtocol(protocol.ProcessProtocol):
        """
        A protocol that fires deferreds when connected and disconnected.
        """
        def makeConnection(self, transport):
            whenConnected.callback(transport)

        def processEnded(self, reason):
            whenEnded.callback(None)

    proto = SimpleProtocol()
    argv = [pyExe, b"-u", b"-c", b"print('hello')"]
    proc = reactor.spawnProcess(proto, pyExe, argv)

    def cbConnected(transport):
        self.assertIs(transport, proc)
        # perform a basic validity test on the handles
        for handle in (proc.hProcess, proc.hThread):
            win32api.GetHandleInformation(handle)
        # And save their values for later
        self.hProcess = proc.hProcess
        self.hThread = proc.hThread
    whenConnected.addCallback(cbConnected)

    def checkTerminated(ignored):
        # The attributes on the process object must be reset...
        self.assertIsNone(proc.pid)
        self.assertIsNone(proc.hProcess)
        self.assertIsNone(proc.hThread)
        # ...and the handles must be closed.
        for staleHandle in (self.hProcess, self.hThread):
            self.assertRaises(win32api.error,
                              win32api.GetHandleInformation, staleHandle)
    whenEnded.addCallback(checkTerminated)

    return defer.gatherResults([whenConnected, whenEnded])