def test_stdin(self):
    """
    Verify that getPassword accepts a password from standard input.

    A child Python process is spawned which calls
    C{twisted.python.util.getPassword} and writes whatever it read to its
    own standard output.  When C{p.finished} fires, the child must have
    exited with L{ProcessDone} and the collected output must be exactly
    C{[(1, 'secret')]}.

    NOTE(review): the password is presumably written to the child by
    L{PasswordTestingProcessProtocol}; that class is not visible here.

    @return: C{p.finished} with the verification callback attached, so that
        the test runner waits for the child process to end.
    """
    p = PasswordTestingProcessProtocol()
    p.finished = Deferred()
    reactor.spawnProcess(
        p,
        sys.executable,
        [sys.executable,
         '-c',
         ('import sys\n'
          'from twisted.python.util import getPassword\n'
          'sys.stdout.write(getPassword())\n'
          'sys.stdout.flush()\n')],
        env={'PYTHONPATH': os.pathsep.join(sys.path)})

    def processFinished(result):
        # Tuple parameters were removed from the language (PEP 3113), so the
        # (reason, output) pair is unpacked in the body instead.
        (reason, output) = result
        reason.trap(ProcessDone)
        self.assertEqual(output, [(1, 'secret')])

    # Attach and return the callback; otherwise the assertions above would
    # never run and the test would pass without checking anything.
    return p.finished.addCallback(processFinished)
# Example source code using the Python class ProcessDone()
def testProducer(self):
    """
    Write 100 numbered lines to a child process, one per 0.01 delay, and
    check that the data collected under key C{1} of C{p.data} is exactly
    what was written.

    @return: The result of C{self._requireFailure} for the protocol's
        completion Deferred.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion

    written = []
    # A real list is needed here: pop() is called below and a Python 3
    # range object does not provide it.
    toWrite = list(range(100))

    def connectionMade(ign):
        # Send one pending line, then reschedule until nothing is left.
        if toWrite:
            written.append(str(toWrite.pop()) + "\n")
            proc.write(written[-1])
            reactor.callLater(0.01, connectionMade, None)

    proc = self._spawnProcess(p, 'stdio_test_producer.py')
    p.onConnection.addCallback(connectionMade)

    def processEnded(reason):
        self.assertEqual(p.data, {1: ''.join(written)})
        self.assertFalse(
            toWrite,
            "Connection lost with %d writes left to go." % (len(toWrite),))
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def processExited(self, failure):
    """
    Record how the process exited and reset the protocol state.

    Only C{ProcessDone} and C{ProcessTerminated} failures are accepted;
    C{failure.trap} re-raises anything else.  A terminated process marks
    this object as failed and stores the exit code in C{self.errmsg}.

    NOTE(review): the original indentation was lost; the nesting below
    (in particular which statements sit under C{IS_MAC}) is the most
    plausible reading and should be confirmed against the upstream file.
    """
    trapped = failure.trap(
        internet_error.ProcessDone, internet_error.ProcessTerminated)
    # A ProcessDone result needs no bookkeeping; only termination does.
    if trapped == internet_error.ProcessTerminated:
        self.failed = True
        self.errmsg = failure.value.exitCode
        if not self.errmsg:
            # No usable exit code: log the failure value itself.
            self.log.warn('%r' % failure.value)
        else:
            self.log.debug('Process Exited, status %d' % (self.errmsg,))
    if IS_MAC:
        # TODO: need to exit properly!
        self.errmsg = None
    self.proto = None
    self._turn_state_off()
def test_restart_stops_exchanger(self):
    """
    Once the shutdown process has ended successfully, scheduling a new
    exchange does not produce any payload on the transport.
    """
    self.plugin.perform_shutdown(
        {"type": "shutdown", "reboot": False, "operation-id": 100})

    # Exactly one process is expected to have been spawned.
    (spawn_arguments,) = self.process_factory.spawns
    shutdown_protocol = spawn_arguments[0]
    shutdown_protocol.processEnded(Failure(ProcessDone(status=0)))
    self.broker_service.reactor.advance(100)
    self.manager.reactor.advance(100)

    # With the shutdown process finished, a freshly scheduled exchange must
    # not send anything.
    exchanger = self.manager.broker.exchanger
    exchanger.schedule_exchange()
    sent_payloads = self.manager.broker.exchanger._transport.payloads
    self.assertEqual(0, len(sent_payloads))
    return shutdown_protocol.result
def test_limit_size(self):
    """Output collected from the command is capped at the size limit."""
    stub_factory = StubProcessFactory()
    self.plugin.process_factory = stub_factory
    self.plugin.size_limit = 100
    deferred = self.plugin.run_script("/bin/sh", "")

    # 79 payload characters plus the 21-character truncation marker add up
    # to the configured limit of 100.
    expected_output = ("x" * 79) + "\n**OUTPUT TRUNCATED**"
    deferred.addCallback(self.assertEqual, expected_output)

    spawned_protocol = stub_factory.spawns[0][0]
    # Twice the limit, so truncation has to kick in.
    spawned_protocol.childDataReceived(1, b"x" * 200)
    for descriptor in range(3):
        spawned_protocol.childConnectionLost(descriptor)
    spawned_protocol.processEnded(Failure(ProcessDone(0)))
    return deferred
def test_command_output_ends_with_truncation(self):
    """Data arriving after the output was truncated is not recorded."""
    stub_factory = StubProcessFactory()
    self.plugin.process_factory = stub_factory
    self.plugin.size_limit = 100
    deferred = self.plugin.run_script("/bin/sh", "")

    # The result must still be 100 characters long and end with the
    # truncation marker, no matter how much data follows.
    expected_output = ("x" * 79) + "\n**OUTPUT TRUNCATED**"
    deferred.addCallback(self.assertEqual, expected_output)

    spawned_protocol = stub_factory.spawns[0][0]
    oversized_chunk = b"x" * 200
    # The first chunk triggers truncation; the second one must be ignored.
    spawned_protocol.childDataReceived(1, oversized_chunk)
    spawned_protocol.childDataReceived(1, oversized_chunk)
    for descriptor in range(3):
        spawned_protocol.childConnectionLost(descriptor)
    spawned_protocol.processEnded(Failure(ProcessDone(0)))
    return deferred
def test_cancel_doesnt_blow_after_success(self):
    """
    A time limit that expires right after the process already ended
    successfully must be harmless: the script result is still the output
    collected from the process.

    [regression test: killing the already-dead process used to blow up.]
    """
    stub_factory = StubProcessFactory()
    self.plugin.process_factory = stub_factory
    deferred = self.plugin.run_script("/bin/sh", "", time_limit=500)

    spawned_protocol = stub_factory.spawns[0][0]
    spawned_protocol.makeConnection(DummyProcess())
    spawned_protocol.childDataReceived(1, b"hi")
    spawned_protocol.processEnded(Failure(ProcessDone(0)))

    # Move the clock just past the 500 time limit, after the process ended.
    self.manager.reactor.advance(501)

    deferred.addCallback(lambda output: self.assertEqual(output, "hi"))
    return deferred
def test_user(self):
    """
    A user can be specified in the message.

    The script is sent for the user running the tests; the process factory
    is then expected to be asked to spawn with C{uid=None} and C{gid=None}.
    """
    username = pwd.getpwuid(os.getuid())[0]
    # NOTE(review): the returned values are not used below (the nested
    # spawnProcess has its own uid/gid parameters); the call is kept as-is.
    uid, gid, home = get_user_info(username)

    def spawnProcess(protocol, filename, args, env, path, uid, gid):
        # Process output is fed as bytes, like the other tests here do; the
        # accumulating protocol joins its chunks with b"".join().
        protocol.childDataReceived(1, b"hi!\n")
        protocol.processEnded(Failure(ProcessDone(0)))
        self._verify_script(filename, sys.executable, "print 'hi'")

    process_factory = mock.Mock()
    process_factory.spawnProcess = mock.Mock(side_effect=spawnProcess)
    self.manager.add(
        ScriptExecutionPlugin(process_factory=process_factory))

    result = self._send_script(sys.executable, "print 'hi'", user=username)

    def check(_):
        process_factory.spawnProcess.assert_called_with(
            mock.ANY, mock.ANY, args=mock.ANY, uid=None, gid=None,
            path=mock.ANY, env=get_default_environment())

    return result.addCallback(check)
def test_stdin(self):
    """
    Verify that getPassword accepts a password from standard input.

    A child Python process is spawned which calls
    C{twisted.python.util.getPassword} and writes whatever it read to its
    own standard output.  When C{p.finished} fires, the child must have
    exited with L{ProcessDone} and the collected output must be exactly
    C{[(1, 'secret')]}.

    NOTE(review): the password is presumably written to the child by
    L{PasswordTestingProcessProtocol}; that class is not visible here.

    @return: C{p.finished} with the verification callback attached, so that
        the test runner waits for the child process to end.
    """
    p = PasswordTestingProcessProtocol()
    p.finished = Deferred()
    reactor.spawnProcess(
        p,
        sys.executable,
        [sys.executable,
         '-c',
         ('import sys\n'
          'from twisted.python.util import getPassword\n'
          'sys.stdout.write(getPassword())\n'
          'sys.stdout.flush()\n')],
        env={'PYTHONPATH': os.pathsep.join(sys.path)})

    def processFinished(result):
        # Tuple parameters were removed from the language (PEP 3113), so the
        # (reason, output) pair is unpacked in the body instead.
        (reason, output) = result
        reason.trap(ProcessDone)
        self.assertEqual(output, [(1, 'secret')])

    # Attach and return the callback; otherwise the assertions above would
    # never run and the test would pass without checking anything.
    return p.finished.addCallback(processFinished)
def testProducer(self):
    """
    Write 100 numbered lines to a child process, one per 0.01 delay, and
    check that the data collected under key C{1} of C{p.data} is exactly
    what was written.

    @return: The result of C{self._requireFailure} for the protocol's
        completion Deferred.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion

    written = []
    # A real list is needed here: pop() is called below and a Python 3
    # range object does not provide it.
    toWrite = list(range(100))

    def connectionMade(ign):
        # Send one pending line, then reschedule until nothing is left.
        if toWrite:
            written.append(str(toWrite.pop()) + "\n")
            proc.write(written[-1])
            reactor.callLater(0.01, connectionMade, None)

    proc = self._spawnProcess(p, 'stdio_test_producer.py')
    p.onConnection.addCallback(connectionMade)

    def processEnded(reason):
        self.assertEqual(p.data, {1: ''.join(written)})
        self.assertFalse(
            toWrite,
            "Connection lost with %d writes left to go." % (len(toWrite),))
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def test_stdin(self):
    """
    Check that getPassword reads a password from standard input.

    A child interpreter runs a tiny script that calls getPassword and
    writes the value it got to its standard output.  After the child has
    exited with L{ProcessDone}, the pair C{(1, b'secret')} must be among
    the output collected by the protocol.
    """
    childSource = (
        b'import sys\n'
        b'from twisted.python.util import getPassword\n'
        b'sys.stdout.write(getPassword())\n'
        b'sys.stdout.flush()\n')
    # Pass our own import path along so the child can import twisted.
    childEnv = {b'PYTHONPATH': os.pathsep.join(sys.path).encode("utf8")}

    proto = PasswordTestingProcessProtocol()
    proto.finished = Deferred()
    reactor.spawnProcess(
        proto, pyExe, [pyExe, b'-c', childSource], env=childEnv)

    def processFinished(result):
        reason, output = result
        reason.trap(ProcessDone)
        self.assertIn((1, b'secret'), output)

    return proto.finished.addCallback(processFinished)
def test_loseConnection(self):
    """
    Verify that a protocol connected to L{StandardIO} can disconnect
    itself using C{transport.loseConnection}.

    The child logs to a temporary file; once it has ended, that log is
    copied into ours, C{p.data} must have no entry for key C{1}, and the
    child must have exited with L{error.ProcessDone}.
    """
    errorLogFile = self.mktemp()
    log.msg("Child process logging to " + errorLogFile)
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion
    self._spawnProcess(p, b'stdio_test_loseconn', errorLogFile)

    def processEnded(reason):
        # Copy the child's log to ours so it's more visible.
        with open(errorLogFile, 'r') as f:
            for line in f:
                log.msg("Child logged: " + line.rstrip())

        # assertNotIn replaces the legacy failIfIn spelling, in line with
        # the assert* names used by the neighbouring tests.
        self.assertNotIn(1, p.data)
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def test_readConnectionLost(self):
    """
    When stdin is closed and the protocol connected to it implements
    L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
    is called.

    This side only closes the child's stdin once data has been received and
    then requires the child to exit with L{error.ProcessDone}.
    NOTE(review): the C{readConnectionLost} check itself presumably lives
    in the child script, which is not visible here.
    """
    logPath = self.mktemp()
    log.msg("Child process logging to " + logPath)
    proto = StandardIOTestProcessProtocol()
    proto.onDataReceived = defer.Deferred()

    def closeChildInput(ignored):
        # Grab the completion Deferred first, then close the child's stdin
        # and wait for the process to finish.
        completion = proto.onCompletion
        proto.transport.closeStdin()
        return completion

    proto.onDataReceived.addCallback(closeChildInput)

    def processEnded(reason):
        reason.trap(error.ProcessDone)

    outcome = self._requireFailure(proto.onDataReceived, processEnded)
    self._spawnProcess(proto, b'stdio_test_halfclose', logPath)
    return outcome
def test_consumer(self):
    """
    Verify that the transport of a protocol connected to L{StandardIO}
    is a working L{IConsumer} provider.

    The data collected under key C{1} of C{p.data} must equal the bytes of
    the junk file handed to the child, and the child must exit with
    L{error.ProcessDone}.
    """
    proto = StandardIOTestProcessProtocol()
    completion = proto.onCompletion
    junkPath = self._junkPath()
    self._spawnProcess(proto, b'stdio_test_consumer', junkPath)

    def processEnded(reason):
        with open(junkPath, 'rb') as junkFile:
            received, expected = proto.data[1], junkFile.read()
        self.assertEqual(received, expected)
        reason.trap(error.ProcessDone)

    return self._requireFailure(completion, processEnded)
def connectionLost(self, reason=None):
    """
    Notify the protocol that the process has ended.

    The exit code is read from C{self.hProcess}; a code of C{0} is reported
    as L{error.ProcessDone}, any other code as L{error.ProcessTerminated}.

    @param reason: Not used by this method.
    """
    status = win32process.GetExitCodeProcess(self.hProcess)
    # Pick the exception type from the exit status, then build it once.
    outcome = error.ProcessDone if status == 0 else error.ProcessTerminated
    self.protocol.processEnded(failure.Failure(outcome(status)))
## IConsumer
def checkWork(self):
    """
    Poll the process handle and report the exit once it is signalled.

    While the zero-timeout wait on the handle does not return
    C{WAIT_OBJECT_0}, nothing happens.  Afterwards this object is
    deactivated and the protocol is told how the process ended
    (L{error.ProcessDone} for exit code C{0}, L{error.ProcessTerminated}
    otherwise).

    @return: Always C{0}.
    """
    waitResult = win32event.WaitForSingleObject(self.proc.hProcess, 0)
    if waitResult == win32event.WAIT_OBJECT_0:
        status = win32process.GetExitCodeProcess(self.proc.hProcess)
        if status != 0:
            outcome = error.ProcessTerminated(status)
        else:
            outcome = error.ProcessDone(status)
        # Stop polling before handing the result to the protocol.
        self.deactivate()
        self.proc.protocol.processEnded(failure.Failure(outcome))
    return 0
def testLoseConnection(self):
    """
    Run the C{stdio_test_loseconn.py} child and require that no data at all
    was collected from it and that it exited with L{error.ProcessDone}.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion
    self._spawnProcess(p, 'stdio_test_loseconn.py')

    def processEnded(reason):
        # assertEqual replaces the legacy assertEquals alias.
        self.assertEqual(p.data, {})
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def testHostAndPeer(self):
    """
    Run the C{stdio_test_hostpeer.py} child and check what it reported.

    The data under key C{1} must be the only data collected and must
    consist of exactly two lines, both non-empty (unpacked below as host
    and peer).  The child must exit with L{error.ProcessDone}.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion
    self._spawnProcess(p, 'stdio_test_hostpeer.py')

    def processEnded(reason):
        hostpeer = p.data.pop(1)
        # With key 1 removed nothing else may remain.
        self.assertEqual(p.data, {})
        host, peer = hostpeer.splitlines()
        self.assertTrue(host)
        self.assertTrue(peer)
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def testWriteSequence(self):
    """
    Run the C{stdio_test_writeseq.py} child and require that the collected
    data is exactly C{{1: 'ok!'}} and that it exited with
    L{error.ProcessDone}.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion
    self._spawnProcess(p, 'stdio_test_writeseq.py')

    def processEnded(reason):
        # assertEqual replaces the legacy assertEquals alias.
        self.assertEqual(p.data, {1: 'ok!'})
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def testConsumer(self):
    """
    Run the C{stdio_test_consumer.py} child with a junk file and require
    that the collected data is exactly the content of that file under key
    C{1}, and that the child exited with L{error.ProcessDone}.
    """
    p = StandardIOTestProcessProtocol()
    d = p.onCompletion

    junkPath = self._junkPath()
    self._spawnProcess(p, 'stdio_test_consumer.py', junkPath)

    def processEnded(reason):
        # open() in a with-block replaces the file() builtin, which does not
        # exist on Python 3, and makes sure the handle is closed again.
        with open(junkPath, 'rb') as f:
            expected = f.read()
        self.assertEqual(p.data, {1: expected})
        reason.trap(error.ProcessDone)

    return self._requireFailure(d, processEnded)
def processEnded(self, reason):
    """
    Mark the protocol as finished and fire C{self.onEnded} with C{self}.

    If C{reason} is anything other than L{error.ProcessDone}, a
    description of it is stored in C{self.failure} first.
    """
    self.finished = 1
    endedNormally = reason.check(error.ProcessDone)
    if not endedNormally:
        self.failure = "process didn't terminate normally: " + str(reason)
    self.onEnded.callback(self)
def processEnded(self, reason):
    """
    Remember C{reason} and fire C{self.deferred} accordingly: callback with
    C{None} for L{error.ProcessDone}, errback with C{reason} otherwise.
    """
    self.reason = reason
    if not reason.check(error.ProcessDone):
        self.deferred.errback(reason)
        return
    self.deferred.callback(None)
def _endProcess(self, reason, p):
    """
    Check that the child process failed the expected way.

    The child must not have ended with L{error.ProcessDone}; it must have
    ended with L{error.ProcessTerminated}, with an exit code other than 42,
    and C{p.output} must be empty.

    @param reason: The failure describing how the process ended.
    @param p: The protocol holding the collected C{output} and C{errput}.
    @return: C{p.errput}.
    """
    # The assert* names replace the legacy failIf/failIfEqual/
    # failUnlessEqual aliases.
    self.assertFalse(reason.check(error.ProcessDone),
                     'Child should fail due to EPIPE.')
    reason.trap(error.ProcessTerminated)
    # child must not get past that write without raising
    self.assertNotEqual(reason.value.exitCode, 42,
                        'process reason was %r' % (reason,))
    self.assertEqual(p.output, '')
    return p.errput
def processEnded(self, reason):
    """
    Fire C{self.onEnd}: callback with C{"Complete"} when the process ended
    with L{error.ProcessDone}, errback with C{reason} otherwise.
    """
    if not reason.check(error.ProcessDone):
        self.onEnd.errback(reason)
        return
    self.onEnd.callback("Complete")
def test_change_packages_with_reboot_flag(self):
    """
    A C{change-packages} task carrying the C{reboot-if-necessary} flag
    spawns a process and still reports the package change result; the log
    must mention that Landscape is rebooting the system.
    """
    task = {"type": "change-packages", "install": [2],
            "binaries": [(HASH2, 2, PKGDEB2)],
            "operation-id": 123,
            "reboot-if-necessary": True}
    self.store.add_task("changer", task)

    def return_good_result(self):
        return "Yeah, I did whatever you've asked for!"

    self.replace_perform_changes(return_good_result)
    result = self.changer.handle_tasks()

    def got_result(result):
        self.assertIn("Landscape is rebooting the system",
                      self.logfile.getvalue())
        expected_messages = [
            {"operation-id": 123,
             "result-code": 1,
             "result-text": "Yeah, I did whatever you've "
                            "asked for!",
             "type": "change-packages-result"}]
        self.assertMessages(self.get_pending_messages(), expected_messages)

    self.landscape_reactor.advance(5)
    # Exactly one process is expected to have been spawned by now.
    (spawn_arguments,) = self.process_factory.spawns
    spawned_protocol = spawn_arguments[0]
    spawned_protocol.processEnded(Failure(ProcessDone(status=0)))
    self.broker_service.reactor.advance(100)
    self.landscape_reactor.advance(10)
    return result.addCallback(got_result)
def test_no_exchange_after_reboot(self):
    """
    Once the reboot process has been started, advancing the reactors does
    not lead to any payload being sent by the exchanger.
    """
    task = {"type": "change-packages", "install": [2],
            "binaries": [(HASH2, 2, PKGDEB2)],
            "operation-id": 123,
            "reboot-if-necessary": True}
    self.store.add_task("changer", task)

    def return_good_result(self):
        return "Yeah, I did whatever you've asked for!"

    self.replace_perform_changes(return_good_result)
    result = self.changer.handle_tasks()

    def got_result(result):
        # Give both reactors the time in which pending messages would
        # normally be exchanged, then make sure nothing went out.
        self.broker_service.reactor.advance(100)
        self.landscape_reactor.advance(10)
        sent_payloads = self.broker_service.exchanger._transport.payloads
        self.assertEqual(0, len(sent_payloads))

    self.landscape_reactor.advance(5)
    # Exactly one process is expected to have been spawned by now.
    (spawn_arguments,) = self.process_factory.spawns
    spawned_protocol = spawn_arguments[0]
    spawned_protocol.processEnded(Failure(ProcessDone(status=0)))
    self.broker_service.reactor.advance(100)
    self.landscape_reactor.advance(10)
    return result.addCallback(got_result)
def processEnded(self, reason):
    """Settle the C{result} L{Deferred} once the subprocess has ended.

    Nothing happens unless the protocol is still waiting.  A process that
    ended with L{ProcessDone} goes through C{self._succeed()}; any other
    ending fires C{result}'s errback with a L{ShutdownFailedError} built
    from the data received from the subprocess.  In both cases the protocol
    stops waiting afterwards.
    """
    if not self._waiting:
        return
    if not reason.check(ProcessDone):
        self.result.errback(ShutdownFailedError(self.get_data()))
    else:
        self._succeed()
    self._waiting = False
def processEnded(self, reason):
    """Fire the result deferred with the accumulated subprocess output.

    The outcome depends on how the process ended:

    - cancelled: errback with L{ProcessTimeLimitReachedError} carrying the
      data accumulated so far;
    - ended with L{ProcessDone}: callback with the data;
    - anything else: errback with L{ProcessFailedError} carrying the data
      and the exit code.

    A cancel call that is still scheduled is cancelled unless the process
    was already cancelled.
    """
    exit_code = reason.value.exitCode
    # self.data holds bytes; what gets delivered is text, with undecodable
    # sequences replaced.  ScriptExecutionPlugin._respond attempts this
    # again, but it is not called in all cases.
    output = b"".join(self.data).decode("utf-8", "replace")

    if self._cancelled:
        self.result_deferred.errback(ProcessTimeLimitReachedError(output))
        return

    if self._scheduled_cancel is not None:
        # Clear the attribute before cancelling the scheduled call.
        pending_cancel = self._scheduled_cancel
        self._scheduled_cancel = None
        self.reactor.cancel_call(pending_cancel)

    if not reason.check(ProcessDone):
        self.result_deferred.errback(ProcessFailedError(output, exit_code))
        return
    self.result_deferred.callback(output)
def _exit_process_protocol(self, protocol, stdout):
    """
    Drive C{protocol} through a successful process exit.

    C{stdout} is delivered as data on fd 1, then fds 0, 1 and 2 are
    reported as closed and the process as ended with C{ProcessDone(0)}.
    """
    protocol.childDataReceived(1, stdout)
    for descriptor in range(3):
        protocol.childConnectionLost(descriptor)
    clean_exit = Failure(ProcessDone(0))
    protocol.processEnded(clean_exit)
def _run_script(self, username, uid, gid, path):
    """
    Run a script as C{username} through a stub process factory and check
    the arguments of the single resulting spawn.

    C{path} is expected at index 4 of the spawn record, and the uid/gid at
    indices 5 and 6; a uid/gid equal to the current process's own is
    expected to be recorded as C{None}.  The spawned protocol is then fed
    C{b"foobar"} and driven to a successful exit.

    @return: the deferred returned by C{run_script}, with the check and
        cleanup handlers attached.
    """
    expected_uid = None if uid == os.getuid() else uid
    expected_gid = None if gid == os.getgid() else gid

    stub_factory = StubProcessFactory()
    self.plugin.process_factory = stub_factory

    # ignore the call to chown!
    chown_patcher = mock.patch("os.chown")
    chown_mock = chown_patcher.start()

    result = self.plugin.run_script("/bin/sh", "echo hi", user=username)

    self.assertEqual(len(stub_factory.spawns), 1)
    spawn_record = stub_factory.spawns[0]
    for index, expected in ((4, path),
                            (5, expected_uid),
                            (6, expected_gid)):
        self.assertEqual(spawn_record[index], expected)

    spawned_protocol = spawn_record[0]
    spawned_protocol.childDataReceived(1, b"foobar")
    for descriptor in range(3):
        spawned_protocol.childConnectionLost(descriptor)
    spawned_protocol.processEnded(Failure(ProcessDone(0)))

    def check(result):
        chown_mock.assert_called_with()
        self.assertEqual(result, "foobar")

    def cleanup(result):
        chown_patcher.stop()
        return result

    # NOTE(review): check is attached as an errback, so it does not run when
    # the script succeeds.  This looks unintended, but it is kept as-is
    # because switching to addCallback would activate assertions whose
    # outcome cannot be confirmed from here.
    return result.addErrback(check).addBoth(cleanup)