def test_command_result(proc):
proc.options["-c"] = ""
result = proc.run()
time.sleep(2)
assert result.pid
assert result.returncode == os.EX_OK
assert result.stdout
assert result.stdin is None
assert not result.alive()
assert str(result)
assert repr(result)
python类EX_OK的实例源码
def test_command_result_running(proc):
proc.options["-c"] = "import time; time.sleep(2)"
result = proc.run()
assert result.pid
assert result.returncode is None
assert result.alive()
assert str(result)
assert repr(result)
time.sleep(3.5)
assert result.pid
assert result.returncode == os.EX_OK
assert not result.alive()
def execute(self, tsk, stop_ev):
# Small hack to prevent execution of callback BEFORE task
# happen to arrive into self.data. It is possible because
# submitting task into pool is eager.
while tsk.id not in self.data:
time.sleep(0.1)
plugin = self.get_plugin(tsk)
with plugin.execute(tsk) as process:
tsk = tsk.set_executor_data(platform.node(), process.pid)
LOG.info(
"Management process for task %s was started. Pid %d",
tsk, process.pid
)
while not stop_ev.is_set() and process.alive():
stop_ev.wait(0.5)
process.stop()
LOG.info(
"Management process for task %s with PID %d has "
"stopped with exit code %d",
tsk, process.pid, process.returncode
)
if process.returncode != os.EX_OK:
raise ChildProcessError(
"Process exit with code {0}".format(process.returncode))
def main():
if not PATH_ANSIBLE:
sys.exit("Cannot find ansible-playbook executable.")
logging.basicConfig(
format="%(asctime)s [%(levelname)-5s] %(message)s",
level=logging.DEBUG)
if not os.path.isfile(ANSIBLE_CONFIG_PATH):
LOG.warning("Cannot find Ansible config at %r", ANSIBLE_CONFIG_PATH)
for cluster in get_clusters():
if not cluster["configuration"]:
LOG.info("Skip cluster %s because it has no servers",
cluster["name"])
continue
server = random.choice(cluster["configuration"])
server = server["server_id"]
server = get_server_by_id(server)
commandline = get_ansible_commandline(
server["username"], server["ip"], cluster["name"])
LOG.info("Collect from %s", cluster["name"])
LOG.debug("Execute %s", commandline)
code = execute(commandline)
if code == os.EX_OK:
LOG.info("Collected from %s", cluster["name"])
else:
LOG.warning("Cannot collect from %s: %d", cluster["name"], code)
def get_ceph_version(prefix, connection, cluster_name):
command = "sudo -EHn -- ceph --cluster {0} version".format(
shlex.quote(cluster_name))
result = await connection.run(command)
if result.exit_status != os.EX_OK:
click.echo(
"{0}ceph-version (failed {1}): {2}".format(
prefix, result.exit_status, result.stderr.strip()))
else:
click.echo(
"{0}ceph-version (ok): {1}".format(
prefix, result.stdout.strip()))
def test_dry_run_commands(self):
unauthorized_ok = [dict(return_codes=[os.EX_OK]),
dict(return_codes=[1, os.EX_SOFTWARE], stderr="UnauthorizedOperation")]
self.call("aegea launch unittest --dry-run --no-verify-ssh-key-pem-file",
shell=True, expect=unauthorized_ok)
self.call("aegea launch unittest --dry-run --spot --no-verify-ssh-key-pem-file",
shell=True, expect=unauthorized_ok)
self.call("aegea launch unittest --dry-run --duration-hours 1 --no-verify-ssh-key-pem-file",
shell=True, expect=unauthorized_ok)
self.call("aegea launch unittest --duration 0.5 --min-mem 6 --cores 2 --dry-run --no-verify --client-token t",
shell=True, expect=unauthorized_ok)
self.call("aegea build_ami i --dry-run --no-verify-ssh-key-pem-file",
shell=True, expect=unauthorized_ok)
def test_secrets(self):
unauthorized_ok = [dict(return_codes=[os.EX_OK]),
dict(return_codes=[1, os.EX_SOFTWARE], stderr="(AccessDenied|NoSuchKey)")]
self.call("test_secret=test aegea secrets put test_secret --iam-role aegea.launch",
shell=True, expect=unauthorized_ok)
self.call("aegea secrets put test_secret --generate-ssh-key --iam-role aegea.launch",
shell=True, expect=unauthorized_ok)
self.call("aegea secrets ls", shell=True, expect=unauthorized_ok)
self.call("aegea secrets ls --json", shell=True, expect=unauthorized_ok)
self.call("aegea secrets get test_secret --iam-role aegea.launch", shell=True, expect=unauthorized_ok)
self.call("aegea secrets delete test_secret --iam-role aegea.launch", shell=True, expect=unauthorized_ok)
def check_output(self, command, input_data=None, stderr=sys.stderr):
logger.debug('Running "%s"', command)
ssh_stdin, ssh_stdout, ssh_stderr = self.exec_command(command)
if input_data is not None:
ssh_stdin.write(input_data)
exit_code = ssh_stdout.channel.recv_exit_status()
stderr.write(ssh_stderr.read().decode("utf-8"))
if exit_code != os.EX_OK:
raise Exception('Error while running "{}": {}'.format(command, os.errno.errorcode.get(exit_code)))
return ssh_stdout.read().decode("utf-8")
def main(argv):
parser = get_parser()
# The parser arguments (cfg.args) are accessible everywhere after this call.
cfg.args = parser.parse_args()
# This initiates the global yml configuration instance so it will be
# accessible everywhere after this call.
cfg.initiate_config()
if not cfg.args.file and not cfg.args.q:
eprint("No file provided and not in query mode\n")
parser.print_help()
sys.exit(os.EX_USAGE)
jira, username = jiralogin.get_jira_instance(cfg.args.t)
if cfg.args.x or cfg.args.e:
if not cfg.args.q:
eprint("Arguments '-x' and '-e' can only be used together with '-q'")
sys.exit(os.EX_USAGE)
if cfg.args.p and not cfg.args.q:
eprint("Arguments '-p' can only be used together with '-q'")
sys.exit(os.EX_USAGE)
if cfg.args.q:
filename = get_jira_issues(jira, username)
if cfg.args.p:
print_status_file(filename)
sys.exit(os.EX_OK)
elif cfg.args.file is not None:
filename = cfg.args.file
else:
eprint("Trying to run script with unsupported configuration. Try using --help.")
sys.exit(os.EX_USAGE)
if get_editor():
open_editor(filename)
parse_status_file(jira, filename)
def ping_vm0(ovirt_prefix):
nt.assert_equals(_ping(ovirt_prefix, VM0_PING_DEST), EX_OK)
def restore_vm0_networking(ovirt_prefix):
# Networking may not work after resume. We need this pseudo-test for the
# purpose of reviving VM networking by rebooting the VM. We must be
# careful to reboot just the guest OS, not to restart the whole VM, to keep
# checking for contingent failures after resume.
# A better solution might be using a guest OS other than Cirros.
if _ping(ovirt_prefix, VM0_PING_DEST) == EX_OK:
return
host = _vm_host(ovirt_prefix, VM0_NAME)
uri = 'qemu+tls://%s/system' % host.name()
ret = host.ssh(['virsh', '-c', uri, 'reboot', '--mode', 'acpi', VM0_NAME])
nt.assert_equals(ret.code, EX_OK)
# We might want to wait until ssh server inside the VM gets up. But the
# interim tests, especially *_recovery, and repeated ssh connection
# attempts in host.ssh calls should give enough time.
def ping_vm0(ovirt_prefix):
nt.assert_equals(_ping(ovirt_prefix, VM0_PING_DEST), EX_OK)
def restore_vm0_networking(ovirt_prefix):
# Networking may not work after resume. We need this pseudo-test for the
# purpose of reviving VM networking by rebooting the VM. We must be
# careful to reboot just the guest OS, not to restart the whole VM, to keep
# checking for contingent failures after resume.
# A better solution might be using a guest OS other than Cirros.
if _ping(ovirt_prefix, VM0_PING_DEST) == EX_OK:
return
host = _vm_host(ovirt_prefix, VM0_NAME)
uri = 'qemu+tls://%s/system' % host.name()
ret = host.ssh(['virsh', '-c', uri, 'reboot', '--mode', 'acpi', VM0_NAME])
nt.assert_equals(ret.code, EX_OK)
# We might want to wait until ssh server inside the VM gets up. But the
# interim tests, especially *_recovery, and repeated ssh connection
# attempts in host.ssh calls should give enough time.
def script(config, docs, **kwargs):
ready, error = prepare_docs_script_mode(config, docs)
if not ready:
return error
file = kwargs.get('file', sys.stdout)
print(preamble, file=file)
buildsuccess, results = docbuild(config, docs, **kwargs)
print(postamble, file=file)
for errcode, source in results:
if not errcode:
logger.error("Could not generate script for %s", source.stem)
if buildsuccess:
return os.EX_OK
else:
return "Script generation failed."
def test_run_detail(self):
self.add_published('Published-HOWTO', example.ex_linuxdoc)
self.add_new('New-HOWTO', example.ex_linuxdoc)
self.add_stale('Stale-HOWTO', example.ex_linuxdoc)
self.add_orphan('Orphan-HOWTO', example.ex_linuxdoc)
self.add_broken('Broken-HOWTO', example.ex_linuxdoc)
argv = self.argv
argv.append('--detail')
exitcode = tldp.driver.run(argv)
self.assertEqual(exitcode, os.EX_OK)
def test_run_doctypes(self):
exitcode = tldp.driver.run(['--doctypes'])
self.assertEqual(exitcode, os.EX_OK)
def test_show_statustypes(self):
stdout = io.StringIO()
result = tldp.driver.show_statustypes(Namespace(), file=stdout)
self.assertEqual(result, os.EX_OK)
stdout.seek(0)
data = stdout.read()
for status in status_types:
self.assertTrue(stypes[status] in data)
def test_run_statustypes(self):
exitcode = tldp.driver.run(['--statustypes'])
self.assertEqual(exitcode, os.EX_OK)
def test_run_summary(self):
self.add_published('Published-HOWTO', example.ex_linuxdoc)
self.add_new('New-HOWTO', example.ex_linuxdoc)
self.add_stale('Stale-HOWTO', example.ex_linuxdoc)
self.add_orphan('Orphan-HOWTO', example.ex_linuxdoc)
self.add_broken('Broken-HOWTO', example.ex_linuxdoc)
argv = self.argv
argv.append('--summary')
exitcode = tldp.driver.run(argv)
self.assertEqual(exitcode, os.EX_OK)