def pytest_sessionstart(session):
    """Validate command line options and create the test environment.

    A "conftest" logger is attached to ``session.config.ctlogger``. Each
    checked option must hold one of its accepted values; the first one that
    does not is logged as an error and the run is aborted with
    ``pytest.exit``. Finally ``session.config.env`` is set to a
    ``common3.Environment`` built from the options.

    Args:
        session: pytest session whose ``config`` is read and extended.
    """
    config = session.config
    config.ctlogger = loggers.module_logger("conftest")
    config.ctlogger.debug("Session start...")

    # Check options: (option name, accepted values, complaint), in order.
    check_modes = ["none", "complete", "fast", "sanity_check_only"]
    rules = (
        ("setup_scope", ["session", "module", "class", "function"],
         "Incorrect --setup_scope option."),
        ("call_check", check_modes, "Incorrect --call_check option."),
        ("teardown_check", check_modes, "Incorrect --teardown_check option."),
        ("fail_ctrl", ["stop", "restart", "ignore"],
         "Incorrect --fail_ctrl option."),
    )
    for opt_name, accepted, complaint in rules:
        if getattr(config.option, opt_name) not in accepted:
            config.ctlogger.error(complaint)
            pytest.exit(complaint)

    # Define environment
    config.env = common3.Environment(config.option)
# Example source code for Python's exit() (pytest.exit) usage
def check(self):
    """Check that the switch is operational using the waiton method.

    A switch whose ``status`` is falsy is skipped (logged, ``None`` is
    returned). Otherwise ``waiton()`` is called and the Ports table is read
    through ``self.ui``; an empty table aborts the run with ``pytest.exit``
    when ``fail_ctrl`` is 'stop' and fails the test with ``pytest.fail``
    otherwise.

    Returns:
        The value returned by ``self.waiton()``, or None for a skipped switch.

    Notes:
        This is a mandatory method for all environment classes.
    """
    if self.status:
        result = self.waiton()
        # Verify Ports table is not empty
        if self.ui.get_table_ports() == []:
            if self.opts.fail_ctrl == 'stop':
                self.class_logger.debug("Exit switch check. Ports table is empty!")
                pytest.exit('Ports table is empty!')
            else:
                self.class_logger.debug("Fail switch check. Ports table is empty!")
                pytest.fail('Ports table is empty!')
        return result

    self.class_logger.info("Skip switch id:%s(%s) check because it's has Off status." % (self.id, self.name))
    return None
def env_init(self, request):
    """Validate command line options and hand out the environment.

    Each checked option must hold one of its accepted values; otherwise the
    problem is logged through ``request.config.ctlogger`` and the run is
    aborted with ``pytest.exit``. On success ``testenv_checkstatus`` of the
    environment is reset to False.

    Args:
        request(pytest.request): pytest request

    Returns:
        testlib.common3.Environment: Environment instance
    """
    options = request.config.option
    check_modes = {"none", "complete", "fast", "sanity_check_only"}
    # (option name, accepted values, complaint), verified in this order.
    rules = (
        ("setup_scope", {"session", "module", "class", "function"},
         "Incorrect --setup_scope option."),
        ("call_check", check_modes, "Incorrect --call_check option."),
        ("teardown_check", check_modes, "Incorrect --teardown_check option."),
        ("fail_ctrl", {"stop", "restart", "ignore"},
         "Incorrect --fail_ctrl option."),
    )
    for opt_name, accepted, complaint in rules:
        if getattr(options, opt_name) not in accepted:
            request.config.ctlogger.error(complaint)
            pytest.exit(complaint)

    request.config.env.testenv_checkstatus = False
    return request.config.env
def pytest_configure(config):
    """Hook to check steps consistency.

    When the ``disable_steps_checker`` option is set only a warning is
    issued. Otherwise every attribute of every step class whose name is
    listed in ``STEPS`` is unwrapped and validated with ``StepValidator``;
    any reported error aborts the run with ``pytest.exit``.
    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Step consistency checker is disabled!')
        return

    problems = []
    for cls in _get_step_classes():
        for name in (attr for attr in dir(cls) if attr in STEPS):
            raw_func = six.get_unbound_function(getattr(cls, name))
            plain_func = utils.get_unwrapped_func(raw_func)
            problems.extend(StepValidator(plain_func).validate())

    if problems:
        pytest.exit('Some steps are not consistent!\n' + '\n'.join(problems))
def exit_gracefully(signum, frame):
    """Signal handler that interrupts the test run through ``pytest.exit``.

    Args:
        signum: Signal number passed by the signal machinery (unused).
        frame: Current stack frame passed by the signal machinery (unused).
    """
    reason = 'Interrupting from SIGTERM'
    raise pytest.exit(reason)
def _stop_if_necessary(self):
try:
exc, msg, tb = self._errors.get(False)
traceback.print_exception(exc, msg, tb)
sys.stderr.flush()
if not self.ignore_errors:
pytest.exit(msg)
except queue.Empty:
pass
def get(self, init_start=False, retry_count=5):
    """Check the OVS Controller, restarting it on failure when allowed.

    The first attempt starts the controller (``init_start``) or waits for
    it; every further attempt restarts it. When all attempts fail the run is
    aborted with ``pytest.exit`` (``fail_ctrl == "stop"``) or the test is
    failed with ``pytest.fail``.

    Args:
        init_start(bool): Flag to start OVS Controller
        retry_count(int): Retry attempts count
    """
    # If fail_ctrl != "restart", restart retries won't be performed
    if self.opts.fail_ctrl != "restart":
        retry_count = 1

    attempt = 1
    initial_try = True
    # Try to restart if necessary at least several times
    while attempt <= retry_count:
        try:
            if not initial_try:
                self.restart()
            elif init_start:
                self.start()
            else:
                self.waiton()
            # Success: no further attempts are needed.
            break
        except Exception:
            self.class_logger.info("Error while checking Ovs Controller %s:%s..." % (self.ipaddr, self.port))
            attempt += 1
            initial_try = False
            message = "Error while checking Ovs Controller %s:%s:\n%s" % (self.ipaddr, self.port, traceback.format_exc())
            sys.stderr.write(message)
            sys.stderr.flush()
            self.class_logger.log(loggers.levels['ERROR'], message)
            if attempt >= retry_count + 1:
                sys.stderr.write("Could not restart Ovs Controller for %s times. Something goes wrong \n" % (retry_count, ))
                sys.stderr.flush()
                if self.opts.fail_ctrl == "stop":
                    pytest.exit(message)
                else:
                    pytest.fail(message)
def get(self, init_start=False, retry_count=1):
    """Get or start linux host instance.

    Calls ``start()`` when ``init_start`` is set and ``waiton()`` otherwise.
    A KeyboardInterrupt is logged, followed by ``sanitize()`` and
    ``pytest.exit``; any other exception is logged with its traceback,
    written to stderr and turned into ``pytest.fail``.

    Args:
        init_start(bool): Perform switch start operation or not
        retry_count(int): Number of retries to start(restart) linux host;
            not used by this implementation.

    Returns:
        None or raise an exception.
    """
    # NOTE: restart is not implemented for lhosts, so no retries are made
    # and retry_count is left untouched.
    try:
        operation = self.start if init_start else self.waiton
        operation()
    except KeyboardInterrupt:
        message = "KeyboardInterrupt while checking device {0}({1})...".format(
            self.name, self.ipaddr)
        self.class_logger.info(message)
        self.sanitize()
        pytest.exit(message)
    except Exception:
        self.class_logger.error(
            "Error while checking device %s(%s)...", self.name, self.ipaddr)
        message = "Error while checking device {0}({1}):\n{2}".format(
            self.name, self.ipaddr, traceback.format_exc())
        sys.stderr.write(message)
        sys.stderr.flush()
        pytest.fail(message)
def pytest_configure(config):
    """Register the OnsEnvPlugin, or abort when the setup option is not given.

    Without a truthy ``setup`` option "SETUP" is logged as an error and the
    run is aborted with ``pytest.exit``.
    """
    if not config.option.setup:
        config.ctlogger.error("SETUP")
        pytest.exit("SETUP")
    else:
        config.pluginmanager.register(OnsEnvPlugin(), "_onsenv")
def stop(self):
    """Stop the LogCatcher instance and perform resource cleanup.

    Sets the termination flag and joins the logger thread in 0.5 second
    slices until it is no longer alive, logging after every slice.
    """
    self._termination_flag.set()

    worker = self._logger_thread
    while True:
        if not worker.is_alive():
            break
        worker.join(timeout=0.5)
        log.info("Waiting for LogCatcher thread to exit")

    log.info("LogCatcher thread has terminated, bye!")
def start(self):
    """Start a subprocess.

    Spawns the subprocess, registers its stdout/stderr with the log catcher
    and waits for it to finish initializing. If initialization is reported
    as unsuccessful the subprocess is stopped and the run is aborted with
    ``pytest.exit``.
    """
    self._start_subprocess()
    self._register_stdout_stderr_to_logcatcher()

    initialized = self._wait_for_subprocess_to_finish_init()
    if initialized:
        return

    self.stop()
    pytest.exit("Failed to start `{}` process".format(self.id))
def stop(self):
    """Stop ManagedSubprocess instance and perform a cleanup.

    If the process has already exited, the run is aborted through
    ``pytest.exit`` reporting its return code. Otherwise SIGINT is sent;
    when the process does not exit within half of ``_EXIT_TIMEOUT``,
    SIGTERM follows, and after another half SIGKILL is delivered to its
    whole process group, followed by a wait without timeout.
    """
    proc = self._process
    proc.poll()
    if proc.returncode is not None:
        msg_fmt = "`%s` process has already terminated with code `%s`"
        pytest.exit(msg_fmt % (self.id, proc.returncode))
        return

    log.info("Send SIGINT to `%s` session leader", self.id)
    proc.send_signal(signal.SIGINT)
    try:
        proc.wait(self._EXIT_TIMEOUT / 2.0)
    except subprocess.TimeoutExpired:
        # SIGINT was not enough; escalate.
        log.info("Send SIGTERM to `%s` session leader", self.id)
        proc.send_signal(signal.SIGTERM)
        try:
            proc.wait(self._EXIT_TIMEOUT / 2.0)
        except subprocess.TimeoutExpired:
            # Last resort: kill the entire process group.
            log.info("Send SIGKILL to all `%s` processess", self.id)
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            log.info("wait() for `%s` session leader to die", self.id)
            proc.wait()

    log.info("`%s` session leader has terminated", self.id)
def test_exit_propagates(self, testdir):
    """An exit exception raised inside a test item must propagate to the caller."""
    propagated = False
    try:
        testdir.runitem("""
            import pytest
            def test_func():
                raise pytest.exit.Exception()
        """)
    except pytest.exit.Exception:
        propagated = True

    if not propagated:
        pytest.fail("did not raise")
def test_pytest_exit():
    """The exception raised by ``pytest.exit`` must be a KeyboardInterrupt."""
    try:
        pytest.exit("hello")
    except pytest.exit.Exception:
        caught = py.code.ExceptionInfo()
        assert caught.errisinstance(KeyboardInterrupt)
def pytest_collection_modifyitems(session, items):
    """Append the idempotent id to the name of every test that has one.

    Items are grouped by the id returned from ``get_item_id``; an item with
    an id gets ``[id-<id>]`` appended to its name. When the
    ``check_idempotent_id`` option is set, tests without an id and ids
    shared by several tests are printed and the run is aborted with
    ``pytest.exit``.
    """
    by_id = defaultdict(list)
    for item in items:
        item_id = get_item_id(item)
        by_id[item_id].append(item)
        if item_id is not None:
            item.name += '[id-{}]'.format(item_id)

    if not session.config.option.check_idempotent_id:
        return

    report = []
    missing = by_id.pop(None, [])
    if missing:
        report.append("Tests without idempotent_id:")
        report.extend(' ' + test.nodeid for test in missing)
    for group in by_id.values():
        if len(group) > 1:
            report.append("Single idempotent_id for many cases:")
            report.extend(' ' + test.nodeid for test in group)

    if report:
        print('')
        print('\n'.join(report))
        pytest.exit('Errors with idempotent_id')
def pytest_collection_modifyitems(config, items):
    """Hook to detect forbidden calls inside test.

    When the ``disable_steps_checker`` option is set only a warning is
    issued. Otherwise each test function is validated with ``TestValidator``
    against ``PERMITTED_CALLS``, ``STEPS`` and the item's own funcargnames;
    any reported error aborts the run with ``pytest.exit``.
    """
    if config.option.disable_steps_checker:
        config.warn('P1', 'Permitted calls checker is disabled!')
        return

    violations = [
        problem
        for item in items
        for problem in TestValidator(
            item.function,
            PERMITTED_CALLS + STEPS + item.funcargnames).validate()
    ]
    if violations:
        pytest.exit("Only steps and fixtures must be called in test!\n" +
                    '\n'.join(violations))
def pytest_collection_modifyitems(config, items):
    """Hook to prevent running destructive tests without snapshot_name.

    The run is aborted with ``pytest.exit`` when an item carries the
    ``DESTRUCTIVE`` marker while no snapshot name is given and the
    ``force_destructive`` option is not set.
    """
    no_snapshot = config.option.snapshot_name is None
    stop_destructive = no_snapshot and not config.option.force_destructive
    refusal = ("You try to start destructive tests without passing "
               "--snapshot-name for cloud reverting. Such tests can "
               "break your cloud. To run destructive tests without "
               "--snapshot-name you should pass --force-destructive "
               "argument.")
    for item in items:
        if not (item.get_marker(DESTRUCTIVE) and stop_destructive):
            continue
        pytest.exit(refusal)
def pytest_runtest_call():
    """
    Post-setup routine used only for the '--topology-only' option.

    When the option is given, the mininet CLI of the current topogen (if
    there is one) is opened and the run is then ended with ``pytest.exit``.
    """
    # pylint: disable=E1101
    # Trust me, 'config' exists.
    if not pytest.config.getoption('--topology-only'):
        return

    topology = get_topogen()
    if topology is not None:
        # Allow user to play with the setup.
        topology.mininet_cli()

    pytest.exit('the topology executed successfully')
def pytest_configure(config):
    """Assert that the environment is correctly configured.

    Aborts the run with ``pytest.exit`` when ``diagnose_env()`` returns a
    falsy result.
    """
    environment_ok = diagnose_env()
    if environment_ok:
        return
    pytest.exit('enviroment has errors, please read the logs')
def test_history_id():
    """
    >>> allure_report = getfixture('allure_report')
    >>> assert_that(allure_report,
    ... has_test_case('test_history_id',
    ... has_history_id()
    ... )
    ... )
    """
    # The function body does nothing. The docstring holds doctest-style
    # statements asserting that ``allure_report`` contains a test case named
    # 'test_history_id' that has a history id.
    pass # pytest.exit("Just test")
def _connect_to_switch(self, prompt, timeout=20):
"""SSH connect to switch and wait until prompt string appeared.
Args:
prompt(str): expected CLI prompt.
timeout(int): connection timeout.
Returns:
None
Examples::
self._connect_to_switches(sw_keys=1, prompt="Switch ")
"""
cli_start_path = ''
self.suite_logger.debug("Login on switch with login: {0} and expected prompt is: {1}".format(self.login, prompt))
self.conn.login(self.login, self.passw, timeout=self.timeout)
self.suite_logger.debug("Create Shell")
self.conn.open_shell(raw_output=True)
self.conn.shell.settimeout(self.timeout)
# lxc: run command "python main.py"
if self.devtype == 'lxc':
self.suite_logger.debug("Launched CLI on LXC")
if os.path.exists(os.path.join(self.build_path, self.img_path, 'main.py')) is True:
cli_start_path = os.path.join(self.build_path, self.img_path)
self.conn.shell_command('cd %s && python main.py -a %s -p %s'
% (cli_start_path, self.ipaddr, self.xmlrpcport), timeout=5, ret_code=False, quiet=True)
else:
self.suite_logger.error("Path to CLI image does not exist: %s" % (os.path.join(cli_start_path, 'main.py')))
pytest.exit("Path to CLI image does not exist: %s" % (os.path.join(cli_start_path, 'main.py')))
else:
self.suite_logger.debug("Waiting for CLI on Real switch")
alter = []
# Add one or few expected prompt(s) and action(s) to alternatives list
if isinstance(prompt, str):
prompt = [prompt]
for single_prompt in prompt:
alter.append((single_prompt, None, True, False))
iterations = 12
for i in range(iterations):
time.sleep(10)
login = self.conn.action_on_connect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell)
if any(login.find(str(single_prompt)) != -1 for single_prompt in prompt):
break
else:
self.suite_logger.debug("Waiting, current prompt is {0}".format(login))
if i == iterations:
login = self.conn.action_on_expect(self.conn.shell, alternatives=alter, timeout=timeout, is_shell=self.is_shell)
def waiton(self, timeout=DEFAULT_SERVER_WAIT_ON_TIMEOUT):
    """Wait until device is fully operational.

    The device is probed every 0.75 seconds until the probe status reports
    ``isup``. A KeyboardInterrupt during probing is logged, followed by
    ``sanitize()`` and ``pytest.exit``.

    Args:
        timeout(int): Wait timeout

    Raises:
        Exception: the device did not come up before the timeout elapsed

    Returns:
        dict: Status dictionary from probe method or raise an exception.
    """
    status = None
    message = "Waiting until device {0}({1}) is up.".format(self.name, self.ipaddr)
    self.class_logger.info(message)

    deadline = time.time() + timeout
    while True:
        if loggers.LOG_STREAM:
            # Progress indicator.
            sys.stdout.write(".")
            sys.stdout.flush()

        if time.time() >= deadline:
            # Time is elapsed.
            port = self._get_port_for_probe()
            message = "Timeout exceeded. IP address {0} port {1} doesn't respond.".format(
                self.ipaddr, port)
            self.class_logger.warning(message)
            raise Exception(message)

        # While time isn't elapsed continue probing switch.
        try:
            status = self.probe()
        except KeyboardInterrupt:
            message = "KeyboardInterrupt while checking switch {0}({1})...".format(
                self.name, self.ipaddr)
            self.class_logger.info(message)
            self.sanitize()
            pytest.exit(message)

        if status["isup"]:
            return status
        time.sleep(0.75)
def waiton(self, timeout=90):
    """Wait until switch is fully operational.

    The switch is probed every 0.75 seconds until the probe status reports
    ``isup`` with type "switchpp". A KeyboardInterrupt during probing is
    logged, followed by ``sanitize()`` and ``pytest.exit``.

    Args:
        timeout(int): Wait timeout

    Raises:
        SwitchException: the switch did not come up before the timeout
            elapsed, or it is up but its ``prop`` data is empty

    Returns:
        dict: Status dictionary from probe method or raise an exception.
    """
    status = None
    message = "Waiting until switch %s(%s) is up." % (self.name, self.ipaddr)
    self.class_logger.info(message)
    stop_flag = False
    end_time = time.time() + timeout
    while not stop_flag:
        if loggers.LOG_STREAM:
            sys.stdout.write(".")
            sys.stdout.flush()
        if time.time() < end_time:
            # While time isn't elapsed continue probing switch.
            try:
                status = self.probe()
            except KeyboardInterrupt:
                message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
                self.class_logger.info(message)
                self.sanitize()
                pytest.exit(message)
            if status["isup"] and status["type"] == "switchpp":
                stop_flag = True
                if status['prop'] == {}:
                    # If type == switchpp but prop == empty dict then Platform table return incorrect data.
                    message = "Found running switchpp on %s but Platform data is corrupted." % (self.ipaddr, )
                    self.class_logger.warning(message)
                    raise SwitchException(message)
                self.class_logger.info("Switch instance on %s(%s) is OK." % (self.name, self.ipaddr))
        else:
            # Time is elapsed.
            # status is still None when the deadline passed before the first
            # probe (e.g. timeout <= 0); report the timeout instead of
            # failing with a TypeError on None.
            if status is not None and status["isup"] and status["type"] != "switchpp":
                message = ("Port %s on host %s is opened but doesn't response queries." +
                           " %s Check your environment!") % (self.port, self.ipaddr, self.waiton_err_message)
            else:
                port = self._get_port_for_probe()
                message = "Timeout exceeded. IP address %s port %s doesn't respond" % (self.ipaddr, port)
            self.class_logger.warning(message)
            raise SwitchException(message)
        if not stop_flag:
            time.sleep(0.75)
    return status
def get(self, init_start=False, retry_count=7):
    """Get or start switch instance.

    The first attempt starts the switch (``init_start``) or waits for it;
    every further attempt restarts it with ``self.default_restart_type``.
    A KeyboardInterrupt is logged, followed by ``sanitize()`` and
    ``pytest.exit``. Any other failure is logged with its traceback; after
    the last allowed attempt has failed the run is aborted with
    ``pytest.exit`` (``fail_ctrl != "ignore"``) or the test is failed with
    ``pytest.fail``.

    Args:
        init_start(bool): Perform switch start operation or not
        retry_count(int): Number of retries to start(restart) switch

    Returns:
        None or raise an exception.

    Notes:
        Also self.opts.fail_ctrl attribute affects logic of this method.
        fail_ctrl is set in py.test command line options (read py.test --help for more information).
    """
    # If fail_ctrl != "restart", restart retries won't be performed
    if self.opts.fail_ctrl != "restart":
        retry_count = 1

    # Attempts are counted from 1 so that the "last attempt failed" check
    # below can actually trigger.
    for attempt in range(1, retry_count + 1):
        try:
            if attempt == 1:
                if init_start:
                    self.start()
                else:
                    self.waiton()
            else:
                self.restart(mode=self.default_restart_type)
            break
        except KeyboardInterrupt:
            message = "KeyboardInterrupt while checking switch %s(%s)..." % (self.name, self.ipaddr)
            self.class_logger.info(message)
            self.sanitize()
            pytest.exit(message)
        except Exception:
            self.class_logger.warning("Error while checking switch %s(%s)..." % (self.name, self.ipaddr))
            message = "Error while checking switch %s(%s):\n%s" % (self.name, self.ipaddr, traceback.format_exc())
            sys.stderr.write(message)
            sys.stderr.flush()
            self.class_logger.error(message)
            if attempt > 4:
                self.class_logger.warning(
                    "Could not complete switch start method for the fourth time. Trying to "
                    "reset the DB...")
                self.db_corruption = True
            if attempt >= retry_count:
                # Every allowed attempt has failed: stop or fail the run.
                message = "Could not complete start switch method after {0} retries. Something " \
                          "went wrong...\n".format(retry_count)
                sys.stderr.write(message)
                sys.stderr.flush()
                self.class_logger.error(message)
                if self.opts.fail_ctrl != "ignore":
                    pytest.exit(message)
                else:
                    pytest.fail(message)