def runCmd(*args, captureOutput=False, captureError=False, input: "typing.Union[str, bytes]"=None, timeout=None,
           printVerboseOnly=False, runInPretendMode=False, **kwargs):
    """Print a command line, run it and wait for it to finish.

    The command can be passed either as separate positional arguments or as a
    single list/tuple; every element is converted with ``str()`` before use.

    :param captureOutput: pipe the child's stdout so that it is returned in the
        result. Must not be combined with an explicit ``stdout`` keyword.
    :param captureError: pipe the child's stderr so that it is returned in the
        result. Must not be combined with an explicit ``stderr`` keyword.
    :param input: data to send to the child's stdin. Anything that is not
        ``bytes`` is converted with ``str()`` and encoded as UTF-8. Must not be
        combined with an explicit ``stdin`` keyword.
    :param timeout: forwarded to ``communicate()``. When it expires the child
        is killed and ``subprocess.TimeoutExpired`` is raised.
    :param printVerboseOnly: forwarded to ``printCommand``.
    :param runInPretendMode: run the command even when ``_cheriConfig.pretend``
        is set. Without it, pretend mode only prints the command and returns a
        result with exit code 0 and empty stdout/stderr.
    :param kwargs: forwarded to ``popen_handle_noexec``. ``cwd`` is converted
        to ``str`` (defaulting to the current directory, or the temporary
        directory if the current one no longer exists) and all ``env`` values
        are converted to ``str``.
    :return: a ``CompletedProcess`` built from the command, exit code, stdout
        and stderr.
    :raises subprocess.TimeoutExpired: if ``timeout`` expired. The exception
        carries the output collected after the child was killed.
    :raises: the error built by ``_make_called_process_error`` if the command
        exits with a non-zero code outside of pretend mode.
    """
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        cmdline = args[0]  # list with parameters was passed
    else:
        cmdline = args
    cmdline = list(map(str, cmdline))  # ensure it's all strings so that subprocess can handle it
    printCommand(cmdline, cwd=kwargs.get("cwd"), env=kwargs.get("env"), printVerboseOnly=printVerboseOnly)
    if "cwd" in kwargs:
        kwargs["cwd"] = str(kwargs["cwd"])
    else:
        # os.getcwd() raises an exception if the cwd was deleted
        try:
            kwargs["cwd"] = os.getcwd()
        except FileNotFoundError:
            kwargs["cwd"] = tempfile.gettempdir()
    if not runInPretendMode and _cheriConfig.pretend:
        return CompletedProcess(args=cmdline, returncode=0, stdout=b"", stderr=b"")
    # actually run the process now:
    if input is not None:
        assert "stdin" not in kwargs  # we need to use stdin here
        kwargs["stdin"] = subprocess.PIPE
        if not isinstance(input, bytes):
            input = str(input).encode("utf-8")
    if captureOutput:
        assert "stdout" not in kwargs  # we need to use stdout here
        kwargs["stdout"] = subprocess.PIPE
    if captureError:
        assert "stderr" not in kwargs  # we need to use stderr here
        kwargs["stderr"] = subprocess.PIPE
    # Quiet mode discards stdout unless it is captured or was redirected by the
    # caller. This must not depend on captureError: capturing stderr says
    # nothing about what should happen to stdout.
    if _cheriConfig.quiet and "stdout" not in kwargs:
        kwargs["stdout"] = subprocess.DEVNULL
    if "env" in kwargs:
        kwargs["env"] = {k: str(v) for k, v in kwargs["env"].items()}
    # When running scripts from a noexec filesystem try to read the interpreter and run that
    # (NOTE(review): this is handled inside popen_handle_noexec, which is not visible here).
    with popen_handle_noexec(cmdline, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            timeout_error = subprocess.TimeoutExpired(process.args, timeout, output=stdout)
            # Set stderr as an attribute instead of a constructor argument so
            # that this also works on Python versions without the stderr keyword.
            timeout_error.stderr = stderr
            raise timeout_error
        except Exception:
            process.kill()
            process.wait()
            raise
        retcode = process.poll()
        if retcode:
            if _cheriConfig and _cheriConfig.pretend:
                # kwargs["cwd"] is always set above, so it can be reported unconditionally.
                fatalError("Command ", "`" + " ".join(map(shlex.quote, process.args)) +
                           "` failed with non-zero exit code ", retcode,
                           ". Working directory was ", kwargs["cwd"], sep="")
            else:
                raise _make_called_process_error(retcode, process.args, stdout=stdout, cwd=kwargs["cwd"])
        return CompletedProcess(process.args, retcode, stdout, stderr)
# Comment list
# Table of contents