def execute(self, args, p, preargs=''):
if runtime.platformType == 'win32':
raise unittest.SkipTest, "can't run cmdline client on win32"
port = self.server.getHost().port
cmd = ('-p %i -l testuser '
'--known-hosts kh_test '
'--user-authentications publickey '
'--host-key-algorithms ssh-rsa '
'-a -I '
'-K direct '
'-i dsa_test '
'-v ') % port + preargs + \
' 127.0.0.1 ' + args
cmds = _makeArgs(cmd.split())
log.msg(str(cmds))
env = os.environ.copy()
env['PYTHONPATH'] = os.pathsep.join(sys.path)
reactor.spawnProcess(p, sys.executable, cmds, env=env)
return p.deferred
python类spawnProcess()的实例源码
def _spawnProcess(self, proto, sibling, *args):
import twisted
subenv = dict(os.environ)
subenv['PYTHONPATH'] = os.pathsep.join(
[os.path.abspath(
os.path.dirname(os.path.dirname(twisted.__file__))),
subenv.get('PYTHONPATH', '')
])
return reactor.spawnProcess(
proto,
sys.executable,
[sys.executable,
filepath.FilePath(__file__).sibling(sibling).path,
reactor.__class__.__module__] + list(args),
env=subenv,
)
def testStdio(self):
"""twisted.internet.stdio test."""
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_twisted.py")
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
env = {"PYTHONPATH": os.pathsep.join(sys.path)}
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
path=None, usePTY=self.usePTY)
p.transport.write("hello, world")
p.transport.write("abc")
p.transport.write("123")
p.transport.closeStdin()
def processEnded(ign):
self.assertEquals(p.outF.getvalue(), "hello, worldabc123",
"Output follows:\n"
"%s\n"
"Error message from process_twisted follows:\n"
"%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
return d.addCallback(processEnded)
def testProcess(self):
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_tester.py")
d = defer.Deferred()
p = TestProcessProtocol()
p.deferred = d
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
def check(ignored):
self.assertEquals(p.stages, [1, 2, 3, 4, 5])
f = p.reason
f.trap(error.ProcessTerminated)
self.assertEquals(f.value.exitCode, 23)
# would .signal be available on non-posix?
# self.assertEquals(f.value.signal, None)
try:
import process_tester, glob
for f in glob.glob(process_tester.test_file_match):
os.remove(f)
except:
pass
d.addCallback(check)
return d
def testManyProcesses(self):
def _check(results, protocols):
for p in protocols:
self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
# test status code
f = p.reason
f.trap(error.ProcessTerminated)
self.assertEquals(f.value.exitCode, 23)
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_tester.py")
args = [exe, "-u", scriptPath]
protocols = []
deferreds = []
for i in xrange(50):
p = TestManyProcessProtocol()
protocols.append(p)
reactor.spawnProcess(p, exe, args, env=None)
deferreds.append(p.deferred)
deferredList = defer.DeferredList(deferreds, consumeErrors=True)
deferredList.addCallback(_check, protocols)
return deferredList
def testAbnormalTermination(self):
if os.path.exists('/bin/false'): cmd = '/bin/false'
elif os.path.exists('/usr/bin/false'): cmd = '/usr/bin/false'
else: raise RuntimeError("false not found in /bin or /usr/bin")
d = defer.Deferred()
p = TrivialProcessProtocol(d)
reactor.spawnProcess(p, cmd, ['false'], env=None,
usePTY=self.usePTY)
def check(ignored):
p.reason.trap(error.ProcessTerminated)
self.assertEquals(p.reason.value.exitCode, 1)
self.assertEquals(p.reason.value.signal, None)
d.addCallback(check)
return d
def testStderr(self):
# we assume there is no file named ZZXXX..., both in . and in /tmp
if not os.path.exists('/bin/ls'):
raise RuntimeError("/bin/ls not found")
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, '/bin/ls',
["/bin/ls",
"ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
env=None, path="/tmp",
usePTY=self.usePTY)
def processEnded(ign):
self.assertEquals(lsOut, p.errF.getvalue())
return d.addCallback(processEnded)
def testProcess(self):
if os.path.exists('/bin/gzip'): cmd = '/bin/gzip'
elif os.path.exists('/usr/bin/gzip'): cmd = '/usr/bin/gzip'
else: raise RuntimeError("gzip not found in /bin or /usr/bin")
s = "there's no place like home!\n" * 3
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
usePTY=self.usePTY)
p.transport.write(s)
p.transport.closeStdin()
def processEnded(ign):
f = p.outF
f.seek(0, 0)
gf = gzip.GzipFile(fileobj=f)
self.assertEquals(gf.read(), s)
return d.addCallback(processEnded)
def doit(self, fd):
p = ClosingPipesProcessProtocol(True)
p.deferred.addCallbacks(
callback=lambda _: self.fail("I wanted an errback."),
errback=self._endProcess, errbackArgs=(p,))
reactor.spawnProcess(p, sys.executable,
[sys.executable, '-u', '-c',
r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)' % fd],
env=None)
p.transport.write('go\n')
if fd == 1:
p.transport.closeStdout()
elif fd == 2:
p.transport.closeStderr()
else:
raise RuntimeError
# make the buggy case not hang
p.transport.closeStdin()
return p.deferred
def execute(self, args, p, preargs = ''):
cmdline = ('ssh -2 -l testuser -p %i '
'-oUserKnownHostsFile=kh_test '
'-oPasswordAuthentication=no '
# Always use the RSA key, since that's the one in kh_test.
'-oHostKeyAlgorithms=ssh-rsa '
'-a '
'-i dsa_test ') + preargs + \
' 127.0.0.1 ' + args
port = self.server.getHost().port
ssh_path = None
for path in ['/usr', '', '/usr/local']:
if os.path.exists(path+'/bin/ssh'):
ssh_path = path+'/bin/ssh'
break
if not ssh_path:
log.msg('skipping test, cannot find ssh')
raise unittest.SkipTest, 'skipping test, cannot find ssh'
cmds = (cmdline % port).split()
reactor.spawnProcess(p, ssh_path, cmds)
return p.deferred
def execute(self, args, p, preargs=''):
if runtime.platformType == 'win32':
raise unittest.SkipTest, "can't run cmdline client on win32"
port = self.server.getHost().port
cmd = ('-p %i -l testuser '
'--known-hosts kh_test '
'--user-authentications publickey '
'--host-key-algorithms ssh-rsa '
'-a -I '
'-K direct '
'-i dsa_test '
'-v ') % port + preargs + \
' 127.0.0.1 ' + args
cmds = _makeArgs(cmd.split())
log.msg(str(cmds))
env = os.environ.copy()
env['PYTHONPATH'] = os.pathsep.join(sys.path)
reactor.spawnProcess(p, sys.executable, cmds, env=env)
return p.deferred
def execCommand(self, proto, cmd):
from twisted.internet import reactor
uid, gid = self.avatar.getUserGroupId()
homeDir = self.avatar.getHomeDir()
shell = self.avatar.getShell() or '/bin/sh'
command = (shell, '-c', cmd)
peer = self.avatar.conn.transport.transport.getPeer()
host = self.avatar.conn.transport.transport.getHost()
self.environ['SSH_CLIENT'] = '%s %s %s' % (peer.host, peer.port, host.port)
if self.ptyTuple:
self.getPtyOwnership()
self.pty = reactor.spawnProcess(proto, \
shell, command, self.environ, homeDir,
uid, gid, usePTY = self.ptyTuple or 0)
if self.ptyTuple:
self.addUTMPEntry()
if self.modes:
self.setModes()
# else:
# tty.setraw(self.pty.pipes[0].fileno(), tty.TCSANOW)
self.avatar.conn.transport.transport.setTcpNoDelay(1)
def _spawnProcess(self, proto, sibling, *args):
import twisted
subenv = dict(os.environ)
subenv['PYTHONPATH'] = os.pathsep.join(
[os.path.abspath(
os.path.dirname(os.path.dirname(twisted.__file__))),
subenv.get('PYTHONPATH', '')
])
return reactor.spawnProcess(
proto,
sys.executable,
[sys.executable,
filepath.FilePath(__file__).sibling(sibling).path,
reactor.__class__.__module__] + list(args),
env=subenv,
)
def testStdio(self):
"""twisted.internet.stdio test."""
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_twisted.py")
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
env = {"PYTHONPATH": os.pathsep.join(sys.path)}
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env,
path=None, usePTY=self.usePTY)
p.transport.write("hello, world")
p.transport.write("abc")
p.transport.write("123")
p.transport.closeStdin()
def processEnded(ign):
self.assertEquals(p.outF.getvalue(), "hello, worldabc123",
"Output follows:\n"
"%s\n"
"Error message from process_twisted follows:\n"
"%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
return d.addCallback(processEnded)
def testProcess(self):
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_tester.py")
d = defer.Deferred()
p = TestProcessProtocol()
p.deferred = d
reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None)
def check(ignored):
self.assertEquals(p.stages, [1, 2, 3, 4, 5])
f = p.reason
f.trap(error.ProcessTerminated)
self.assertEquals(f.value.exitCode, 23)
# would .signal be available on non-posix?
# self.assertEquals(f.value.signal, None)
try:
import process_tester, glob
for f in glob.glob(process_tester.test_file_match):
os.remove(f)
except:
pass
d.addCallback(check)
return d
def testManyProcesses(self):
def _check(results, protocols):
for p in protocols:
self.assertEquals(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages)))
# test status code
f = p.reason
f.trap(error.ProcessTerminated)
self.assertEquals(f.value.exitCode, 23)
exe = sys.executable
scriptPath = util.sibpath(__file__, "process_tester.py")
args = [exe, "-u", scriptPath]
protocols = []
deferreds = []
for i in xrange(50):
p = TestManyProcessProtocol()
protocols.append(p)
reactor.spawnProcess(p, exe, args, env=None)
deferreds.append(p.deferred)
deferredList = defer.DeferredList(deferreds, consumeErrors=True)
deferredList.addCallback(_check, protocols)
return deferredList
def testAbnormalTermination(self):
if os.path.exists('/bin/false'): cmd = '/bin/false'
elif os.path.exists('/usr/bin/false'): cmd = '/usr/bin/false'
else: raise RuntimeError("false not found in /bin or /usr/bin")
d = defer.Deferred()
p = TrivialProcessProtocol(d)
reactor.spawnProcess(p, cmd, ['false'], env=None,
usePTY=self.usePTY)
def check(ignored):
p.reason.trap(error.ProcessTerminated)
self.assertEquals(p.reason.value.exitCode, 1)
self.assertEquals(p.reason.value.signal, None)
d.addCallback(check)
return d
def testStderr(self):
# we assume there is no file named ZZXXX..., both in . and in /tmp
if not os.path.exists('/bin/ls'):
raise RuntimeError("/bin/ls not found")
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, '/bin/ls',
["/bin/ls",
"ZZXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"],
env=None, path="/tmp",
usePTY=self.usePTY)
def processEnded(ign):
self.assertEquals(lsOut, p.errF.getvalue())
return d.addCallback(processEnded)
def testProcess(self):
if os.path.exists('/bin/gzip'): cmd = '/bin/gzip'
elif os.path.exists('/usr/bin/gzip'): cmd = '/usr/bin/gzip'
else: raise RuntimeError("gzip not found in /bin or /usr/bin")
s = "there's no place like home!\n" * 3
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp",
usePTY=self.usePTY)
p.transport.write(s)
p.transport.closeStdin()
def processEnded(ign):
f = p.outF
f.seek(0, 0)
gf = gzip.GzipFile(fileobj=f)
self.assertEquals(gf.read(), s)
return d.addCallback(processEnded)
def doit(self, fd):
p = ClosingPipesProcessProtocol(True)
p.deferred.addCallbacks(
callback=lambda _: self.fail("I wanted an errback."),
errback=self._endProcess, errbackArgs=(p,))
reactor.spawnProcess(p, sys.executable,
[sys.executable, '-u', '-c',
r'raw_input(); import sys, os; os.write(%d, "foo\n"); sys.exit(42)' % fd],
env=None)
p.transport.write('go\n')
if fd == 1:
p.transport.closeStdout()
elif fd == 2:
p.transport.closeStderr()
else:
raise RuntimeError
# make the buggy case not hang
p.transport.closeStdin()
return p.deferred
def _win32_popen(self, args, env, callback_trigger=None):
# This is a workaround to prevent Command Prompt windows from opening
# when spawning tahoe processes from the GUI on Windows, as Twisted's
# reactor.spawnProcess() API does not allow Windows creation flags to
# be passed to subprocesses. By passing 0x08000000 (CREATE_NO_WINDOW),
# the opening of the Command Prompt window will be surpressed while
# still allowing access to stdout/stderr. See:
# https://twistedmatrix.com/pipermail/twisted-python/2007-February/014733.html
import subprocess
proc = subprocess.Popen(
args, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
universal_newlines=True, creationflags=0x08000000)
output = BytesIO()
for line in iter(proc.stdout.readline, ''):
output.write(line.encode('utf-8'))
self.line_received(line.rstrip())
if callback_trigger and callback_trigger in line.rstrip():
return proc.pid
proc.poll()
if proc.returncode:
raise subprocess.CalledProcessError(proc.returncode, args)
else:
return output.getvalue()
def command(self, args, callback_trigger=None):
exe = (self.executable if self.executable else which('tahoe')[0])
args = [exe] + ['-d', self.nodedir] + args
env = os.environ
env['PYTHONUNBUFFERED'] = '1'
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
from twisted.internet.threads import deferToThread
output = yield deferToThread(
self._win32_popen, args, env, callback_trigger)
else:
protocol = CommandProtocol(self, callback_trigger)
reactor.spawnProcess(protocol, exe, args=args, env=env)
output = yield protocol.done
returnValue(output)
#@inlineCallbacks
#def start_monitor(self):
# furl = os.path.join(self.nodedir, 'private', 'logport.furl')
# yield self.command(['debug', 'flogtool', 'tail', furl])
def spawn_process(executable, args, env, outfile=None):
"""Spawn a process using Twisted reactor.
Return a deferred which will be called with process stdout, stderr
and exit code. If outfile is provided then the stdout and stderr
stings will be empty.
Note: this is a variant of landscape.lib.twisted_util.spawn_process
that supports streaming to a file.
"""
list_args = [executable]
list_args.extend(args)
logging.info("running {!r}".format(" ".join(list_args)))
import pprint
logging.info("OS env:\n" + pprint.pformat(env))
result = Deferred()
if outfile is None:
protocol = AllOutputProcessProtocol(result)
else:
protocol = StreamToFileProcessProtocol(result, outfile)
reactor.spawnProcess(protocol, executable, args=list_args, env=env)
return result
def spider_start(self, kw):
'''
'''
pp = SubProcessProtocol(self, self.factory.spiders, kw)
args = [sys.executable] + kw['cmd'].split(' ')
reactor.spawnProcess(pp,
sys.executable, args=args,
env={
'PYTHONPATH':'/srv/ataobao:$PYTHONPATH',
'ENV':'TEST',
},
path='/srv/ataobao/crawler')
# for i in range(0, kw["process"]):
# pp = SubProcessProtocol(self, self.factory.spiders, kw)
# args = [sys.executable, "spider_tx.py", kw["spider"], str(kw["threads"])]
# args = [sys.executable, "crawler/worker.py", "-w", "shop", "-p", "10"]
# args = [sys.executable, "crawler/tbcat.py", "-c", "16"]
# print args
# reactor.spawnProcess(pp, sys.executable, args=args)
def test_stdin(self):
"""
Making sure getPassword accepts a password from standard input by
running a child process which uses getPassword to read in a string
which it then writes it out again. Write a string to the child
process and then read one and make sure it is the right string.
"""
p = PasswordTestingProcessProtocol()
p.finished = Deferred()
reactor.spawnProcess(
p, pyExe,
[pyExe,
b'-c',
(b'import sys\n'
b'from twisted.python.util import getPassword\n'
b'sys.stdout.write(getPassword())\n'
b'sys.stdout.flush()\n')],
env={b'PYTHONPATH': os.pathsep.join(sys.path).encode("utf8")})
def processFinished(result):
(reason, output) = result
reason.trap(ProcessDone)
self.assertIn((1, b'secret'), output)
return p.finished.addCallback(processFinished)
def _spawn(script, outputFD):
"""
Start a script that is a peer of this test as a subprocess.
@param script: the module name of the script in this directory (no
package prefix, no '.py')
@type script: C{str}
@rtype: L{StartStopProcessProtocol}
"""
pyExe = FilePath(sys.executable).asBytesMode().path
env = bytesEnviron()
env[b"PYTHONPATH"] = FilePath(
pathsep.join(sys.path)).asBytesMode().path
sspp = StartStopProcessProtocol()
reactor.spawnProcess(
sspp, pyExe, [
pyExe,
FilePath(__file__).sibling(script + ".py").asBytesMode().path,
intToBytes(outputFD),
],
env=env,
childFDs={0: "w", 1: "r", 2: "r", outputFD: outputFD}
)
return sspp
def test_useReactorArgument(self):
"""
L{twcgi.FilteredScript.runProcess} uses the reactor passed as an
argument to the constructor.
"""
class FakeReactor:
"""
A fake reactor recording whether spawnProcess is called.
"""
called = False
def spawnProcess(self, *args, **kwargs):
"""
Set the C{called} flag to C{True} if C{spawnProcess} is called.
@param args: Positional arguments.
@param kwargs: Keyword arguments.
"""
self.called = True
fakeReactor = FakeReactor()
request = DummyRequest(['a', 'b'])
resource = twcgi.FilteredScript("dummy-file", reactor=fakeReactor)
_render(resource, request)
self.assertTrue(fakeReactor.called)
def test_twisted(self):
"""Invoking python -m twisted should execute twist."""
cmd = sys.executable
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, cmd, [cmd, '-m', 'twisted', '--help'], env=None)
p.transport.closeStdin()
def processEnded(ign):
f = p.outF
output = f.getvalue().replace(b'\r\n', b'\n')
options = TwistOptions()
message = '{}\n'.format(options).encode('utf-8')
self.assertEqual(output, message)
return d.addCallback(processEnded)
def test_stdio(self):
"""
L{twisted.internet.stdio} test.
"""
scriptPath = b"twisted.test.process_twisted"
p = Accumulator()
d = p.endedDeferred = defer.Deferred()
reactor.spawnProcess(p, pyExe, [pyExe, b'-u', b"-m", scriptPath],
env=properEnv,
path=None, usePTY=self.usePTY)
p.transport.write(b"hello, world")
p.transport.write(b"abc")
p.transport.write(b"123")
p.transport.closeStdin()
def processEnded(ign):
self.assertEqual(p.outF.getvalue(), b"hello, worldabc123",
"Output follows:\n"
"%s\n"
"Error message from process_twisted follows:\n"
"%s\n" % (p.outF.getvalue(), p.errF.getvalue()))
return d.addCallback(processEnded)
def test_unsetPid(self):
"""
Test if pid is None/non-None before/after process termination. This
reuses process_echoer.py to get a process that blocks on stdin.
"""
finished = defer.Deferred()
p = TrivialProcessProtocol(finished)
scriptPath = b"twisted.test.process_echoer"
procTrans = reactor.spawnProcess(p, pyExe,
[pyExe, b'-u', b"-m", scriptPath],
env=properEnv)
self.assertTrue(procTrans.pid)
def afterProcessEnd(ignored):
self.assertIsNone(procTrans.pid)
p.transport.closeStdin()
return finished.addCallback(afterProcessEnd)