def fix_subprocess(override_debug=False, override_exception=False):
    """Activate the subprocess compatibility.

    Patches the standard ``subprocess`` module, adding only what is missing:

    * ``subprocess.SubprocessError`` is set to ``_Internal.SubprocessError``
      when the module namespace has no such entry (or the entry is ``None``).
    * While ``_InternalReferences.UsedCalledProcessError`` is still ``None``,
      ``_subprocess_called_process_error`` is called with a flag telling it
      whether ``subprocess`` already defines ``CalledProcessError``.  When it
      does not, whatever ``_InternalReferences.UsedCalledProcessError`` holds
      after that call is published as ``subprocess.CalledProcessError``.
    * ``subprocess.check_output`` is set to a local fallback when the
      attribute is absent.

    ``override_debug`` and ``override_exception`` are accepted but not read
    anywhere in this function.
    """
    import subprocess

    module_ns = subprocess.__dict__

    # Exceptions
    if module_ns.get("SubprocessError") is None:
        subprocess.SubprocessError = _Internal.SubprocessError

    if _InternalReferences.UsedCalledProcessError is None:
        # Decide once whether the module ships its own CalledProcessError,
        # then let the helper prepare the class accordingly.
        has_native_error = "CalledProcessError" in module_ns
        _subprocess_called_process_error(has_native_error, subprocess)
        if not has_native_error:
            subprocess.CalledProcessError = _InternalReferences.UsedCalledProcessError

    def _check_output(*args, **kwargs):
        """Run a command via ``Popen`` and return what it wrote to stdout.

        Raises ``ValueError`` when the caller passes ``stdout`` and the
        compatibility ``CalledProcessError`` when the exit status is non-zero.
        """
        if "stdout" in kwargs:
            raise ValueError("stdout argument not allowed, "
                             "it will be overridden.")

        child = subprocess.Popen(stdout=subprocess.PIPE, *args, **kwargs)
        captured = child.communicate()[0]
        exit_status = child.poll()

        # communicate() waits for the child, so a missing status is unexpected.
        if exit_status is None:
            raise RuntimeWarning("The process is not yet terminated.")
        if not exit_status:
            return captured

        # Report the command the same way it was supplied: keyword first,
        # otherwise the first positional argument.
        command = kwargs.get("args")
        if command is None:
            command = args[0]
        raise _InternalReferences.UsedCalledProcessError(
            returncode=exit_status, cmd=command, output=captured)

    # Only install the fallback when the running Python lacks check_output.
    if not hasattr(subprocess, "check_output"):
        subprocess.check_output = _check_output
评论列表
文章目录