def spawn(self, cmd, expect_timeout=10.0):
"""Run a command using pexpect.
The pexpect child is returned.
"""
pexpect = pytest.importorskip("pexpect", "3.0")
if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
pytest.skip("pypy-64 bit not supported")
if sys.platform == "darwin":
pytest.xfail("pexpect does not work reliably on darwin?!")
if sys.platform.startswith("freebsd"):
pytest.xfail("pexpect does not work reliably on freebsd")
logfile = self.tmpdir.join("spawn.out").open("wb")
child = pexpect.spawn(cmd, logfile=logfile)
self.request.addfinalizer(logfile.close)
child.timeout = expect_timeout
return child
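
A minimal usage sketch for a helper like this, driving the spawned process with
pexpect's expect()/sendline() calls. The testdir-style fixture name and the
command are assumptions for illustration, not taken from the snippet above.

def test_spawn_interactive_python(testdir):
    # Hedged sketch: testdir.spawn stands in for the helper above; the rest is plain pexpect.
    child = testdir.spawn("python -i")
    child.expect(">>>")                 # wait for the interactive prompt
    child.sendline("print(6 * 7)")
    child.expect("42")                  # expect() raises pexpect.TIMEOUT after expect_timeout
    child.sendeof()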
def pytest_runtest_setup(item):
# Check if skip or skipif are specified as pytest marks
skipif_info = item.keywords.get('skipif')
if isinstance(skipif_info, (MarkInfo, MarkDecorator)):
eval_skipif = MarkEvaluator(item, 'skipif')
if eval_skipif.istrue():
item._evalskip = eval_skipif
pytest.skip(eval_skipif.getexplanation())
skip_info = item.keywords.get('skip')
if isinstance(skip_info, (MarkInfo, MarkDecorator)):
item._evalskip = True
if 'reason' in skip_info.kwargs:
pytest.skip(skip_info.kwargs['reason'])
elif skip_info.args:
pytest.skip(skip_info.args[0])
else:
pytest.skip("unconditional skip")
item._evalxfail = MarkEvaluator(item, 'xfail')
check_xfail_no_run(item)
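
For reference, a small test module exercising the three marks this setup hook
evaluates. This is a sketch using only public pytest marks; the conditions and
reasons are illustrative.

import sys
import pytest

@pytest.mark.skip(reason="not relevant here")
def test_skipped_unconditionally():
    assert 0   # never runs; the hook calls pytest.skip() during setup

@pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only behaviour")
def test_skipped_on_windows():
    assert True

@pytest.mark.xfail(reason="known bug", run=False)
def test_expected_failure():
    assert 0   # collected as xfail; check_xfail_no_run() keeps it from running when run=False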
def test_sample(dist):
for idx in range(len(dist.dist_params)):
# Compute CPU value.
with tensors_default_to("cpu"):
params = dist.get_dist_params(idx)
try:
cpu_value = dist.pyro_dist.sample(**params)
except ValueError as e:
pytest.xfail('CPU version fails: {}'.format(e))
assert not cpu_value.is_cuda
# Compute GPU value.
with tensors_default_to("cuda"):
params = dist.get_dist_params(idx)
cuda_value = dist.pyro_dist.sample(**params)
assert cuda_value.is_cuda
assert_equal(cpu_value.size(), cuda_value.size())
def test_get_config(self):
# These are set to False in YAML; defaults must not be used.
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
do_changelog_check = self.repo.get_config_value(
'changelog_check', True)
do_autoclose_pr = self.repo.get_config_value(
'autoclose_stale_pull_request', True)
hit_api_limit = False
if len(w) > 0:
hit_api_limit = True
if hit_api_limit:
pytest.xfail(str(w[-1].message))
else:
assert not (do_changelog_check or do_autoclose_pr)
def test_distributed_dot(hetr_device, config):
if hetr_device == 'gpu':
pytest.xfail("Intermittent failure on jenkins for mgpu")
device_id = config['device_id']
axes_x = config['axes_x']
axes_w = config['axes_w']
parallel_axis = config['parallel_axis']
np_weight = np.ones(axes_w.lengths)
with ng.metadata(device=hetr_device):
x = ng.placeholder(axes=axes_x)
with ng.metadata(device_id=device_id, parallel=parallel_axis):
w = ng.variable(axes=axes_w, initial_value=np_weight)
dot = ng.dot(x, w)
np_x = np.random.randint(100, size=axes_x.lengths)
with closing(ngt.make_transformer_factory('hetr',
device=hetr_device)()) as transformer:
computation = transformer.computation(dot, x)
res = computation(np_x)
np.testing.assert_array_equal(res, np.dot(np_x, np_weight))
def test_multi_computations(hetr_device):
if hetr_device == 'gpu':
pytest.xfail("enable after gpu exgraph")
axes_x = ng.make_axes([ax_A, ax_B])
x = ng.placeholder(axes=axes_x)
y = ng.placeholder(())
with ng.metadata(device_id=('0', '1'), parallel=ax_A):
f = x ** 2
out = y - ng.mean(f, out_axes=())
np_x = np.random.randint(10, size=axes_x.lengths)
np_y = np.random.randint(10)
with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as t:
comp = t.computation(out, x, y)
another_comp = t.computation(f, x)
res_comp = comp(np_x, np_y)
res_another_comp = another_comp(np_x)
ref_comp = np_y - np.mean(np_x**2)
np.testing.assert_array_equal(res_comp, ref_comp)
np.testing.assert_array_equal(res_another_comp, np_x**2)
def test_repeat_computation(hetr_device, config):
if hetr_device == 'gpu':
pytest.xfail("enable after gpu exgraph")
device_id = config['device_id']
axes = config['axes']
parallel_axis = config['parallel_axis']
with ng.metadata(device=hetr_device):
x = ng.placeholder(axes=axes)
with ng.metadata(device_id=device_id, parallel=parallel_axis):
x_plus_one = x + 1
np_x = np.random.randint(100, size=axes.lengths)
with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as transformer:
comp = transformer.computation(x_plus_one, x)
comp2 = transformer.computation(x_plus_one, x)
res = comp(np_x)
np.testing.assert_array_equal(res, np_x + 1)
res2 = comp2(np_x)
np.testing.assert_array_equal(res2, np_x + 1)
def test_distributed_dot_parallel_second_axis():
pytest.xfail("'parallel' for not first axis isn't supported yet")
H = ng.make_axis(length=4, name='height')
N = ng.make_axis(length=8, name='batch')
weight = ng.make_axis(length=2, name='weight')
x = ng.placeholder(axes=[H, N])
w = ng.placeholder(axes=[weight, H])
with ng.metadata(device_id=('0', '1'), parallel=N):
dot = ng.dot(w, x)
np_x = np.random.randint(100, size=[H.length, N.length])
np_weight = np.random.randint(100, size=[weight.length, H.length])
with ExecutorFactory() as ex:
computation = ex.executor(dot, x, w)
res = computation(np_x, np_weight)
np.testing.assert_array_equal(res, np.dot(np_weight, np_x))
def test_multiple_gather_ops(hetr_device):
if hetr_device == 'gpu':
if 'gpu' not in ngt.transformer_choices():
pytest.skip("GPUTransformer not available")
pytest.xfail("Failure due to gather recv tensor being returned in wrong shape, "
" possible mismatch between op layout and op.tensor layout")
H = ng.make_axis(length=2, name='height')
W = ng.make_axis(length=4, name='width')
x = ng.placeholder(axes=[H, W])
with ng.metadata(device_id=('0', '1'), parallel=W):
x_plus_one = x + 1
x_mul_two = x_plus_one * 2
input = np.random.randint(100, size=x.axes.lengths)
with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as hetr:
plus = hetr.computation([x_mul_two, x_plus_one], x)
result_mul_two, result_plus_one = plus(input)
np.testing.assert_array_equal(result_plus_one, input + 1)
np.testing.assert_array_equal(result_mul_two, (input + 1) * 2)
def test_linear_ones(input_size, input_placeholder, output_size):
    # Basic sanity check with all ones on the inputs and weights: check that
    # each row in the output is the sum of the weights for that output. This
    # check confirms that the correct number of operations is being run.
x = np.ones(input_placeholder.axes.lengths)
layer = Linear(nout=output_size, init=UniformInit(1.0, 1.0))
with ExecutorFactory() as ex:
if ex.transformer.transformer_name == 'hetr':
pytest.xfail("hetr fork-safe issue on mac")
out = layer(input_placeholder)
comp = ex.executor([out, layer.W], input_placeholder)
output_values, w = comp(x)
ng.testing.assert_allclose(
np.ones(out.axes.lengths) * input_size,
output_values,
atol=0.0, rtol=0.0
)
def test_linear_keep_axes_ones(batch_axis, input_size, input_placeholder, output_size,
transformer_factory):
    # Basic sanity check with all ones on the inputs and weights: check that
    # each row in the output is the sum of the weights for that output. This
    # check confirms that the correct number of operations is being run.
x = np.ones(input_placeholder.axes.lengths)
layer = Linear(nout=output_size, keep_axes=[], init=UniformInit(1.0, 1.0))
with ExecutorFactory() as ex:
if ex.transformer.transformer_name == 'hetr':
pytest.xfail("hetr fork-safe issue on mac")
out = layer(input_placeholder)
comp = ex.executor([out, layer.W], input_placeholder)
output_values, w = comp(x)
assert np.allclose(
np.ones(out.axes.lengths) * input_size * batch_axis.length,
output_values,
atol=0.0, rtol=0.0
)
def test_linear_keep_batch_axes_ones(batch_axis, input_size, input_placeholder, output_size,
transformer_factory):
    # Basic sanity check with all ones on the inputs and weights: check that
    # each row in the output is the sum of the weights for that output. This
    # check confirms that the correct number of operations is being run.
x = np.ones(input_placeholder.axes.lengths)
layer = Linear(nout=output_size, keep_axes=[batch_axis], init=UniformInit(1.0, 1.0))
with ExecutorFactory() as ex:
if ex.transformer.transformer_name == 'hetr':
pytest.xfail("hetr fork-safe issue on mac")
out = layer(input_placeholder)
comp = ex.executor([out, layer.W], input_placeholder)
output_values, w = comp(x)
assert np.allclose(
np.ones(out.axes.lengths) * input_size,
output_values,
atol=0.0, rtol=0.0
)
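
The all-ones checks in the three Linear tests above reduce to plain matrix
arithmetic. A NumPy-only sketch (sizes chosen arbitrarily, not tied to the
fixtures) shows why every output entry equals the number of summed input
features, and why also reducing over the batch axis, as in the keep_axes=[]
variant, scales the expected value by the batch length.

import numpy as np

input_size, output_size, batch = 4, 3, 8
x = np.ones((batch, input_size))
w = np.ones((input_size, output_size))

out = x @ w                                  # shape (batch, output_size)
assert np.allclose(out, input_size)          # each entry is a sum of input_size ones

out_batch_reduced = out.sum(axis=0)          # additionally sum over the batch axis
assert np.allclose(out_batch_reduced, input_size * batch)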
def test_safe_terminate(execmodel):
if execmodel.backend != "threading":
pytest.xfail("execution model %r does not support task count" %
execmodel.backend)
import threading
active = threading.active_count()
l = []
def term():
py.std.time.sleep(3)
def kill():
l.append(1)
safe_terminate(execmodel, 1, [(term, kill)] * 10)
assert len(l) == 10
sleep(0.1)
py.std.gc.collect()
assert execmodel.active_count() == active
def test_safe_terminate2(execmodel):
if execmodel.backend != "threading":
pytest.xfail("execution model %r does not support task count" %
execmodel.backend)
import threading
active = threading.active_count()
l = []
def term():
return
def kill():
l.append(1)
safe_terminate(execmodel, 3, [(term, kill)] * 10)
assert len(l) == 0
sleep(0.1)
py.std.gc.collect()
assert threading.active_count() == active
def test_endmarker_delivery_on_remote_killterm(makegateway, execmodel):
if execmodel.backend != "thread":
pytest.xfail("test and execnet not compatible to greenlets yet")
gw = makegateway('popen')
q = execmodel.queue.Queue()
channel = gw.remote_exec(source='''
import os, time
channel.send(os.getpid())
time.sleep(100)
''')
pid = channel.receive()
py.process.kill(pid)
channel.setcallback(q.put, endmarker=999)
val = q.get(TESTTIMEOUT)
assert val == 999
err = channel._getremoteerror()
assert isinstance(err, EOFError)
def anypython(request):
name = request.param
executable = getexecutable(name)
if executable is None:
if sys.platform == "win32":
executable = winpymap.get(name, None)
if executable:
executable = py.path.local(executable)
if executable.check():
return executable
executable = None
py.test.skip("no %s found" % (name,))
if "execmodel" in request.fixturenames and name != 'sys.executable':
backend = request.getfuncargvalue("execmodel").backend
if backend != "thread":
pytest.xfail(
"cannot run %r execmodel with bare %s" % (backend, name))
return executable
def test_timeout_time(loop):
foo_running = None
start = loop.time()
with pytest.raises(asyncio.TimeoutError):
with timeout(0.1, loop=loop):
foo_running = True
try:
yield from asyncio.sleep(0.2, loop=loop)
finally:
foo_running = False
dt = loop.time() - start
if not (0.09 < dt < 0.11) and os.environ.get('APPVEYOR'):
pytest.xfail('appveyor sometimes is toooo sloooow')
assert 0.09 < dt < 0.11
assert not foo_running
def check_influxdb_xfail(sl_deployed, node_name, value):
def check_influxdb_data():
return value in sl_deployed.check_data_in_influxdb(node_name)
try:
helpers.wait(
check_influxdb_data,
timeout=10, interval=2,
timeout_msg=('Influxdb data {0} was not replicated to {1} '
'[https://mirantis.jira.com/browse/PROD-16272]'
.format(value, node_name)))
except Exception:
pytest.xfail('Influxdb data {0} was not replicated to {1} '
'[https://mirantis.jira.com/browse/PROD-16272]'
.format(value, node_name))
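
The same "known bug becomes an expected failure" pattern, stripped of the
project-specific helpers; wait_for() and the predicate are hypothetical
stand-ins for helpers.wait and check_data_in_influxdb.

import time
import pytest

def wait_for(predicate, timeout=10, interval=2):
    """Poll predicate() until it returns True or the timeout expires."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

def assert_or_xfail(predicate, reason):
    # Turn a missed deadline into an expected failure pointing at the tracked bug,
    # instead of letting the test error out.
    if not wait_for(predicate):
        pytest.xfail(reason)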
def test_touch_privileged_metadata_item(self, funcname,
name_metadata_item):
""" AttributeDict has a few metadata members that may not be set. """
# Create AttributeDict and ensure it has the target item.
ad = AttributeDict(dict(basic_entries()))
assert hasattr(ad, name_metadata_item)
# If current test parameter is a setter, it needs a value argument.
dummy_value = "this_will_fail"
touch = getattr(ad, funcname)
args = (name_metadata_item, )
# Make the actual call under test.
if funcname in ["__setattr__", "__setitem__"]:
pytest.xfail(
"Since {} is recursive, it's difficult to prohibit "
"post-construction attempts to set metadata. It may "
"not even be desirable".format(AttributeDict.__name__))
args += (dummy_value, )
with pytest.raises(_MetadataOperationException):
touch.__call__(*args)
def test_build_deps_on_distutils(request, tmpdir_factory, build_dep):
"""
All setuptools build dependencies must build without
setuptools.
"""
if 'pyparsing' in build_dep:
pytest.xfail(reason="Project imports setuptools unconditionally")
build_target = tmpdir_factory.mktemp('source')
build_dir = download_and_extract(request, build_dep, build_target)
install_target = tmpdir_factory.mktemp('target')
output = install(build_dir, install_target)
for line in output.splitlines():
match = re.search('Unknown distribution option: (.*)', line)
allowed_unknowns = [
'test_suite',
'tests_require',
'install_requires',
]
assert not match or match.group(1).strip('"\'') in allowed_unknowns
def test_summing_simple(self, testdir):
testdir.makepyfile("""
import pytest
def test_pass():
pass
def test_fail():
assert 0
def test_skip():
pytest.skip("")
@pytest.mark.xfail
def test_xfail():
assert 0
@pytest.mark.xfail
def test_xpass():
assert 1
""")
result, dom = runandparse(testdir)
assert result.ret
node = dom.getElementsByTagName("testsuite")[0]
assert_attr(node, name="pytest", errors=0, failures=1, skips=3, tests=2)
def test_xfailure_function(self, testdir):
testdir.makepyfile("""
import pytest
def test_xfail():
pytest.xfail("42")
""")
result, dom = runandparse(testdir)
assert not result.ret
node = dom.getElementsByTagName("testsuite")[0]
assert_attr(node, skips=1, tests=0)
tnode = node.getElementsByTagName("testcase")[0]
assert_attr(tnode,
file="test_xfailure_function.py",
line="1",
classname="test_xfailure_function",
name="test_xfail")
fnode = tnode.getElementsByTagName("skipped")[0]
assert_attr(fnode, message="expected test failure")
#assert "ValueError" in fnode.toxml()
def test_xfailure_xpass(self, testdir):
testdir.makepyfile("""
import pytest
@pytest.mark.xfail
def test_xpass():
pass
""")
result, dom = runandparse(testdir)
#assert result.ret
node = dom.getElementsByTagName("testsuite")[0]
assert_attr(node, skips=1, tests=0)
tnode = node.getElementsByTagName("testcase")[0]
assert_attr(tnode,
file="test_xfailure_xpass.py",
line="1",
classname="test_xfailure_xpass",
name="test_xpass")
fnode = tnode.getElementsByTagName("skipped")[0]
assert_attr(fnode, message="xfail-marked test passes unexpectedly")
#assert "ValueError" in fnode.toxml()
def test_xfail_not_run_xfail_reporting(self, testdir):
p = testdir.makepyfile(test_one="""
import pytest
@pytest.mark.xfail(run=False, reason="noway")
def test_this():
assert 0
@pytest.mark.xfail("True", run=False)
def test_this_true():
assert 0
@pytest.mark.xfail("False", run=False, reason="huh")
def test_this_false():
assert 1
""")
result = testdir.runpytest(p, '--report=xfailed', )
result.stdout.fnmatch_lines([
"*test_one*test_this*",
"*NOTRUN*noway",
"*test_one*test_this_true*",
"*NOTRUN*condition:*True*",
"*1 passed*",
])
def test_xfail_imperative(self, testdir):
p = testdir.makepyfile("""
import pytest
def test_this():
pytest.xfail("hello")
""")
result = testdir.runpytest(p)
result.stdout.fnmatch_lines([
"*1 xfailed*",
])
result = testdir.runpytest(p, "-rx")
result.stdout.fnmatch_lines([
"*XFAIL*test_this*",
"*reason:*hello*",
])
result = testdir.runpytest(p, "--runxfail")
result.stdout.fnmatch_lines("*1 pass*")
def test_xfail_imperative_in_setup_function(self, testdir):
p = testdir.makepyfile("""
import pytest
def setup_function(function):
pytest.xfail("hello")
def test_this():
assert 0
""")
result = testdir.runpytest(p)
result.stdout.fnmatch_lines([
"*1 xfailed*",
])
result = testdir.runpytest(p, "-rx")
result.stdout.fnmatch_lines([
"*XFAIL*test_this*",
"*reason:*hello*",
])
result = testdir.runpytest(p, "--runxfail")
result.stdout.fnmatch_lines("""
*def test_this*
*1 fail*
""")
def test_reportchars(testdir):
testdir.makepyfile("""
import pytest
def test_1():
assert 0
@pytest.mark.xfail
def test_2():
assert 0
@pytest.mark.xfail
def test_3():
pass
def test_4():
pytest.skip("four")
""")
result = testdir.runpytest("-rfxXs")
result.stdout.fnmatch_lines([
"FAIL*test_1*",
"XFAIL*test_2*",
"XPASS*test_3*",
"SKIP*four*",
])
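
For reference, the -r characters used here select which extra summary lines are
printed: f for failed, x for xfailed, X for unexpectedly passing (xpass), and
s for skipped, matching the four fnmatch patterns asserted above.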