def test_k8s_installed_default(self, underlay, k8s_actions):
    """Deploy a default k8s environment and verify it end to end.

    pytest.mark: k8s_installed_default
    Scenario:
        1. Install k8s.
        2. Check number of nodes.
        3. Basic check of running containers on nodes.
        4. Check requirement base settings.
        5. Create nginx pod.
        6. Check created pod is reached
        7. Delete pod.
    """
    k8s_actions.install_k8s()
    api_client = k8s_actions.api
    # Cluster-level sanity checks after the install.
    self.check_number_kube_nodes(underlay, api_client)
    self.check_list_required_images(
        underlay, required_images=self.base_images)
    self.check_etcd_health(underlay)
    # Smoke-test scheduling: create, reach and remove an nginx pod.
    nginx_spec = self.get_nginx_spec()
    nginx_pod = k8s_actions.check_pod_create(body=nginx_spec)
    self.check_nginx_pod_is_reached(underlay, nginx_pod.status.pod_ip)
    k8s_actions.check_pod_delete(nginx_pod)
# Collected example usages of pytest.mark from various open-source projects.
def pytest_collection_modifyitems(self, session, config, items):
    """Apply externally-defined marks to the collected test items.

    For the selected ``--env``, look up mark definitions for each item by
    name and decorate the item before collection proceeds.
    """
    env = config.getoption('--env')
    # TODO: Really naive, we need add a batch call. Until then,
    # this is going to be unworkable for anyone outside the
    # Toronto office...
    for item in items:
        for mark_spec in self.get_marks(env=env, name=item.name):
            mark_name = mark_spec['name']
            mark_args = mark_spec.get('args', [])
            mark_kwargs = mark_spec.get('kwargs', {})
            pytest.log.info(
                "Applying {} mark to {}".format(mark_name, item.name)
            )
            marker = getattr(pytest.mark, mark_name)(*mark_args,
                                                     **mark_kwargs)
            item.add_marker(marker)
    # Proceed with the collection
    yield
def skip_by_skin_names(request, skin):
    """ Skip by skin name.

    We support validation for multi skin applications providing the best
    page object class match.
    We expect many failures we want to avoid because many tests will fail
    because the related page object implementation still not exists.
    If you want you can omit a test execution for a given skin adding a
    ```@pytest.mark.skip_skins(['skin2'])``` decorator on your tests.
    Tests marked with a skin2 skip will be executed for all skins
    except for skin2.
    See http://bit.ly/2dYnOSv for further info.
    """
    # Fetch the marker once instead of twice (the original called
    # get_marker for the presence test and again for the args).
    skip_marker = request.node.get_marker('skip_skins')
    if skip_marker and skin in skip_marker.args[0]:
        pytest.skip('skipped on this skin: {}'.format(skin))
def __call__(self, fun):
    """Wrap *fun* as a pytest fixture whose kwargs can be customised
    per-test through a marker named after the fixture.

    The marker name defaults to the wrapped function's name; kwargs given
    on the marker are merged with (and overridden by) call-time kwargs.
    Returns the fixture function, with an ``options`` helper attached for
    building the matching ``pytest.mark`` decorator.
    """
    marker_name = self.marker_name or fun.__name__

    @pytest.fixture()
    @wraps(fun)
    def _inner(request, *args, **kwargs):
        # Merge marker kwargs with explicit kwargs; explicit ones win.
        # (A leftover debug print of request.node was removed here.)
        marker = request.node.get_marker(marker_name)
        return fun(request, *args, **dict(marker.kwargs, **kwargs))

    def options(*args, **kwargs):
        # Convenience: the pytest.mark decorator matching this fixture.
        return getattr(pytest.mark, marker_name)(*args, **kwargs)

    _inner.options = options
    _inner.__wrapped__ = fun
    return _inner
def test_query_one():
    """A registered CIDR is returned with all fields; unknown CIDRs give None."""
    db = SqliteStore(config=MockConfig())
    # Due to pytest.mark, this test runs after the insert,
    # so we should have this RegisteredCidr
    record = db.query_one(TEST_CIDR)
    actual = (record.cidr, record.description, record.location,
              record.owner, record.expiration)
    expected = (TEST_CIDR, TEST_DESCRIPTION, TEST_LOCATION,
                TEST_OWNER, TEST_EXPIRATION)
    assert actual == expected
    # An address that was never registered must not resolve to anything.
    assert db.query_one("FAKE CIDR") is None
def pytest_namespace():
    """Expose the mark generator to plugins/tests as ``pytest.mark``."""
    return dict(mark=MarkGenerator())
def pytest_addoption(parser):
    """Register the -k/-m selection options, the --markers flag and the
    'markers' ini linelist used to declare project markers."""
    group = parser.getgroup("general")
    # -k: python-like substring expression matched against test names.
    group._addoption(
        '-k',
        action="store", dest="keyword", default='', metavar="EXPRESSION",
        help="only run tests which match the given substring expression. "
        "An expression is a python evaluatable expression "
        "where all names are substring-matched against test names "
        "and their parent classes. Example: -k 'test_method or test_"
        "other' matches all test functions and classes whose name "
        "contains 'test_method' or 'test_other'. "
        "Additionally keywords are matched to classes and functions "
        "containing extra names in their 'extra_keyword_matches' set, "
        "as well as functions which have names assigned directly to them."
    )
    # -m: boolean expression over mark names.
    group._addoption(
        "-m",
        action="store", dest="markexpr", default="", metavar="MARKEXPR",
        help="only run tests matching given mark expression. "
        "example: -m 'mark1 and not mark2'."
    )
    # --markers: handled by pytest_cmdline_main, lists registered markers.
    group.addoption(
        "--markers", action="store_true",
        help="show markers (builtin, plugin and per-project ones)."
    )
    # Projects declare their own markers via the 'markers' ini linelist.
    parser.addini("markers", "markers for test functions", 'linelist')
def pytest_cmdline_main(config):
    """Handle ``--markers``: print every registered marker and exit.

    Returns 0 (handled) when ``--markers`` was given; otherwise returns
    None so normal command-line processing continues.
    """
    if config.option.markers:
        # Imported lazily and only on this branch; the original imported
        # it unconditionally even when --markers was not given.
        import _pytest.config
        config._do_configure()
        tw = _pytest.config.create_terminal_writer(config)
        for line in config.getini("markers"):
            # A marker line looks like "name: description".  partition()
            # tolerates lines registered without a description, where the
            # original split(":", 1) raised ValueError.
            name, _, rest = line.partition(":")
            tw.write("@pytest.mark.%s:" % name, bold=True)
            tw.line(rest)
            tw.line()
        config._ensure_unconfigure()
        return 0
def pytest_configure(config):
    """When --strict is active, record the config on the mark generator."""
    import pytest
    if not config.option.strict:
        return
    pytest.mark._config = config
def __call__(self, *args, **kwargs):
    """Apply this mark decorator.

    If called with exactly one test function or class (and no kwargs),
    attach the mark information to it and return it unchanged.
    Otherwise, return a new decorator of the same name with *args* and
    *kwargs* merged into the stored ones (parametrised usage such as
    ``pytest.mark.foo(1, bar=2)``).
    """
    if args and not kwargs:
        func = args[0]
        is_class = inspect.isclass(func)
        # Only treat the call as "decorating" when the single positional
        # argument is a test function or a class.
        if len(args) == 1 and (istestfunc(func) or is_class):
            if is_class:
                # Classes accumulate marks in a 'pytestmark' list.
                if hasattr(func, 'pytestmark'):
                    mark_list = func.pytestmark
                    if not isinstance(mark_list, list):
                        mark_list = [mark_list]
                    # always work on a copy to avoid updating pytestmark
                    # from a superclass by accident
                    mark_list = mark_list + [self]
                    func.pytestmark = mark_list
                else:
                    func.pytestmark = [self]
            else:
                # Functions store marks as a MarkInfo attribute named
                # after the mark; repeated application accumulates
                # args/kwargs on the existing holder.
                holder = getattr(func, self.name, None)
                if holder is None:
                    holder = MarkInfo(
                        self.name, self.args, self.kwargs
                    )
                    setattr(func, self.name, holder)
                else:
                    holder.add(self.args, self.kwargs)
            return func
    # Parametrised usage: merge new args/kwargs into a fresh decorator.
    kw = self.kwargs.copy()
    kw.update(kwargs)
    args = self.args + args
    return self.__class__(self.name, args=args, kwargs=kw)
def test_lcm_k8s_scale_up(self, hardware, underlay, k8scluster):
    """Scale up an already deployed k8s environment.

    pytest.mark: k8s_installed_default
    Require:
     - already installed k8s cluster with node roles 'k8s'
     - fuel-devops environment with additional node roles 'k8s_scale'
    Scenario:
        1. Check number of kube nodes match underlay nodes.
        2. Check etcd health.
        3. Add to 'underlay' new nodes for k8s scale
        4. Run fuel-ccp installer for old+new k8s nodes
        5. Check number of kube nodes match underlay nodes.
        6. Check etcd health.
    """
    api_client = k8scluster.api
    # Baseline: the cluster must be healthy before scaling.
    self.check_number_kube_nodes(underlay, api_client)
    self.check_etcd_health(underlay)
    # Register the additional 'k8s_scale' nodes in the underlay.
    scale_ssh_config = hardware.get_ssh_data(
        roles=[ext.NODE_ROLE.k8s_scale])
    underlay.add_config_ssh(scale_ssh_config)
    # Re-run the installer over the enlarged node set and re-verify.
    k8scluster.install_k8s()
    self.check_number_kube_nodes(underlay, api_client)
    self.check_etcd_health(underlay)
def test_k8s_installed_with_etcd_on_host(self, underlay, k8s_actions):
    """Deploy k8s with etcd running directly on the host and verify it.

    pytest.mark: k8s_installed_with_etcd_on_host
    Scenario:
        1. Install k8s with forced etcd on host.
        2. Check number of nodes.
        3. Basic check of running containers on nodes.
        4. Check requirement base settings.
        5. Create nginx pod.
        6. Check created pod is reached
        7. Delete pod.
    """
    kube_settings = dict(self.kube_settings)
    kube_settings['etcd_deployment_type'] = 'host'
    kube_settings['kube_network_plugin'] = 'calico'
    # etcd runs on the host, so its container image is not required.
    etcd_image = kube_settings.get('etcd_image_repo',
                                   settings.ETCD_IMAGE_REPO)
    required_images = [image for image in self.custom_yaml_images
                       if image != etcd_image]
    k8s_actions.install_k8s(custom_yaml=kube_settings)
    api_client = k8s_actions.api
    self.check_number_kube_nodes(underlay, api_client)
    self.check_list_required_images(underlay,
                                    required_images=required_images)
    self.check_etcd_health(underlay)
    # Smoke-test scheduling with a short-lived nginx pod.
    nginx_spec = self.get_nginx_spec()
    nginx_pod = k8s_actions.check_pod_create(body=nginx_spec)
    self.check_nginx_pod_is_reached(underlay, nginx_pod.status.pod_ip)
    k8s_actions.check_pod_delete(nginx_pod)
def test_k8s_installed_with_etcd_in_container(self, underlay, k8s_actions):
    """Deploy k8s with etcd inside a container and verify it.

    pytest.mark: k8s_installed_with_etcd_in_container
    Scenario:
        1. Install k8s with forced etcd in container.
        2. Check number of nodes.
        3. Basic check of running containers on nodes.
        4. Check requirement base settings.
        5. Create nginx pod.
        6. Check created pod is reached
        7. Delete pod.
    """
    kube_settings = dict(self.kube_settings)
    kube_settings['etcd_deployment_type'] = 'docker'
    kube_settings['kube_network_plugin'] = 'calico'
    kube_settings['etcd_image_repo'] = settings.ETCD_IMAGE_REPO
    kube_settings['etcd_image_tag'] = settings.ETCD_IMAGE_TAG
    # etcd runs in its own container here, so its image must be present.
    required_images = list(self.base_images)
    required_images.append(kube_settings['etcd_image_repo'])
    k8s_actions.install_k8s(custom_yaml=kube_settings)
    api_client = k8s_actions.api
    self.check_number_kube_nodes(underlay, api_client)
    self.check_list_required_images(underlay,
                                    required_images=required_images)
    self.check_etcd_health(underlay)
    # Smoke-test scheduling with a short-lived nginx pod.
    nginx_spec = self.get_nginx_spec()
    nginx_pod = k8s_actions.check_pod_create(body=nginx_spec)
    self.check_nginx_pod_is_reached(underlay, nginx_pod.status.pod_ip)
    k8s_actions.check_pod_delete(nginx_pod)
def test_k8s_installed_with_ready_ssh_keys(self, ssh_keys_dir,
                                           underlay, k8s_actions):
    """Deploy k8s using pre-generated ssh keys and verify it.

    pytest.mark: k8s_installed_with_ready_ssh_keys
    Scenario:
        1. Install k8s (with prepared ssh keys).
        2. Check number of nodes.
        3. Basic check of running containers on nodes.
        4. Check requirement base settings.
        5. Create nginx pod.
        6. Check created pod is reached
        7. Delete pod.
    """
    # Point the installer's workspace at the directory with ready keys.
    env_vars = {"WORKSPACE": ssh_keys_dir}
    k8s_actions.install_k8s(env_var=env_vars)
    api_client = k8s_actions.api
    self.check_number_kube_nodes(underlay, api_client)
    self.check_list_required_images(
        underlay, required_images=self.base_images)
    self.check_etcd_health(underlay)
    # Smoke-test scheduling with a short-lived nginx pod.
    nginx_spec = self.get_nginx_spec()
    nginx_pod = k8s_actions.check_pod_create(body=nginx_spec)
    self.check_nginx_pod_is_reached(underlay, nginx_pod.status.pod_ip)
    k8s_actions.check_pod_delete(nginx_pod)
def test_ccp_idempotency_with_etcd_on_host(self, config, underlay,
                                           k8s_actions):
    """Check that re-installing k8s (etcd on host) is idempotent.

    pytest.mark: k8s_installed_with_etcd_on_host
    Scenario:
        1. Install k8s with forced etcd on host.
        2. Check that count of changes in the ansible log is more than 0
        3. Re-install k8s with forced etcd on host.
        4. Check that count of changes in the ansible log is 0
    """
    kube_settings = dict(self.kube_settings)
    kube_settings['etcd_deployment_type'] = 'host'
    kube_settings['kube_network_plugin'] = 'calico'
    # First run: a fresh install must report at least one ansible change.
    first_run = k8s_actions.install_k8s(custom_yaml=kube_settings,
                                        verbose=False)
    changed = self.get_ansible_changes_count(first_run.stdout_str)
    assert changed != 0, "No changes during k8s install!"
    # Second run over identical settings must be a no-op.
    second_run = k8s_actions.install_k8s(custom_yaml=kube_settings,
                                         verbose=False)
    changed = self.get_ansible_changes_count(second_run.stdout_str)
    assert changed == 0, (
        "Should be no changes during the second install "
        "of k8s while there are '{0}' changes!".format(changed))
def test_ccp_idempotency_with_etcd_in_container(self, config, underlay,
                                                k8s_actions):
    """Check that re-installing k8s (etcd in container) is idempotent.

    pytest.mark: k8s_installed_with_etcd_in_container
    Scenario:
        1. Install k8s with forced etcd in container.
        2. Check that count of changes in the ansible log is more than 0
        3. Re-install k8s with forced etcd in container.
        4. Check that count of changes in the ansible log is 0
    """
    kube_settings = dict(self.kube_settings)
    kube_settings['etcd_deployment_type'] = 'docker'
    kube_settings['kube_network_plugin'] = 'calico'
    kube_settings['etcd_image_repo'] = settings.ETCD_IMAGE_REPO
    kube_settings['etcd_image_tag'] = settings.ETCD_IMAGE_TAG
    # First run: a fresh install must report at least one ansible change.
    first_run = k8s_actions.install_k8s(custom_yaml=kube_settings,
                                        verbose=False)
    changed = self.get_ansible_changes_count(first_run.stdout_str)
    assert changed != 0, "No changes during k8s install!"
    # Second run over identical settings must be a no-op.
    second_run = k8s_actions.install_k8s(custom_yaml=kube_settings,
                                         verbose=False)
    changed = self.get_ansible_changes_count(second_run.stdout_str)
    assert changed == 0, (
        "Should be no changes during the second install "
        "of k8s while there are '{0}' changes!".format(changed))
def test_ccp_idempotency_with_ready_ssh_keys(self, ssh_keys_dir,
                                             config, underlay,
                                             k8s_actions):
    """Check that re-installing k8s with prepared ssh keys is idempotent.

    pytest.mark: k8s_installed_with_ready_ssh_keys
    Scenario:
        1. Install k8s (with prepared ssh keys).
        2. Check that count of changes in the ansible log is more than 0
        3. Re-install k8s (with prepared ssh keys).
        4. Check that count of changes in the ansible log is 0
    """
    # Point the installer's workspace at the directory with ready keys.
    env_vars = {"WORKSPACE": ssh_keys_dir}
    # First run: a fresh install must report at least one ansible change.
    first_run = k8s_actions.install_k8s(env_var=env_vars,
                                        verbose=False)
    changed = self.get_ansible_changes_count(first_run.stdout_str)
    assert changed != 0, "No changes during k8s install!"
    # Second run with the same environment must be a no-op.
    second_run = k8s_actions.install_k8s(env_var=env_vars,
                                         verbose=False)
    changed = self.get_ansible_changes_count(second_run.stdout_str)
    assert changed == 0, (
        "Should be no changes during the second install "
        "of k8s while there are '{0}' changes!".format(changed))
def case(*ids):
    """Mark a test with TestRail case ids.

    ie. @pytestrail.case('C123', 'C12345')
    :return pytest.mark:
    """
    testrail_mark = pytest.mark.testrail
    return testrail_mark(ids=ids)
def testrail(*ids):
    """Deprecated alias for ``pytestrail.case``.

    ie. @testrail('C123', 'C12345')
    :return pytest.mark:
    """
    # Emit the deprecation warning, then delegate to the replacement API.
    warnings.warn(
        'pytest_testrail: the @testrail decorator is deprecated and will be removed. Please use the '
        '@pytestrail.case decorator instead.',
        DeprecatedTestDecorator)
    return pytestrail.case(*ids)
def get_marks(self, **params):
    """Fetch mark definitions from the marks service, decoded from JSON."""
    endpoint = urljoin(self.url, '/v1/mark')
    return self.session.get(endpoint, params=params).json()
def filter_fixtures(all_fixtures, fixtures_base_dir, mark_fn=None, ignore_fn=None):
    """
    Helper function for filtering test fixtures.

    - `all_fixtures` is an iterable of tuples whose first element is the
      fixture path; any remaining elements are passed through untouched.
    - `fixtures_base_dir` should be the base directory that the fixtures
      were collected from.
    - `mark_fn` should be a function which either returns `None` or a
      `pytest.mark` object.
    - `ignore_fn` should be a function which returns `True` for any
      fixture which should be ignored.

    Yields each surviving fixture tuple, wrapped in `pytest.param` when
    `mark_fn` supplies a mark for it.
    """
    for fixture_data in all_fixtures:
        fixture_path = fixture_data[0]
        fixture_relpath = os.path.relpath(fixture_path, fixtures_base_dir)
        # Drop fixtures the caller wants to skip entirely.
        # (Both callbacks are now checked against None consistently; the
        # original mixed truthiness and `is not None` tests.)
        if ignore_fn is not None and ignore_fn(fixture_relpath,
                                               *fixture_data[1:]):
            continue
        if mark_fn is not None:
            mark = mark_fn(fixture_relpath, *fixture_data[1:])
            if mark:
                yield pytest.param(
                    (fixture_path, *fixture_data[1:]),
                    marks=mark,
                )
                continue
        yield fixture_data
def pytest_namespace():
    """Expose the mark generator to plugins/tests as ``pytest.mark``."""
    return dict(mark=MarkGenerator())
def pytest_addoption(parser):
    """Register the -k/-m selection options, the --markers flag and the
    'markers' ini linelist used to declare project markers."""
    group = parser.getgroup("general")
    # -k: python-like substring expression matched against test names.
    group._addoption(
        '-k',
        action="store", dest="keyword", default='', metavar="EXPRESSION",
        help="only run tests which match the given substring expression. "
        "An expression is a python evaluatable expression "
        "where all names are substring-matched against test names "
        "and their parent classes. Example: -k 'test_method or test_"
        "other' matches all test functions and classes whose name "
        "contains 'test_method' or 'test_other'. "
        "Additionally keywords are matched to classes and functions "
        "containing extra names in their 'extra_keyword_matches' set, "
        "as well as functions which have names assigned directly to them."
    )
    # -m: boolean expression over mark names.
    group._addoption(
        "-m",
        action="store", dest="markexpr", default="", metavar="MARKEXPR",
        help="only run tests matching given mark expression. "
        "example: -m 'mark1 and not mark2'."
    )
    # --markers: handled by pytest_cmdline_main, lists registered markers.
    group.addoption(
        "--markers", action="store_true",
        help="show markers (builtin, plugin and per-project ones)."
    )
    # Projects declare their own markers via the 'markers' ini linelist.
    parser.addini("markers", "markers for test functions", 'linelist')
def pytest_cmdline_main(config):
    """Handle ``--markers``: print every registered marker and exit.

    Returns 0 (handled) when ``--markers`` was given; otherwise returns
    None so normal command-line processing continues.
    """
    if config.option.markers:
        # Imported lazily and only on this branch; the original imported
        # it unconditionally even when --markers was not given.
        import _pytest.config
        config._do_configure()
        tw = _pytest.config.create_terminal_writer(config)
        for line in config.getini("markers"):
            # A marker line looks like "name: description".  partition()
            # tolerates lines registered without a description, where the
            # original split(":", 1) raised ValueError.
            name, _, rest = line.partition(":")
            tw.write("@pytest.mark.%s:" % name, bold=True)
            tw.line(rest)
            tw.line()
        config._ensure_unconfigure()
        return 0
def pytest_configure(config):
    """When --strict is active, record the config on the mark generator."""
    import pytest
    if not config.option.strict:
        return
    pytest.mark._config = config
def __call__(self, *args, **kwargs):
    """Apply this mark decorator.

    If called with exactly one test function or class (and no kwargs),
    attach the mark information to it and return it unchanged.
    Otherwise, return a new decorator of the same name with *args* and
    *kwargs* merged into the stored ones (parametrised usage such as
    ``pytest.mark.foo(1, bar=2)``).
    """
    if args and not kwargs:
        func = args[0]
        is_class = inspect.isclass(func)
        # Only treat the call as "decorating" when the single positional
        # argument is a test function or a class.
        if len(args) == 1 and (istestfunc(func) or is_class):
            if is_class:
                # Classes accumulate marks in a 'pytestmark' list.
                if hasattr(func, 'pytestmark'):
                    mark_list = func.pytestmark
                    if not isinstance(mark_list, list):
                        mark_list = [mark_list]
                    # always work on a copy to avoid updating pytestmark
                    # from a superclass by accident
                    mark_list = mark_list + [self]
                    func.pytestmark = mark_list
                else:
                    func.pytestmark = [self]
            else:
                # Functions store marks as a MarkInfo attribute named
                # after the mark; repeated application accumulates
                # args/kwargs on the existing holder.
                holder = getattr(func, self.name, None)
                if holder is None:
                    holder = MarkInfo(
                        self.name, self.args, self.kwargs
                    )
                    setattr(func, self.name, holder)
                else:
                    holder.add(self.args, self.kwargs)
            return func
    # Parametrised usage: merge new args/kwargs into a fresh decorator.
    kw = self.kwargs.copy()
    kw.update(kwargs)
    args = self.args + args
    return self.__class__(self.name, args=args, kwargs=kw)
def _backend(request):
    # Parametrised fixture: instantiate one settings backend per param and
    # tag the running test with a mark named after the backend class.
    backend = request.param
    request.applymarker(getattr(pytest.mark, backend.__name__)())
    # NOTE(review): settings_for's third argument is the fixture name --
    # presumably used to namespace per-test settings; confirm with callers.
    inst = backend(**backend.settings_for('test', 'test', request.fixturename))
    yield inst
    # Teardown: clear everything the test set on the backend instance.
    inst.unset_all()
def pytest_namespace():
    """Expose the mark generator to plugins/tests as ``pytest.mark``."""
    return dict(mark=MarkGenerator())
def pytest_addoption(parser):
    """Register the -k/-m selection options, the --markers flag and the
    'markers' ini linelist used to declare project markers."""
    group = parser.getgroup("general")
    # -k: python-like substring expression matched against test names.
    group._addoption(
        '-k',
        action="store", dest="keyword", default='', metavar="EXPRESSION",
        help="only run tests which match the given substring expression. "
        "An expression is a python evaluatable expression "
        "where all names are substring-matched against test names "
        "and their parent classes. Example: -k 'test_method or test_"
        "other' matches all test functions and classes whose name "
        "contains 'test_method' or 'test_other'. "
        "Additionally keywords are matched to classes and functions "
        "containing extra names in their 'extra_keyword_matches' set, "
        "as well as functions which have names assigned directly to them."
    )
    # -m: boolean expression over mark names.
    group._addoption(
        "-m",
        action="store", dest="markexpr", default="", metavar="MARKEXPR",
        help="only run tests matching given mark expression. "
        "example: -m 'mark1 and not mark2'."
    )
    # --markers: handled by pytest_cmdline_main, lists registered markers.
    group.addoption(
        "--markers", action="store_true",
        help="show markers (builtin, plugin and per-project ones)."
    )
    # Projects declare their own markers via the 'markers' ini linelist.
    parser.addini("markers", "markers for test functions", 'linelist')
def pytest_cmdline_main(config):
    """Handle ``--markers``: print every registered marker and exit.

    Returns 0 (handled) when ``--markers`` was given; otherwise returns
    None so normal command-line processing continues.
    """
    if config.option.markers:
        # Imported lazily and only on this branch; the original imported
        # it unconditionally even when --markers was not given.
        import _pytest.config
        config._do_configure()
        tw = _pytest.config.create_terminal_writer(config)
        for line in config.getini("markers"):
            # A marker line looks like "name: description".  partition()
            # tolerates lines registered without a description, where the
            # original split(":", 1) raised ValueError.
            name, _, rest = line.partition(":")
            tw.write("@pytest.mark.%s:" % name, bold=True)
            tw.line(rest)
            tw.line()
        config._ensure_unconfigure()
        return 0