def manage_events(self, notify):
    filename = os.path.basename(self.source_path)

    while True:
        try:
            events = notify.read()
        except KeyboardInterrupt:
            return os.EX_OK
        else:
            LOG.debug("Caught %d events", len(events))
            events = self.filter_events(filename, events)
            descriptions = self.describe_events(events)
            LOG.debug("Got %d events after filtration: %s",
                      len(descriptions), descriptions)
            if events:
                self.output()
                LOG.info("Config was managed. Going to the next loop.")
def __str__(self):
    try:
        assert self.project_name
        assert self.case_name
        result = 'PASS' if self.is_successful() == TestCase.EX_OK else 'FAIL'
        msg = prettytable.PrettyTable(
            header_style='upper', padding_width=5,
            field_names=['test case', 'project', 'duration',
                         'result'])
        msg.add_row([self.case_name, self.project_name,
                     self.get_duration(), result])
        return msg.get_string()
    except AssertionError:
        self.__logger.error("We cannot print invalid objects")
        return super(TestCase, self).__str__()
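# Illustrative only (not from the source): with header_style='upper' and
# padding_width=5, the table built above renders along these lines:
#
#     +------------------+---------------+--------------+------------+
#     |    TEST CASE     |    PROJECT    |   DURATION   |   RESULT   |
#     +------------------+---------------+--------------+------------+
#     |    vping_ssh     |    functest   |    00:57     |    PASS    |
#     +------------------+---------------+--------------+------------+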
def run_tier(self, tier):
    """Run one tier"""
    tier_name = tier.get_name()
    tests = tier.get_tests()
    if not tests:
        LOGGER.info("There are no supported test cases in this tier "
                    "for the given scenario")
        self.overall_result = Result.EX_ERROR
    else:
        LOGGER.info("Running tier '%s'", tier_name)
        for test in tests:
            self.run_test(test)
            test_case = self.executed_test_cases[test.get_name()]
            if test_case.is_successful() != testcase.TestCase.EX_OK:
                LOGGER.error("The test case '%s' failed.", test.get_name())
                if test.get_project() == "functest":
                    self.overall_result = Result.EX_ERROR
                if test.is_blocking():
                    raise BlockingTestFailed(
                        "The test case {} failed and is blocking".format(
                            test.get_name()))
    return self.overall_result
def test_option_repeat_interval(self):
    """test --retry and --interval options"""
    # run with --retry, see 2 lines, then kill -INT
    cmd, output = runCmdOutput(['-p', '7788', '-r'],
                               wait=False, limit=2)
    cmd.send_signal(signal.SIGINT)
    self.assertEqual(cmd.wait(), 1)
    cmd.stdout.close()
    # run with --retry, see 4 lines, then kill -INT
    cmd, output = runCmdOutput(['-p', '7788', '-r', '-i', '1'],
                               wait=False, limit=4)
    cmd.send_signal(signal.SIGINT)
    self.assertEqual(cmd.wait(), 1)
    cmd.stdout.close()
    # invalid --interval option argument (int > 0)
    cmd, output = runCmdOutput(['-p', '7788', '-i', '0'])
    self.assertEqual(cmd.returncode, os.EX_USAGE)
    # --interval option argument ignored if no --retry
    cmd, output = runCmdOutput(['-p', '7788', '-i', '1000'])
    self.assertEqual(cmd.returncode, os.EX_OK)
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
    if not isinstance(value, str) or value == "":
        return info.errormsg(self)
    value = os.path.expanduser(value)
    if self.allow_std and value == "-" and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    if os.path.exists(value):
        if os.path.isfile(value) and os.access(os.path.abspath(value), os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
    if not self.allow_non_existent:
        return info.errormsg(self, "File doesn't exist")
    abs_name = os.path.abspath(value)
    dir_name = os.path.dirname(abs_name)
    # note: os.EX_OK == 0 == os.F_OK, so this access() call is effectively
    # an existence check on the parent directory
    if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    return info.errormsg(self)
def test_main_list(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()
    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--list"])

    assert inventory.main() == os.EX_OK
    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["new"]["hosts"] == [host]
    assert arg["_meta"]["hostvars"][host]["ansible_user"] == username
def test_main_host_ok(monkeypatch, capsys, mocked_sysexit, mocked_configure):
    server_id = pytest.faux.gen_uuid()
    host = pytest.faux.gen_alphanumeric()
    username = pytest.faux.gen_alphanumeric()
    initiator_id = pytest.faux.gen_uuid()
    tsk = task.ServerDiscoveryTask(server_id, host, username, initiator_id)
    tsk = tsk.create()

    monkeypatch.setenv(process.ENV_ENTRY_POINT, "server_discovery")
    monkeypatch.setenv(process.ENV_TASK_ID, str(tsk._id))
    monkeypatch.setattr("sys.argv", ["progname", "--host", host])

    assert inventory.main() == os.EX_OK
    mocked_sysexit.assert_not_called()

    out, _ = capsys.readouterr()
    arg = json.loads(out)
    assert arg["ansible_user"] == username
def run(self):
    if self.finished:
        return

    # note: with shell=True, only the first element of the list is used as
    # the shell command on POSIX
    self.process = subprocess.Popen(
        [str(self.path)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True
    )
    LOG.info("Run %s. Pid %d", self.path, self.process.pid)

    self.process.wait()
    logmethod = LOG.info if self.process.returncode == os.EX_OK \
        else LOG.warning
    logmethod("%s has been finished. Exit code %s",
              self.path, self.process.returncode)

    self.stdout = self.process.stdout.read().decode("utf-8")
    self.stderr = self.process.stderr.read().decode("utf-8")

    if self.process.returncode != os.EX_OK:
        raise RuntimeError(
            "Program {0} has been finished with exit code {1}".format(
                self.path, self.process.returncode))
async def get_package_version(prefix, connection, package_name):
    command = "dpkg-query --showformat='${Version}' --show %s" % shlex.quote(
        package_name)
    result = await connection.run(command)
    if result.exit_status != os.EX_OK:
        click.echo(
            "{0}package (failed {1}): {2} - {3}".format(
                prefix, result.exit_status, package_name, result.stderr.strip()
            )
        )
    else:
        click.echo(
            "{0}package (ok): {1}=={2}".format(
                prefix, package_name, result.stdout.strip()
            )
        )
def call(self, cmd, **kwargs):
    print('Running "{}"'.format(cmd), file=sys.stderr)
    expect = kwargs.pop("expect", [dict(return_codes=[os.EX_OK], stdout=None, stderr=None)])
    # pop stdin so it is not passed twice through **kwargs below
    process = subprocess.Popen(cmd, stdin=kwargs.pop("stdin", subprocess.PIPE), stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, **kwargs)
    out, err = process.communicate()
    return_code = process.poll()
    out = out.decode(sys.stdin.encoding)
    err = err.decode(sys.stdin.encoding)

    def match(return_code, out, err, expected):
        exit_ok = return_code in expected["return_codes"]
        stdout_ok = re.search(expected.get("stdout") or "", out)
        stderr_ok = re.search(expected.get("stderr") or "", err)
        return exit_ok and stdout_ok and stderr_ok

    if not any(match(return_code, out, err, exp) for exp in expect):
        print(err)
        e = subprocess.CalledProcessError(return_code, cmd, output=out)
        e.stdout, e.stderr = out, err
        raise e
    return self.SubprocessResult(out, err, return_code)
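# Hypothetical usage sketch of call() above (command and patterns invented for
# illustration): each dict in "expect" is one acceptable outcome, combining
# permitted return codes with optional stdout/stderr regexes.
#
#     expectations = [
#         dict(return_codes=[os.EX_OK], stdout=None, stderr=None),
#         dict(return_codes=[1], stdout=None, stderr="already exists"),
#     ]
#     result = self.call(["mkdir", "build"], expect=expectations)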
def grep(args):
    filter_args = dict(logGroupName=args.log_group)
    if args.log_stream:
        filter_args.update(logStreamNames=[args.log_stream])
    if args.pattern:
        filter_args.update(filterPattern=args.pattern)
    if args.start_time:
        filter_args.update(startTime=int(timestamp(args.start_time) * 1000))
    if args.end_time:
        filter_args.update(endTime=int(timestamp(args.end_time) * 1000))
    num_results = 0
    while True:
        for event in paginate(clients.logs.get_paginator("filter_log_events"), **filter_args):
            if "timestamp" not in event or "message" not in event:
                continue
            print(str(Timestamp(event["timestamp"])), event["message"])
            num_results += 1
        if args.follow:
            time.sleep(1)
        else:
            # the SystemExit is returned rather than raised; the surrounding
            # CLI dispatcher presumably turns it into the process exit status
            return SystemExit(os.EX_OK if num_results > 0 else os.EX_DATAERR)
def build(config, docs, **kwargs):
    if not config.pubdir:
        return ERR_NEEDPUBDIR + "to --build"
    ready, error = builddir_setup(config)
    if not ready:
        return error
    ready, error = prepare_docs_build_mode(config, docs)
    if not ready:
        return error
    buildsuccess, results = docbuild(config, docs, **kwargs)
    for x, (buildcode, source) in enumerate(results, 1):
        if buildcode:
            logger.info("success (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
        else:
            logger.info("FAILURE (%d of %d) available in %s",
                        x, len(results), source.working.dirname)
    if buildsuccess:
        return os.EX_OK
    else:
        return "Build failed, see logging output in %s." % (config.builddir,)
def publish(config, docs, **kwargs):
    config.build = True
    result = build(config, docs, **kwargs)
    if result != os.EX_OK:
        return result
    for x, source in enumerate(docs, 1):
        logger.info("Publishing (%d of %d) to %s.",
                    x, len(docs), source.output.dirname)
        # -- swapdirs must raise an error if there are problems
        #
        swapdirs(source.working.dirname, source.output.dirname)
        if os.path.isdir(source.working.dirname):
            logger.debug("%s removing old directory %s",
                         source.stem, source.working.dirname)
            shutil.rmtree(source.working.dirname)
    workingdirs = list(set([x.dtworkingdir for x in docs]))
    workingdirs.append(config.builddir)
    post_publish_cleanup(workingdirs)
    return os.EX_OK
def test_summary_longnames(self):
    c = self.config
    names = self.publishDocumentsWithLongNames(5)
    stdout = io.StringIO()
    result = tldp.driver.summary(c, file=stdout)
    self.assertEqual(result, os.EX_OK)
    stdout.seek(0)
    data = stdout.read()
    self.assertTrue('and 4 more' in data)
    c.verbose = True
    stdout = io.StringIO()
    result = tldp.driver.summary(c, file=stdout)
    self.assertEqual(result, os.EX_OK)
    stdout.seek(0)
    data = stdout.read()
    for name in names:
        self.assertTrue(name in data)
def test_run(self):
    c = self.config
    ex = example.ex_linuxdoc
    self.add_published('Published-HOWTO', ex)
    self.add_new('New-HOWTO', ex)
    self.add_stale('Stale-HOWTO', ex)
    self.add_orphan('Orphan-HOWTO', ex)
    self.add_broken('Broken-HOWTO', ex)
    fullpath = opj(self.tempdir, 'sources', 'New-HOWTO.sgml')
    argv = self.argv
    argv.extend(['--publish', 'stale', 'Orphan-HOWTO', fullpath])
    exitcode = tldp.driver.run(argv)
    self.assertEqual(exitcode, os.EX_OK)
    inv = tldp.inventory.Inventory(c.pubdir, c.sourcedir)
    self.assertEqual(4, len(inv.published.keys()))
    self.assertEqual(1, len(inv.broken.keys()))
def is_successful(self):
    """Interpret the result of the test case.

    It allows getting the result of TestCase. It completes run()
    which only returns the execution status.

    It can be overridden if checking the result is not suitable.

    Returns:
        TestCase.EX_OK if result is 'PASS'.
        TestCase.EX_TESTCASE_FAILED otherwise.
    """
    try:
        assert self.criteria
        assert self.result is not None
        if (not isinstance(self.result, str) and
                not isinstance(self.criteria, str)):
            if self.result >= self.criteria:
                return TestCase.EX_OK
        else:
            # Backward compatibility
            # It must be removed as soon as TestCase subclasses
            # stop setting result = 'PASS' or 'FAIL'.
            # In this case criteria is unread.
            self.__logger.warning(
                "Please update result which must be an int!")
            if self.result == 'PASS':
                return TestCase.EX_OK
    except AssertionError:
        self.__logger.error("Please run test before checking the results")
    return TestCase.EX_TESTCASE_FAILED
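# A minimal sketch (not from the source; attribute names taken from the code
# above) of the numeric result/criteria contract: when result is not a string,
# result >= criteria counts as PASS.
#
#     case = TestCase(case_name="demo", project_name="demo")
#     case.criteria = 90    # pass threshold
#     case.result = 95      # measured score
#     assert case.is_successful() == TestCase.EX_OK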
def push_to_db(self):
    """Push the results of the test case to the DB.

    It allows publishing the results and checking the status.

    It could be overridden if the common implementation is not
    suitable. The following attributes must be set before pushing
    the results to DB:

        * project_name,
        * case_name,
        * result,
        * start_time,
        * stop_time.

    Returns:
        TestCase.EX_OK if results were pushed to DB.
        TestCase.EX_PUSH_TO_DB_ERROR otherwise.
    """
    try:
        assert self.project_name
        assert self.case_name
        assert self.start_time
        assert self.stop_time
        pub_result = 'PASS' if self.is_successful() == TestCase.EX_OK else 'FAIL'
        if ft_utils.push_results_to_db(
                self.project_name, self.case_name, self.start_time,
                self.stop_time, pub_result, self.details):
            self.__logger.info(
                "The results were successfully pushed to DB")
            return TestCase.EX_OK
        else:
            self.__logger.error("The results cannot be pushed to DB")
            return TestCase.EX_PUSH_TO_DB_ERROR
    except Exception:  # pylint: disable=broad-except
        self.__logger.exception("The results cannot be pushed to DB")
        return TestCase.EX_PUSH_TO_DB_ERROR
def _run(self, args):  # pylint: disable=no-self-use
    """The built-in function to run a test case"""
    case_name = args.get('testcase')
    self._update_logging_ini(args.get('task_id'))

    try:
        cmd = "run_tests -t {}".format(case_name)
        runner = ft_utils.execute_command(cmd)
    except Exception:  # pylint: disable=broad-except
        result = 'FAIL'
        LOGGER.exception("Running test case %s failed!", case_name)
    else:
        # only inspect the exit status when execute_command() did not raise,
        # otherwise runner would be unbound here
        if runner == os.EX_OK:
            result = 'PASS'
        else:
            result = 'FAIL'

    env_info = {
        'installer': CONST.__getattribute__('INSTALLER_TYPE'),
        'scenario': CONST.__getattribute__('DEPLOY_SCENARIO'),
        'build_tag': CONST.__getattribute__('BUILD_TAG'),
        'ci_loop': CONST.__getattribute__('CI_LOOP')
    }
    result = {
        'task_id': args.get('task_id'),
        'testcase': case_name,
        'env_info': env_info,
        'result': result
    }
    return {'result': result}
def __init__(self):
    self.executed_test_cases = {}
    self.overall_result = Result.EX_OK
    self.clean_flag = True
    self.report_flag = False
    self._tiers = tb.TierBuilder(
        CONST.__getattribute__('INSTALLER_TYPE'),
        CONST.__getattribute__('DEPLOY_SCENARIO'),
        pkg_resources.resource_filename('functest', 'ci/testcases.yaml'))
def main(self, **kwargs):
    """Entry point of class Runner"""
    if 'noclean' in kwargs:
        self.clean_flag = not kwargs['noclean']
    if 'report' in kwargs:
        self.report_flag = kwargs['report']
    try:
        if 'test' in kwargs:
            self.source_rc_file()
            LOGGER.debug("Test args: %s", kwargs['test'])
            if self._tiers.get_tier(kwargs['test']):
                self.run_tier(self._tiers.get_tier(kwargs['test']))
            elif self._tiers.get_test(kwargs['test']):
                result = self.run_test(
                    self._tiers.get_test(kwargs['test']))
                if result != testcase.TestCase.EX_OK:
                    LOGGER.error("The test case '%s' failed.",
                                 kwargs['test'])
                    self.overall_result = Result.EX_ERROR
            elif kwargs['test'] == "all":
                self.run_all()
            else:
                LOGGER.error("Unknown test case or tier '%s', or not "
                             "supported by the given scenario '%s'.",
                             kwargs['test'],
                             CONST.__getattribute__('DEPLOY_SCENARIO'))
                LOGGER.debug("Available tiers are:\n\n%s",
                             self._tiers)
                return Result.EX_ERROR
        else:
            self.run_all()
    except BlockingTestFailed:
        pass
    except Exception:  # pylint: disable=broad-except
        LOGGER.exception("Failures when running testcase(s)")
        self.overall_result = Result.EX_ERROR
    # guard on 'test': kwargs may not carry it when run_all() was taken above
    if 'test' in kwargs and not self._tiers.get_test(kwargs['test']):
        self.summary(self._tiers.get_tier(kwargs['test']))
    LOGGER.info("Execution exit value: %s", self.overall_result)
    return self.overall_result
def summary(self, tier=None):
    """To generate functest report showing the overall results"""
    msg = prettytable.PrettyTable(
        header_style='upper', padding_width=5,
        field_names=['env var', 'value'])
    for env_var in ['INSTALLER_TYPE', 'DEPLOY_SCENARIO', 'BUILD_TAG',
                    'CI_LOOP']:
        msg.add_row([env_var, CONST.__getattribute__(env_var)])
    LOGGER.info("Deployment description:\n\n%s\n", msg)
    msg = prettytable.PrettyTable(
        header_style='upper', padding_width=5,
        field_names=['test case', 'project', 'tier',
                     'duration', 'result'])
    tiers = [tier] if tier else self._tiers.get_tiers()
    for each_tier in tiers:
        for test in each_tier.get_tests():
            try:
                test_case = self.executed_test_cases[test.get_name()]
            except KeyError:
                msg.add_row([test.get_name(), test.get_project(),
                             each_tier.get_name(), "00:00", "SKIP"])
            else:
                result = 'PASS' if test_case.is_successful() == test_case.EX_OK else 'FAIL'
                msg.add_row(
                    [test_case.case_name, test_case.project_name,
                     self._tiers.get_tier_name(test_case.case_name),
                     test_case.get_duration(), result])
        for test in each_tier.get_skipped_test():
            msg.add_row([test.get_name(), test.get_project(),
                         each_tier.get_name(), "00:00", "SKIP"])
    LOGGER.info("FUNCTEST REPORT:\n\n%s\n", msg)
def main():
    version_info = 'pyrate version ' + __version__
    try:
        if os.environ.get('TESTOLDIMPORTS'):
            raise ImportError()
        import argparse
        parser = argparse.ArgumentParser()
        parser.add_argument('build_file', nargs='?', default='build.py',
                            help='name of the input file - default: build.py')
        parser.add_argument('-V', '--version', action='version', version=version_info)
        parser.add_argument('-M', '--makefile', action='store_true', help='enable makefile mode')
        parser.add_argument('-o', '--output', nargs=1, default=None,
                            help='name of output build file')
        args = parser.parse_args()
        if args.output:
            args.output = args.output[0]
        bfn = args.build_file
    except ImportError:
        optparse = __import__('optparse')
        parser = optparse.OptionParser(usage='pyrate [options] build_file')
        parser.add_option('-V', '--version', action='store_true', help='display version')
        parser.add_option('-M', '--makefile', action='store_true', help='enable makefile mode')
        parser.add_option('-o', '--output', default=None,
                          help='name of output build file', dest='output')
        (args, posargs) = parser.parse_args()
        if len(posargs) > 1:
            sys.stderr.write('too many build_file arguments provided! %s\n' % repr(posargs))
            return os.EX_USAGE
        elif not posargs:
            posargs = ['build.py']
        bfn = posargs[0]
        # the version flag is handled here only in the optparse fallback; in the
        # argparse branch, action='version' exits inside parse_args()
        if args.version:
            sys.stderr.write(version_info + '\n')
            sys.exit(os.EX_OK)
    generate_build_file(bfn, args.output, args.makefile)
################################################################################
# Externals + helper functions
################################################################################
def test_option_help(self):
    """test --help option"""
    cmd, output = runCmdOutput(['--help'])
    self.assertEqual(cmd.returncode, os.EX_OK)
def test_option_version(self):
    """test --version option"""
    cmd, output = runCmdOutput(['--version'])
    self.assertEqual(cmd.returncode, os.EX_OK)
def test_logging_config(self):
    """test logging config from file or default"""
    topdir = os.path.dirname(os.path.dirname(__file__))
    # logging config from default
    os.system('rm %s/logging.conf' % topdir)
    cmd, output = runCmdOutput(['-p', '7788'])
    self.assertEqual(cmd.returncode, os.EX_OK)
    # logging config from file
    os.system('cp %s/logging.conf.sample %s/logging.conf' %
              (topdir, topdir))
    cmd, output = runCmdOutput(['-p', '7788'])
    self.assertEqual(cmd.returncode, os.EX_OK)
def parse_args():
    to_parse, cmd, capturer = split_args_to_parse()
    global_argparser = create_argparser()
    args = global_argparser.parse_args(to_parse)
    if capturer:
        return args, cmd, capturer
    else:
        global_argparser.print_help()
        sys.exit(os.EX_OK)
def _instancecheck_impl(self, value, info: Info) -> InfoMsg:
    if not isinstance(value, str):
        return info.errormsg(self)
    if os.path.exists(value):
        if os.path.isdir(value) and os.access(os.path.abspath(value), os.W_OK) \
                and (self.constraint is None or self.constraint(value)):
            return info.wrap(True)
        return info.errormsg(self)
    abs_name = os.path.abspath(value)
    dir_name = os.path.dirname(abs_name)
    # as above: os.EX_OK == 0 == os.F_OK, so this is an existence check
    if os.path.exists(dir_name) and os.access(dir_name, os.EX_OK) and os.access(dir_name, os.W_OK) \
            and (self.constraint is None or self.constraint(value)):
        return info.wrap(True)
    return info.errormsg(self)
def main():
    argument_parser = get_argument_parser()
    options = argument_parser.parse_args()
    if options.list_checkers:
        list_checkers()
        return
    program = Program(options)
    program.check()
    program.print_issues()
    errors = [e for e in program.issues
              if e.level == 'error' or e.level == 'syntax-error']
    sys.exit(os.EX_OK if len(errors) == 0 else os.EX_DATAERR)
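# Shell-side view of main() above (illustrative; the "checker" command name is
# hypothetical): the exit status distinguishes a clean run from one with errors.
#
#     $ checker myfile; echo $?
#     0     # no errors: os.EX_OK
#     65    # errors found: os.EX_DATAERR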
def check_if_run():
    pid = read_file(PIDFILE)
    current_pid = os.getpid()
    if pid is None:
        return
    if int(pid) > 0 and int(pid) != current_pid:
        if os.path.exists("/proc/%d" % int(pid)):
            log("[%s] Already running - keepalive done." % time.ctime())
            sys.exit(os.EX_OK)
def mocked_plugin():
    patch = unittest.mock.patch("decapod_common.plugins.get_playbook_plugins")
    with patch as ptch:
        plugin = unittest.mock.MagicMock()
        required_mock = unittest.mock.MagicMock()
        required_mock.pid = 100
        required_mock.returncode = os.EX_OK
        plugin.execute.return_value.__enter__.return_value = required_mock
        ptch.return_value.get.return_value.return_value = plugin
        yield required_mock
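# Hedged usage sketch (not in the scraped source): mocked_plugin is written in
# the yield style of a pytest fixture, so a test would presumably consume it as
# below, assuming a @pytest.fixture decorator that the listing does not show.
#
#     def test_execute_reports_success(mocked_plugin):
#         assert mocked_plugin.returncode == os.EX_OK
#         assert mocked_plugin.pid == 100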