def check(self):
    """
    Check if the process has exited, and if an error occurred.
    Returns None if the process hasn't exited, or a CompletedProcess
    object if it has exited without an error. If the process
    has exited with an error, raises a CalledProcessError.
    If the process has exited, all available stdout and stderr
    are captured into the returned object or the raised exception.
    """
    retcode = self.poll()
    if retcode is not None:
        stdout, stderr = self.communicate(timeout=0)
        completed = subprocess.CompletedProcess(
            args=self.args,
            returncode=retcode,
            stdout=stdout,
            stderr=stderr,
        )
        completed.check_returncode()
        return completed
def _run(*args, input=None, env=None, check=False, timeout=None):
    with subprocess.Popen([a.encode('utf-8') for a in args], env=env,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            raise subprocess.TimeoutExpired(
                process.args, timeout, output=stdout, stderr=stderr,
            )
        except:
            process.kill()
            process.wait()
            raise
        retcode = process.poll()
    if check and retcode:
        raise subprocess.CalledProcessError(
            retcode, process.args, output=stdout, stderr=stderr,
        )
    return subprocess.CompletedProcess(process.args, retcode, stdout, stderr)
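# Usage sketch for the _run() helper above (a hypothetical invocation, not
# taken from the source): run a command, capture its output as bytes, and
# raise CalledProcessError on a non-zero exit code.
result = _run('echo', 'hello', check=True, timeout=10)
print(result.stdout.decode())  # prints 'hello'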
def test_verify_container_mountpoints_connection_failure(config_files, monkeypatch):
    with open(config_files, "r") as main_file:
        def fake_lxc_exec_command(*popenargs, **kwargs):
            if get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec':
                if get_command_parameter(popenargs, '--') == 'true':
                    cmd = ['bash', '-c', '>&2 echo -e "lxc command failed" ; exit 1']
                    return subprocess.run(cmd, **kwargs)
                else:
                    return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                return subprocess.run(*popenargs, **kwargs)

        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)
        parser = ConfigurationParser(main_file)
        coordinator = SharedFolderCoordinator(parser)
        with pytest.raises(FatalError) as error:
            coordinator.verify_container_mountpoints('fake-container')
        assert 'fake-container' in error.value.message
        assert 'lxc command failed' in error.value.message
def test_verify_container_mountpoints_failure(config_files, monkeypatch):
    with open(config_files, "r") as main_file:
        def fake_lxc_exec_command(*popenargs, **kwargs):
            if get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec':
                if get_command_parameter(popenargs, '--') == 'test':
                    return subprocess.CompletedProcess("failure", 1, 'failure')
                else:
                    return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                return subprocess.run(*popenargs, **kwargs)

        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)
        parser = ConfigurationParser(main_file)
        coordinator = SharedFolderCoordinator(parser)
        with pytest.raises(FatalError) as error:
            coordinator.verify_container_mountpoints('fake-container')
        assert 'fake-container' in error.value.message
        assert '/foo/bar/target_mountpoint' in error.value.message
def test_dictionary(monkeypatch, config_files, capsys, command, command_args):
    def fake_lxc_config_command(*popenargs, **kwargs):
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''
    dictionary = yaml.load(out)
    assert dictionary.get('edi_config_directory') == os.path.dirname(config_files)
    assert dictionary.get('edi_project_plugin_directory') == os.path.join(os.path.dirname(config_files), 'plugins')
def test_config(monkeypatch, config_files, capsys, command, command_args):
    def fake_lxc_config_command(*popenargs, **kwargs):
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''
    merged_config = yaml.load(out)
    assert merged_config.get('bootstrap').get('architecture') == 'i386'
def test_runner_git_version(self):
    """ProcessRunner should return subprocess.CompletedProcess"""
    runner = ProcessRunner(['git'], workdir=str(self.tempdir))
    proc = runner('version')
    self.assertIsInstance(proc, subprocess.CompletedProcess)
    self.assertIn('git version', proc.stdout)
def test_git_commit(tmpworkdir):
    mktree(tmpworkdir, {
        'test.js': 'X',
        'harrier.yml': """\
root: .
assets:
  active: True"""
    })
    with patch('harrier.tools.subprocess.run') as mock_run:
        mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout=b'commit sha1\n')
        config = Config('harrier.yml')
        config.setup()
        build(config)
    assert gettree(tmpworkdir.join('build')) == {
        'test.js': 'X',
        'assets.json': """\
{
  "commit": "commit sha1",
  "files": {
    "test.js": "/test.js"
  }
}
""",
    }
    assert mock_run.called
def run(self, script, target=None) -> subprocess.CompletedProcess:
    path, script = self.write_script(script)
    cmd = [self.make_exe, '-f', str(path)]
    if target:
        cmd.append(target)
    res = subprocess.run(cmd, cwd=str(self.tmpdir), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    exit_nok = res.returncode not in self.valid_exit_codes
    output_nok = b'Syntax error' in res.stderr or b'ERROR' in res.stderr
    if exit_nok or output_nok:
        pytest.fail(self.format_error(script, res))
    return res
def update_completed(self):
    """
    Return an instance of subprocess.CompletedProcess.
    """
    command = ["nbh", "course-update-from-git", self.coursename]
    completed = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return completed
def mocked_stdout(stdout):
    return subprocess.CompletedProcess(['blah', 'args'], 0, stdout, None)
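# Usage sketch (hypothetical, not from the source): build a canned
# CompletedProcess to use as a mock return value in tests.
fake = mocked_stdout(b'fake output\n')
assert fake.returncode == 0
assert fake.stdout == b'fake output\n'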
def run(
    self,
    args: List[str],
    user: str,
    log_output_live: bool = False,
    env: Optional[Dict] = None,
) -> CompletedProcess:
    """
    Run a command on this node as the given user.

    Args:
        args: The command to run on the node.
        user: The username to SSH as.
        log_output_live: If `True`, log output live. If `True`, stderr is
            merged into stdout in the return value.
        env: Environment variables to be set on the node before running
            the command. A mapping of environment variable names to
            values.

    Returns:
        The representation of the finished process.

    Raises:
        CalledProcessError: The process exited with a non-zero code.
    """
    ssh_args = self._compose_ssh_command(args=args, user=user, env=env)
    return run_subprocess(args=ssh_args, log_output_live=log_output_live)
def test_lxd_connection(config_files, monkeypatch):
    def fake_ansible_playbook_run(*popenargs, **kwargs):
        if get_command(popenargs) == 'ansible-playbook':
            assert 'lxd' == get_command_parameter(popenargs, '--connection')
            verify_inventory(get_command_parameter(popenargs, '--inventory'))
            verify_extra_vars(get_command_parameter(popenargs, '--extra-vars').lstrip('@'))
            # TODO: verify --user for ssh connection
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_ansible_playbook_run)

    def fakechown(*_):
        pass

    monkeypatch.setattr(shutil, 'chown', fakechown)

    with open(config_files, "r") as main_file:
        parser = ConfigurationParser(main_file)
        runner = PlaybookRunner(parser, "fake-container", "lxd")
        playbooks = runner.run_all()
        expected_playbooks = ['10_base_system', '20_networking', '30_foo']
        assert playbooks == expected_playbooks
def test_verify_container_mountpoints(config_files, monkeypatch):
    with open(config_files, "r") as main_file:
        def fake_lxc_exec_command(*popenargs, **kwargs):
            if get_command(popenargs) == 'lxc' and get_sub_command(popenargs) == 'exec':
                return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                return subprocess.run(*popenargs, **kwargs)

        monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_exec_command)
        parser = ConfigurationParser(main_file)
        coordinator = SharedFolderCoordinator(parser)
        coordinator.verify_container_mountpoints('fake-container')
def test_create_host_folders_successful_create(config_files, monkeypatch):
    with open(config_files, "r") as main_file:
        parser = ConfigurationParser(main_file)
        coordinator = SharedFolderCoordinator(parser)

        def fake_os_path_isdir(*_):
            return False

        monkeypatch.setattr(os.path, 'isdir', fake_os_path_isdir)

        def fake_os_path_exists(*_):
            return False

        monkeypatch.setattr(os.path, 'exists', fake_os_path_exists)

        def fake_mkdir_command(*popenargs, **kwargs):
            if get_command(popenargs) == 'mkdir' and get_sub_command(popenargs) == '-p':
                folder = popenargs[0][-1]
                assert 'valid_folder' in folder or 'work' in folder
                return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                return subprocess.run(*popenargs, **kwargs)

        monkeypatch.setattr(mockablerun, 'run_mockable', fake_mkdir_command)
        coordinator.create_host_folders()  # successful mkdir
def test_target_configure(config_files, monkeypatch, capsys):
    def fakerun(*popenargs, **kwargs):
        if get_command(popenargs) == "ansible-playbook":
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            print('Passthrough: {}'.format(get_command(popenargs)))
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fakerun)
    suppress_chown_during_debuild(monkeypatch)

    with workspace():
        edi_exec = os.path.join(get_project_root(), 'bin', 'edi')
        project_name = 'pytest-{}'.format(get_random_string(6))
        config_command = [edi_exec, 'config', 'init', project_name, 'debian-jessie-amd64']
        run(config_command)  # run as non root
        parser = edi._setup_command_line_interface()
        cli_args = parser.parse_args(['target', 'configure', 'remote-target', '{}-develop.yml'.format(project_name)])
        Configure().run_cli(cli_args)
        out, err = capsys.readouterr()
        print(out)
        assert not err
def popen(self, std_in=None, environ=None):
    self.__verify_command()
    if self.dry_run:
        self.__stdout = self._DRY_RUN_OUTPUT
        self.__stderr = self._DRY_RUN_OUTPUT
        self.__returncode = 0
        self.__debug_logging_method("dry-run: {}".format(self.command))
        return subprocess.CompletedProcess(
            args=[], returncode=self.__returncode,
            stdout=self.__stdout, stderr=self.__stderr)
    self.__debug_print_command()
    try:
        process = subprocess.Popen(
            self.command, env=self.__get_env(environ), shell=True,
            stdin=std_in, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except TypeError:
        process = subprocess.Popen(
            self.command, shell=True,
            stdin=std_in, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return process
def ex_subprocess(cmd):
    """Subprocess wrapper function.

    Arguments:
        - cmd (list(str)): the command to feed the subprocess

    Return:
        (out, compl_proc, error_proc) (3-tuple):
        - out (int): 1 if the subprocess failed, 0 if it succeeded
        - compl_proc (subprocess.CompletedProcess instance): return value
          of the subprocess when it has succeeded
        - error_proc (subprocess.CalledProcessError instance): value of the
          subprocess when an exception has occurred
    """
    out = 1
    compl_proc = None
    error_proc = None
    try:
        compl_proc = subprocess.run(cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError as error:
        error_proc = error
    else:
        out = 0
    finally:
        return out, compl_proc, error_proc
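# Usage sketch (a hypothetical call, not from the source): unpack the 3-tuple
# returned by ex_subprocess() for a command that succeeds and one that fails.
ok, completed, error = ex_subprocess(['ls', '/'])
assert ok == 0 and error is None
print(completed.stdout.decode('UTF-8'))

failed, completed, error = ex_subprocess(['ls', '/no/such/path'])
assert failed == 1 and completed is None
print(error.returncode, error.stderr.decode('UTF-8'))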
def retrieve_stdout(self):
    """Get the stdout from the completed process instance.

    Uses:
        - self.compl_proc (subprocess.CompletedProcess instance): instance
          that was returned when the subprocess.run() call completed
    """
    return self.compl_proc.stdout.decode('UTF-8').rstrip()
def runCmd(*args, captureOutput=False, captureError=False, input: "typing.Union[str, bytes]"=None, timeout=None,
           printVerboseOnly=False, runInPretendMode=False, **kwargs):
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        cmdline = args[0]  # list with parameters was passed
    else:
        cmdline = args
    cmdline = list(map(str, cmdline))  # ensure it's all strings so that subprocess can handle it
    # When running scripts from a noexec filesystem try to read the interpreter and run that
    printCommand(cmdline, cwd=kwargs.get("cwd"), env=kwargs.get("env"), printVerboseOnly=printVerboseOnly)
    if "cwd" in kwargs:
        kwargs["cwd"] = str(kwargs["cwd"])
    else:
        # os.getcwd() raises an exception if the cwd was deleted
        try:
            kwargs["cwd"] = os.getcwd()
        except FileNotFoundError:
            kwargs["cwd"] = tempfile.gettempdir()
    if not runInPretendMode and _cheriConfig.pretend:
        return CompletedProcess(args=cmdline, returncode=0, stdout=b"", stderr=b"")
    # actually run the process now:
    if input is not None:
        assert "stdin" not in kwargs  # we need to use stdin here
        kwargs['stdin'] = subprocess.PIPE
        if not isinstance(input, bytes):
            input = str(input).encode("utf-8")
    if captureOutput:
        assert "stdout" not in kwargs  # we need to use stdout here
        kwargs["stdout"] = subprocess.PIPE
    if captureError:
        assert "stderr" not in kwargs  # we need to use stderr here
        kwargs["stderr"] = subprocess.PIPE
    elif _cheriConfig.quiet and "stdout" not in kwargs:
        kwargs["stdout"] = subprocess.DEVNULL
    if "env" in kwargs:
        kwargs["env"] = dict((k, str(v)) for k, v in kwargs["env"].items())
    with popen_handle_noexec(cmdline, **kwargs) as process:
        try:
            stdout, stderr = process.communicate(input, timeout=timeout)
        except subprocess.TimeoutExpired:
            process.kill()
            stdout, stderr = process.communicate()
            # TODO py35: pass stderr=stderr as well
            raise subprocess.TimeoutExpired(process.args, timeout, output=stdout)
        except Exception:
            process.kill()
            process.wait()
            raise
        retcode = process.poll()
        if retcode:
            if _cheriConfig and _cheriConfig.pretend:
                cwd = (". Working directory was ", kwargs["cwd"]) if "cwd" in kwargs else ()
                fatalError("Command ", "`" + " ".join(map(shlex.quote, process.args)) +
                           "` failed with non-zero exit code ", retcode, *cwd, sep="")
            else:
                raise _make_called_process_error(retcode, process.args, stdout=stdout, cwd=kwargs["cwd"])
        return CompletedProcess(process.args, retcode, stdout, stderr)
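# Call sketch (hypothetical, not from the source; runCmd() depends on
# cheribuild module globals such as _cheriConfig, printCommand and
# popen_handle_noexec, so it cannot run in isolation):
#
#     completed = runCmd("git", "--version", captureOutput=True)
#     print(completed.stdout)  # captured bytes output of `git --version`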
def run_integration_tests(
    self,
    pytest_command: List[str],
    env: Optional[Dict] = None,
    log_output_live: bool = False,
) -> subprocess.CompletedProcess:
    """
    Run integration tests on a random master node.

    Args:
        pytest_command: The ``pytest`` command to run on the node.
        env: Environment variables to be set on the node before running
            the `pytest_command`. On enterprise clusters,
            `DCOS_LOGIN_UNAME` and `DCOS_LOGIN_PW` must be set.
        log_output_live: If `True`, log output of the `pytest_command`
            live. If `True`, stderr is merged into stdout in the return
            value.

    Returns:
        The result of the ``pytest`` command.

    Raises:
        ``subprocess.CalledProcessError`` if the ``pytest`` command fails.
    """
    args = [
        'source',
        '/opt/mesosphere/environment.export',
        '&&',
        'cd',
        '/opt/mesosphere/active/dcos-integration-test/',
        '&&',
    ]

    env = env or {}

    def ip_addresses(nodes: Iterable[Node]) -> str:
        return ','.join(map(lambda node: str(node.ip_address), nodes))

    environment_variables = {
        'MASTER_HOSTS': ip_addresses(self.masters),
        'SLAVE_HOSTS': ip_addresses(self.agents),
        'PUBLIC_SLAVE_HOSTS': ip_addresses(self.public_agents),
        **env,
    }

    args += pytest_command

    # Tests are run on a random master node.
    test_host = next(iter(self.masters))

    return test_host.run(
        args=args,
        user=self.default_ssh_user,
        log_output_live=log_output_live,
        env=environment_variables,
    )
def test_bootstrap(config_files, monkeypatch):
    with open(config_files, "r") as main_file:
        def fakegetuid():
            return 0

        monkeypatch.setattr(os, 'getuid', fakegetuid)

        def fakechown(*_):
            pass

        monkeypatch.setattr(shutil, 'chown', fakechown)

        def fakerun(*popenargs, **kwargs):
            if get_command(popenargs) == "chroot":
                rootfs_path = get_command_parameter(popenargs, "chroot")
                if not os.path.exists(rootfs_path):
                    os.mkdir(rootfs_path)
            elif get_command(popenargs) == "debootstrap":
                rootfs_path = popenargs[0][-2]
                apt_dir = os.path.join(rootfs_path, 'etc', 'apt')
                os.makedirs(apt_dir)
            elif get_command(popenargs) == "tar":
                archive = get_command_parameter(popenargs, '-acf')
                with open(archive, mode="w") as fakearchive:
                    fakearchive.write("fake archive")
            elif popenargs[0][-2] == "dpkg" and popenargs[0][-1] == "--print-architecture":
                return subprocess.CompletedProcess("fakerun", 0, 'amd64')
            elif get_command(popenargs) == "lxd" and get_sub_command(popenargs) == "--version":
                return subprocess.CompletedProcess("fakerun", 0, '2.18')
            elif get_command(popenargs) == "printenv":
                if get_sub_command(popenargs) == "HOME":
                    return subprocess.CompletedProcess("fakerun", 0, '/no/such/directory')
                else:
                    return subprocess.CompletedProcess("fakerun", 0, '')
            else:
                print('Passthrough: {}'.format(get_command(popenargs)))
                return subprocess.run(*popenargs, **kwargs)
            return subprocess.CompletedProcess("fakerun", 0, '')

        monkeypatch.setattr(mockablerun, 'run_mockable', fakerun)
        monkeypatch.chdir(os.path.dirname(config_files))
        bootstrap_cmd = Bootstrap()
        with requests_mock.Mocker() as m:
            m.get('https://ftp-master.debian.org/keys/archive-key-8.asc', text='key file mockup')
            bootstrap_cmd.run(main_file)
            expected_result = bootstrap_cmd._result()
            assert os.path.exists(expected_result)
            previous_result_text = "previous result"
            with open(expected_result, mode="w") as previous_result:
                previous_result.write(previous_result_text)
            bootstrap_cmd2 = Bootstrap()
            bootstrap_cmd2.run(main_file)
            with open(expected_result, mode="r") as same_result:
                assert same_result.read() == previous_result_text
def test_plugins(monkeypatch, config_files, capsys, command, command_args, has_templates,
                 has_profiles, has_playbooks, has_postprocessing_commands):
    def fake_lxc_config_command(*popenargs, **kwargs):
        if 'images.compression_algorithm' in popenargs[0]:
            return subprocess.CompletedProcess("fakerun", 0, '')
        else:
            return subprocess.run(*popenargs, **kwargs)

    monkeypatch.setattr(mockablerun, 'run_mockable', fake_lxc_config_command)
    parser = edi._setup_command_line_interface()
    command_args.append(config_files)
    cli_args = parser.parse_args(command_args)
    command().run_cli(cli_args)
    out, err = capsys.readouterr()
    assert err == ''
    result = yaml.load(out)

    if has_templates:
        assert result.get('lxc_templates')
    else:
        assert not result.get('lxc_templates')

    if has_profiles:
        assert result.get('lxc_profiles')
    else:
        assert not result.get('lxc_profiles')

    if has_playbooks:
        assert len(result.get('playbooks')) == 3
        base_system = result.get('playbooks')[0].get('10_base_system')
        assert 'plugins/playbooks/foo.yml' in base_system.get('path')
        assert base_system.get('dictionary').get('kernel_package') == 'linux-image-amd64-rt'
        assert base_system.get('dictionary').get('edi_config_directory') == os.path.dirname(config_files)
    else:
        assert not result.get('playbooks')

    if has_postprocessing_commands:
        assert result.get('postprocessing_commands')
    else:
        assert not result.get('postprocessing_commands')