def collect(self):
    """Yield a ``DoctestItem`` for every non-empty doctest of the module at ``self.fspath``.

    ``conftest.py`` files are loaded through the plugin manager; every other
    file is imported with ``_patch_pyimport``.  An ``ImportError`` from that
    import turns into a skip when ``--cython-ignore-import-errors`` is set
    and is re-raised otherwise.
    """
    import doctest

    if self.fspath.basename == "conftest.py":
        module = self.config.pluginmanager._importconftest(self.fspath)
    else:
        try:
            # XXX patch pyimport in pytest._pytest.doctest.DoctestModule
            module = _patch_pyimport(self.fspath)
        except ImportError:
            if not self.config.getoption('--cython-ignore-import-errors'):
                raise
            pytest.skip('unable to import module %r' % self.fspath)

    # uses internal doctest module parsing mechanism
    finder = doctest.DocTestFinder()
    optionflags = get_optionflags(self)
    if _get_checker is None:
        checker = None
    else:
        checker = _get_checker()
    runner = doctest.DebugRunner(
        verbose=0, optionflags=optionflags, checker=checker)

    for found in finder.find(module, module.__name__):
        if not found.examples:
            continue  # skip empty doctests
        yield DoctestItem(found.name, self, runner, found)
# Source code examples for Python's skip()
def test_no_underscores_all_dashes(requirements_files=REQUIREMENTS_FILES):
    """Fail if any line of an existing requirements file contains an underscore.

    The test is skipped when none of ``requirements_files`` exists; files
    that are missing are ignored individually.
    """
    if not any(os.path.exists(path) for path in requirements_files):  # pragma: no cover
        pytest.skip('No requirements files found')

    for path in requirements_files:
        if os.path.exists(path):
            for entry in get_lines_from_file(path):
                if '_' not in entry:
                    continue
                message = 'Use dashes for package names {}: {}'.format(
                    path, entry,
                )
                raise AssertionError(message)
def install_server(db):
    """Set up a MySQL data directory for ``db``, start ``mysqld`` and return its process.

    Writes ``my.cnf`` under ``db.installation_dir``, runs ``mysql_install_db``
    and launches ``mysqld`` with that configuration.  It then tries up to 30
    times, sleeping one second after each failure, to create the ``abe`` user
    through ``db.root()``.

    Returns the ``subprocess.Popen`` object of the started server.
    Raises ``MySQLdb.OperationalError`` if the last attempt still fails.
    """
    db.socket = str(db.installation_dir.join('mysql.sock'))
    db.installation_dir.ensure_dir('tmp')
    mycnf = db.installation_dir.join('my.cnf')
    # Adjacent string literals are joined first, so the % formatting below
    # applies to the whole configuration text.
    mycnf.write('[mysqld]\n'
                'datadir=%(installation_dir)s\n'
                #'log\n'
                #'log-error\n'
                'skip-networking\n'
                'socket=mysql.sock\n'
                'pid-file=mysqld.pid\n'
                'tmpdir=tmp\n' % { 'installation_dir': db.installation_dir })
    subprocess.check_call(['mysql_install_db', '--defaults-file=' + str(mycnf)])
    server = subprocess.Popen(['mysqld', '--defaults-file=' + str(mycnf)])
    import time, MySQLdb
    tries = 30
    # The server is presumably not accepting connections right after Popen
    # returns, hence the retry loop -- TODO confirm.
    # NOTE(review): indentation was lost in the reviewed copy; the nesting of
    # ``return`` and ``sleep`` below is reconstructed -- confirm.
    for t in range(tries):
        try:
            with db.root() as cur:
                cur.execute("CREATE USER 'abe'@'localhost' IDENTIFIED BY 'Bitcoin'")
                return server
        except MySQLdb.OperationalError as e:
            # Give up only on the final attempt.
            if t+1 == tries:
                raise e
            time.sleep(1)
def setUp(self):
    """Skip the test when the running interpreter cannot execute it.

    Three checks are made in order: the Python version, the importability
    of every module listed under the ``requires`` option, and whether the
    current Python implementation is listed under ``except_implementations``.
    """
    if self._should_be_skipped_due_to_version():
        python_version = sys.version.split(' ')[0]
        pytest.skip('Test cannot run with Python %s.' % (python_version,))

    unavailable = []
    for module_name in self._test_file.options['requires']:
        try:
            __import__(module_name)
        except ImportError:
            unavailable.append(module_name)
    if unavailable:
        pytest.skip('Requires %s to be present.' % (','.join(unavailable),))

    excluded_option = self._test_file.options['except_implementations']
    if excluded_option:
        # The option is a comma separated list of implementation names.
        excluded = [name.strip() for name in excluded_option.split(",")]
        current = platform.python_implementation()
        if current in excluded:
            pytest.skip(
                'Test cannot run with Python implementation %r'
                % (current, ))
def test_cXY_E0(nr_sites, gamma, rgen, ldim=2):
    """Check the analytical cXY ground state energy against ``eigsh()``.

    :param nr_sites: number of sites handed to the ``physics`` helpers
    :param gamma: model parameter handed to the ``physics`` helpers
    :param rgen: random generator providing ``randn`` for the start vector
    :param ldim: local dimension; the start vector has ``ldim**nr_sites``
        entries
    """
    if sys.version_info[:2] == (3, 3) and gamma == -0.5:
        # Skip this test on Python 3.3 because it fails on Travis (but
        # only for Python 3.3). eigsh() fails with:
        # scipy.sparse.linalg.eigen.arpack.arpack.ArpackNoConvergence:
        # ARPACK error -1: No convergence (xxx iterations, 0/1
        # eigenvectors converged) [ARPACK error -14: ZNAUPD did not
        # find any eigenvalues to sufficient accuracy.]
        #
        # Assuming ``pt`` is the pytest module, skip() raises, so no
        # explicit return is needed after it.
        pt.skip("Test fails on Travis for unknown reason")

    # Verify that the analytical solution of the ground state energy
    # matches the numerical value from eigsh()
    E0 = physics.cXY_E0(nr_sites, gamma)
    H = physics.sparse_cH(physics.cXY_local_terms(nr_sites, gamma))
    # Fix start vector for eigsh()
    v0 = rgen.randn(ldim**nr_sites) + 1j * rgen.randn(ldim**nr_sites)
    ev = eigsh(H, k=1, which='SR', v0=v0, return_eigenvectors=False).min()
    assert abs(E0 - ev) <= 1e-13
def spawn(self, cmd, expect_timeout=10.0):
    """Run a command using pexpect.

    The pexpect child is returned.  Its output is logged to ``spawn.out``
    in ``self.tmpdir`` and its timeout is set to ``expect_timeout``.

    The test is skipped when pexpect (>= 3.0) is missing or on 64 bit
    PyPy, and marked as expected failure on darwin and freebsd.
    """
    pexpect = pytest.importorskip("pexpect", "3.0")
    if hasattr(sys, 'pypy_version_info') and '64' in platform.machine():
        pytest.skip("pypy-64 bit not supported")
    if sys.platform == "darwin":
        pytest.xfail("pexpect does not work reliably on darwin?!")
    if sys.platform.startswith("freebsd"):
        pytest.xfail("pexpect does not work reliably on freebsd")
    logfile = self.tmpdir.join("spawn.out").open("wb")
    # Register the cleanup before spawning so the log file is closed even
    # when pexpect.spawn() itself raises.
    self.request.addfinalizer(logfile.close)
    child = pexpect.spawn(cmd, logfile=logfile)
    child.timeout = expect_timeout
    return child
def pytest_runtest_setup(item):
    """Apply ``skipif`` and ``skip`` marks of ``item``, then prepare its xfail state.

    A true ``skipif`` condition or the presence of a ``skip`` mark skips
    the item; otherwise the ``xfail`` evaluator is stored on the item and
    ``check_xfail_no_run`` is called.
    """
    mark_types = (MarkInfo, MarkDecorator)

    # Check if skip or skipif are specified as pytest marks
    if isinstance(item.keywords.get('skipif'), mark_types):
        skipif_eval = MarkEvaluator(item, 'skipif')
        if skipif_eval.istrue():
            item._evalskip = skipif_eval
            pytest.skip(skipif_eval.getexplanation())

    skip_mark = item.keywords.get('skip')
    if isinstance(skip_mark, mark_types):
        item._evalskip = True
        # The reason comes from the keyword, else from the first positional
        # argument, else a fixed default is used.
        if 'reason' in skip_mark.kwargs:
            reason = skip_mark.kwargs['reason']
        elif skip_mark.args:
            reason = skip_mark.args[0]
        else:
            reason = "unconditional skip"
        pytest.skip(reason)

    item._evalxfail = MarkEvaluator(item, 'xfail')
    check_xfail_no_run(item)
def getoption(self, name, default=notset, skip=False):
    """ return the value of a command line option.

    :arg name: name of the option.  The literal ``--OPT`` form may be
        given instead of the "dest" option name.
    :arg default: value returned when no option of that name exists.
    :arg skip: if True, raise pytest.skip when the option does not exist
        or has a None value (and no default was given).
    :raises ValueError: when the option is missing and neither ``default``
        nor ``skip`` applies.
    """
    dest = self._opt2dest.get(name, name)
    try:
        value = getattr(self.option, dest)
        if value is None and skip:
            # Treat a None value like a missing option when skipping is on.
            raise AttributeError(dest)
    except AttributeError:
        if default is not notset:
            return default
        if skip:
            import pytest
            pytest.skip("no %r option found" %(dest,))
        raise ValueError("no option named %r" % (dest,))
    else:
        return value
def collect(self):
    """Yield a ``DoctestItem`` for every non-empty doctest of the module at ``self.fspath``.

    ``conftest.py`` files are loaded through the plugin manager, other
    files with ``pyimport()``.  An ``ImportError`` from ``pyimport()``
    becomes a skip when the ``doctest_ignore_import_errors`` value is set
    and is re-raised otherwise.
    """
    import doctest

    if self.fspath.basename == "conftest.py":
        module = self.config.pluginmanager._importconftest(self.fspath)
    else:
        try:
            module = self.fspath.pyimport()
        except ImportError:
            if not self.config.getvalue('doctest_ignore_import_errors'):
                raise
            pytest.skip('unable to import module %r' % self.fspath)

    # uses internal doctest module parsing mechanism
    finder = doctest.DocTestFinder()
    flags = get_optionflags(self)
    runner = doctest.DebugRunner(
        verbose=0, optionflags=flags, checker=_get_checker())

    for found in finder.find(module, module.__name__):
        if not found.examples:
            continue  # skip empty doctests
        yield DoctestItem(found.name, self, runner, found)
def test_setup_teardown_sequence_numbers(self):
    """Check that the setup file sorts before the chroot file and teardown after it.

    The cloud config is written to ``self.output_file``, read back, and the
    prefix before the first ``-`` of each file name is taken as that
    file's sequence number.  Skipped when the only keyword argument is
    ``binary_customisation_script``.
    """
    if list(self.kwargs) == ['binary_customisation_script']:
        pytest.skip('Test only applies to chroot hooks.')
    # Close the handle before reading the file back so that everything
    # written is flushed to disk.
    with open(self.output_file.strpath, 'w') as output:
        generate_build_config._write_cloud_config(output, **self.kwargs)
    with self.output_file.open() as written:
        # safe_load is enough for this plain-data document, and
        # yaml.load() without an explicit Loader is rejected by PyYAML 6.
        cloud_config = yaml.safe_load(written)
    sequence_numbers = {}
    for stanza in cloud_config['write_files']:
        sequence_number = stanza['path'].rsplit('/')[-1].split('-')[0]
        content = base64.b64decode(stanza['content']).decode('utf-8')
        if '-- chroot --' in content:
            sequence_numbers['chroot'] = sequence_number
        elif '-- setup --' in content:
            sequence_numbers['setup'] = sequence_number
        elif '-- teardown --' in content:
            sequence_numbers['teardown'] = sequence_number
    # NOTE(review): the numbers are compared as strings; presumably they
    # are zero-padded to equal width -- confirm.
    assert sequence_numbers['setup'] < sequence_numbers['chroot']
    assert sequence_numbers['chroot'] < sequence_numbers['teardown']
def test_setup_teardown_content_matches_template(self, hook, monkeypatch):
    """Check that the ``<HOOK>_CONTENT`` template is written out exactly once.

    The module attribute named after ``hook`` is replaced with a known
    string; after writing the cloud config, exactly one ``write_files``
    entry must decode to that string.  Skipped when the only keyword
    argument is ``binary_customisation_script``.
    """
    if list(self.kwargs) == ['binary_customisation_script']:
        pytest.skip('Test only applies to chroot hooks.')
    expected_string = '#!/bin/sh\n-- specific test content --'
    monkeypatch.setattr(
        generate_build_config,
        "{}_CONTENT".format(hook.upper()), expected_string)
    # Close the handle before reading the file back so that everything
    # written is flushed to disk.
    with open(self.output_file.strpath, 'w') as output:
        generate_build_config._write_cloud_config(output, **self.kwargs)
    with self.output_file.open() as written:
        # safe_load is enough for this plain-data document, and
        # yaml.load() without an explicit Loader is rejected by PyYAML 6.
        cloud_config = yaml.safe_load(written)
    contents = [base64.b64decode(stanza['content'])
                for stanza in cloud_config['write_files']]
    expected_bytes = expected_string.encode('utf-8')
    assert expected_bytes in contents
    assert 1 == contents.count(expected_bytes)
def _get_style_test_options(filename):
    """ Returns (skip, ignores) for the specified source file.

    Only the first 21 lines are inspected.  A line starting with
    ``# styletest:`` either requests skipping the file (when it contains
    ``skip``) or lists error codes to ignore (when it contains ``ignore``).
    A code is one of the letters ``E``, ``W``, ``F``, ``C``, ``N`` followed
    by digits; codes may be separated by spaces and/or commas.

    :param filename: path of the UTF-8 encoded source file
    :returns: tuple ``(skip, ignores)``: a bool and a list of code strings
    """
    # Read through a context manager so the file handle is always closed.
    with open(filename, 'rb') as source:
        text = source.read().decode('utf-8')

    skip = False
    ignores = []
    for line in text.splitlines()[:21]:
        if not line.startswith('# styletest:'):
            continue
        if 'skip' in line:
            skip = True
        elif 'ignore' in line:
            words = [w.strip() for w in line.replace(',', ' ').split(' ')]
            ignores.extend(
                w for w in words
                if w and w[0] in 'EWFCN' and w[1:].isnumeric())
    return skip, ignores
def test_batch_log_pdf_mask(dist):
    """Check how ``log_pdf_mask`` affects ``batch_log_pdf``.

    Runs only when the distribution name is 'Normal', 'Bernoulli' or
    'Categorical'.  For every test datum a mask of ones must reproduce the
    unmasked batch log pdf, a mask of zeros must give zeros and a mask of
    0.5 must give half of the unmasked value.
    """
    if dist.get_test_distribution_name() not in ('Normal', 'Bernoulli', 'Categorical'):
        pytest.skip('Batch pdf masking not supported for the distribution.')
    d = dist.pyro_dist
    for idx in range(dist.get_num_test_data()):
        dist_params = dist.get_dist_params(idx)
        x = dist.get_test_data(idx)
        # NOTE(review): indentation was lost in the reviewed copy; this
        # with-block is assumed to cover the rest of the loop body -- confirm.
        with xfail_if_not_implemented():
            batch_pdf_shape = d.batch_shape(**dist_params) + (1,)
            batch_pdf_shape_broadcasted = d.batch_shape(x, **dist_params) + (1,)
            zeros_mask = ng_zeros(1)  # should be broadcasted to data dims
            ones_mask = ng_ones(batch_pdf_shape)  # should be broadcasted to data dims
            half_mask = ng_ones(1) * 0.5
            # Reference value computed without any mask.
            batch_log_pdf = d.batch_log_pdf(x, **dist_params)
            batch_log_pdf_zeros_mask = d.batch_log_pdf(x, log_pdf_mask=zeros_mask, **dist_params)
            batch_log_pdf_ones_mask = d.batch_log_pdf(x, log_pdf_mask=ones_mask, **dist_params)
            batch_log_pdf_half_mask = d.batch_log_pdf(x, log_pdf_mask=half_mask, **dist_params)
            assert_equal(batch_log_pdf_ones_mask, batch_log_pdf)
            assert_equal(batch_log_pdf_zeros_mask, ng_zeros(batch_pdf_shape_broadcasted))
            assert_equal(batch_log_pdf_half_mask, 0.5 * batch_log_pdf)
def test_uint64(self):
    """Read two packed unsigned 64 bit words back from a ``BaseSegment``.

    The first word is ``sys.maxsize`` and the second ``sys.maxsize + 1``.
    Values and result types of the read methods are checked.  Skipped on
    platforms where ``sys.maxsize`` is not ``2**63 - 1`` and on PyPy
    older than 5.6.
    """
    if sys.maxsize != (1 << 63) - 1:
        pytest.skip('64 bit only')
    if IS_PYPY and sys.pypy_version_info < (5, 6):
        pytest.skip('Broken on PyPy<5.6')

    packed = struct.pack('QQ', sys.maxsize, sys.maxsize + 1)
    segment = BaseSegment(packed)

    # first word, read through read_uint64_magic
    value = segment.read_uint64_magic(0)
    assert value == sys.maxsize == segment.read_uint64(0)
    assert type(value) is int

    # first word, read through read_primitive
    value = segment.read_primitive(0, ord('Q'))
    assert value == sys.maxsize == segment.read_uint64(0)
    assert type(value) is int

    # second word: one past sys.maxsize
    value = segment.read_uint64_magic(8)
    assert value == sys.maxsize + 1 == segment.read_uint64(8)
    if not PY3:
        assert type(value) is long
    else:
        assert type(value) is int
def test_copy_pointer(self, schema, benchmark):
    """Benchmark rebuilding a ``Tree`` from the root of a deeply nested tree.

    This is similar to test_struct, but the struct that is set has a very
    deep structure, so the measurement is dominated by copy_pointer.
    Only runs for the 'Capnpy' and 'PyCapnp' schemas.
    """
    if schema.__name__ not in ('Capnpy', 'PyCapnp'):
        pytest.skip('N/A')

    #self._make_big_tree() # uncomment this if you want to regenerate the file
    serialized = self.BIG_TREE.read("rb")
    tree = schema.Tree.loads(serialized)

    def loop(oldtree):
        for _ in range(1000):
            copied = schema.Tree(oldtree.root)
        return copied

    result = benchmark(loop, tree)
    assert result.root.x == 9999
def test_copy_buffer(self, schema, benchmark):
    """Benchmark a plain copy of the segment buffer, ``self.N`` times.

    This is not really a dumps; it serves as a baseline to compare the
    performance against.  Only runs for the 'Capnpy' schema.
    """
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')

    def dumps_N(obj):
        pair = (obj, obj)
        result = 0
        for index in range(self.N):
            current = pair[index % 2]
            result = current._seg.buf[:]
        return result

    target = get_obj(schema)
    copied = benchmark(dumps_N, target)
    assert type(copied) is six.binary_type
def test_dumps_not_compact(self, schema, benchmark):
    """Benchmark ``dumps()`` on a struct that is not compact, ``self.N`` times.

    The struct is taken from a container holding two items, and the test
    asserts beforehand that it reports itself as not compact.  Only runs
    for the 'Capnpy' schema.
    """
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')

    def dumps_N(obj):
        pair = (obj, obj)
        result = 0
        for index in range(self.N):
            current = pair[index % 2]
            result = current.dumps()
        return result

    item = get_obj(schema)
    container = schema.MyStructContainer(items=[item, item])
    first = container.items[0]
    assert not first._is_compact()
    dumped = benchmark(dumps_N, first)
    assert type(dumped) is six.binary_type
def test_dumps_not_compact_no_fastpath(self, schema, benchmark):
    """Benchmark ``dumps(fastpath=False)`` on a struct that is not compact.

    The struct is taken from a container holding two items, and the test
    asserts beforehand that it reports itself as not compact.  Only runs
    for the 'Capnpy' schema.
    """
    if schema.__name__ != 'Capnpy':
        pytest.skip('N/A')

    def dumps_N(obj):
        pair = (obj, obj)
        result = 0
        for index in range(self.N):
            current = pair[index % 2]
            result = current.dumps(fastpath=False)
        return result

    item = get_obj(schema)
    container = schema.MyStructContainer(items=[item, item])
    first = container.items[0]
    assert not first._is_compact()
    dumped = benchmark(dumps_N, first)
    assert type(dumped) is six.binary_type
async def test_count(self, test_db):
    """Check ``count()`` of find cursors on the ``test`` collection.

    Declared as a coroutine because the body uses ``await`` and
    ``async for``, which are a syntax error inside a plain ``def``.
    """
    assert 0 == await test_db.test.find().count()
    await test_db.test.insert_many([{'x': i} for i in range(10)])
    assert 10 == await test_db.test.find().count()
    assert isinstance(await test_db.test.find().count(), int)
    # Called without arguments, count() returns 10 despite limit/skip.
    assert 10 == await test_db.test.find().limit(5).count()
    assert 10 == await test_db.test.find().skip(5).count()
    assert 1 == await test_db.test.find({'x': 1}).count()
    assert 5 == await test_db.test.find({'x': {'$lt': 5}}).count()

    # The count must be the same after iteration of the cursor has begun.
    a = test_db.test.find()
    b = await a.count()
    async for _ in a:
        break
    assert b == await a.count()

    assert 0 == await test_db.test.acollectionthatdoesntexist.find().count()
async def test_count_with_limit_and_skip(self, test_db):
    """Check that ``count(True)`` matches the number of documents returned.

    Declared as a coroutine because the body uses ``await``, which is a
    syntax error inside a plain ``def``.
    """
    with pytest.raises(TypeError):
        await test_db.test.find().count('foo')

    async def check_len(cursor, length):
        # count(True) must equal both the materialised list and ``length``.
        assert len(await cursor.to_list()) == await cursor.count(True)
        assert length == await cursor.count(True)

    await test_db.test.insert_many([{'i': i} for i in range(100)])

    await check_len(test_db.test.find(), 100)

    await check_len(test_db.test.find().limit(10), 10)
    await check_len(test_db.test.find().limit(110), 100)

    await check_len(test_db.test.find().skip(10), 90)
    await check_len(test_db.test.find().skip(110), 0)

    await check_len(test_db.test.find().limit(10).skip(10), 10)
    await check_len(test_db.test.find().limit(10).skip(95), 5)
async def test_profiling_info(self, mongo, test_db):
    """Check the shape of the entries returned by ``profiling_info()``.

    Declared as a coroutine because the body uses ``await``, which is a
    syntax error inside a plain ``def``.  Skipped when the connection is
    to a mongos.
    """
    connection = await mongo.get_connection()
    if connection.is_mongos:
        # pytest.skip() raises, so no explicit return is needed after it.
        pytest.skip('Profiling works only without mongos.')

    await test_db.system.profile.drop()
    # Profile exactly one operation.
    await test_db.set_profiling_level(ALL)
    await test_db.test.find_one()
    await test_db.set_profiling_level(OFF)

    info = await test_db.profiling_info()
    assert isinstance(info, list)
    assert len(info) >= 1

    # These basically clue us in to server changes.
    assert isinstance(info[0]['responseLength'], int)
    assert isinstance(info[0]['millis'], int)
    assert isinstance(info[0]['client'], str)
    assert isinstance(info[0]['user'], str)
    assert isinstance(info[0]['ns'], str)
    assert isinstance(info[0]['op'], str)
    assert isinstance(info[0]['ts'], datetime.datetime)
def skip_by_version(request, openshift_version):
    """Skip the current test when ``openshift_version`` is outside the class's limits.

    The limits are read from ``request.node.cls.tasks['version_limits']``,
    a mapping with optional ``min``, ``max`` and ``skip_latest`` entries.
    Nothing happens when no limits are declared or no version is given.

    :param request: pytest request object of the running test
    :param openshift_version: version string, or the literal ``'latest'``
    """
    version_limits = request.node.cls.tasks.get('version_limits')
    if not (version_limits and openshift_version):
        return

    if openshift_version == 'latest':
        # 'latest' is not a comparable version number, so only the explicit
        # skip_latest flag applies and nothing is parsed.
        if version_limits.get('skip_latest'):
            pytest.skip('This API is not supported in the latest openshift version')
        return

    current = parse_version(openshift_version)
    lowest_version = version_limits.get('min')
    highest_version = version_limits.get('max')
    if lowest_version and parse_version(lowest_version) > current:
        # Running version is below the minimum.
        pytest.skip('This API is not supported in openshift versions < {}. You are using version {}'.format(lowest_version, openshift_version))
    elif highest_version and parse_version(highest_version) < current:
        # Running version is above the maximum.
        pytest.skip('This API is not supported in openshift versions > {}. You are using version {}'.format(highest_version, openshift_version))
def test_comm_broadcast_op(hetr_device):
    """Compare a dot product built under ``ng.metadata`` with ``np.dot``.

    The graph is built with ``device_id=('0', '1')`` and the batch axis as
    the parallel axis.  Skipped for the 'gpu' device.
    """
    if hetr_device == 'gpu':
        pytest.skip('gpu communication broadcast op is not supported.')

    height_axis = ng.make_axis(length=4, name='height')
    batch_axis = ng.make_axis(length=8, name='batch')
    weight_axis = ng.make_axis(length=2, name='weight')

    x = ng.placeholder(axes=[batch_axis, height_axis])
    # w will be broadcasted to devices
    w = ng.placeholder(axes=[height_axis, weight_axis])
    with ng.metadata(device_id=('0', '1'), parallel=batch_axis):
        dot = ng.dot(x, w)

    x_values = np.random.randint(
        100, size=[batch_axis.length, height_axis.length])
    w_values = np.random.randint(
        100, size=[height_axis.length, weight_axis.length])

    with ExecutorFactory() as ex:
        computation = ex.executor(dot, x, w)
        actual = computation(x_values, w_values)
        np.testing.assert_array_equal(actual, np.dot(x_values, w_values))
def test_hetr_benchmark(hetr_device, config):
    """
    Description:
    Test to ensure benchmarks are working.
    Benchmark used for test is mini_resnet

    The test is currently disabled: it skips unconditionally as its first
    statement, so the benchmark call below is never reached.
    """
    pytest.skip('Possible issue only on jenkins, disable until figured out.')
    # Kept so that the test can be re-enabled by removing the skip above.
    from examples.benchmarks.mini_resnet import run_resnet_benchmark
    c = config
    run_resnet_benchmark(dataset=c['dataset'],
                         num_iterations=c['iter_count'],
                         n_skip=1,
                         batch_size=c['batch_size'],
                         device_id=c['device_id'],
                         transformer_type='hetr',
                         device=hetr_device,
                         bprop=c['bprop'],
                         batch_norm=c['batch_norm'],
                         visualize=False)
def test_allreduce_hint(hetr_device, config):
    """Evaluate ``var_A - var_B / len(device_id)`` with a 'sum' reduce hint on ``var_B``.

    ``var_A`` is initialised to 1 and ``var_B`` to ``config['input']``; the
    result must equal an array filled with ``config['expected_result']``.
    Skipped for the 'gpu' device when no GPU transformer is available.
    """
    if hetr_device == 'gpu' and 'gpu' not in ngt.transformer_choices():
        pytest.skip("GPUTransformer not available")

    init_value = config['input']
    device_id = config['device_id']
    axis_A = ng.make_axis(length=4, name='axis_A')
    parallel_axis = ng.make_axis(name='axis_parallel', length=16)

    with ng.metadata(device=hetr_device,
                     device_id=device_id,
                     parallel=parallel_axis):
        var_A = ng.variable(axes=[axis_A], initial_value=UniformInit(1, 1))
        var_B = ng.variable(
            axes=[axis_A], initial_value=UniformInit(init_value, init_value))
        var_B.metadata['reduce_func'] = 'sum'
        var_B_mean = var_B / len(device_id)
        difference = var_A - var_B_mean

    factory = ngt.make_transformer_factory('hetr', device=hetr_device)
    with closing(factory()) as hetr:
        computation = hetr.computation(difference)
        actual = computation()
        expected = np.full(
            (axis_A.length), config['expected_result'], np.float32)
        np.testing.assert_array_equal(actual, expected)
def test_multiple_gather_ops(hetr_device):
    """Return two ops from one hetr computation and compare both with NumPy.

    ``x + 1`` and ``(x + 1) * 2`` are built under ``ng.metadata`` with
    ``device_id=('0', '1')`` and the width axis as the parallel axis.
    """
    if hetr_device == 'gpu':
        if 'gpu' not in ngt.transformer_choices():
            pytest.skip("GPUTransformer not available")
        # NOTE(review): indentation was lost in the reviewed copy; the xfail
        # is assumed to apply to the gpu device only -- confirm.
        pytest.xfail("Failure due to gather recv tensor being returned in wrong shape, "
                     " possible mismatch between op layout and op.tensor layout")

    H = ng.make_axis(length=2, name='height')
    W = ng.make_axis(length=4, name='width')
    x = ng.placeholder(axes=[H, W])
    with ng.metadata(device_id=('0', '1'), parallel=W):
        x_plus_one = x + 1
        x_mul_two = x_plus_one * 2

    # Random integer input with the same lengths as the placeholder's axes.
    input = np.random.randint(100, size=x.axes.lengths)
    with closing(ngt.make_transformer_factory('hetr', device=hetr_device)()) as hetr:
        plus = hetr.computation([x_mul_two, x_plus_one], x)
        # Results come back in the order the ops were requested.
        result_mul_two, result_plus_one = plus(input)
        np.testing.assert_array_equal(result_plus_one, input + 1)
        np.testing.assert_array_equal(result_mul_two, (input + 1) * 2)
def pytest_configure(config):
    """Attach transformer-dependent xfail/skip helpers to ``config``.

    The value of the ``transformer`` option is read once and used to build
    strict ``xfail`` marks (``*_disabled``, ``hetr_and_cpu_enabled_only``),
    ``skipif`` marks (``flex_skip``, ``argon_skip``) and imperative helpers
    (``*_skip_now``) that are ``pytest.skip`` for the matching transformer
    and ``pass_method`` otherwise.
    """
    transformer = config.getvalue("transformer")

    # when marking argon_disabled for a whole test, but flex_disabled only on one
    # parametrized version of that test, the argon marking disappeared
    config.flex_and_argon_disabled = pytest.mark.xfail(
        transformer in ("flexgpu", "argon"),
        reason="Not supported by argon or flex backend",
        strict=True)
    config.argon_disabled = pytest.mark.xfail(
        transformer == "argon",
        reason="Not supported by argon backend",
        strict=True)
    config.flex_disabled = pytest.mark.xfail(
        transformer == "flexgpu",
        reason="Failing test for Flex",
        strict=True)
    config.hetr_and_cpu_enabled_only = pytest.mark.xfail(
        transformer not in ("hetr", "cpu"),
        reason="Only Hetr/CPU and CPU transformers supported",
        strict=True)
    config.flex_skip = pytest.mark.skipif(
        transformer == "flexgpu",
        reason="Randomly failing test for Flex")
    # pytest rejects a boolean skipif condition that has no reason, so one
    # is supplied here.
    config.argon_skip = pytest.mark.skipif(
        transformer == "argon",
        reason="Skipped on argon backend")
    config.flex_skip_now = pytest.skip if transformer == "flexgpu" \
        else pass_method
    config.argon_skip_now = pytest.skip if transformer == "argon" \
        else pass_method
def test_matmul(self):
    """Check that ``operator.matmul`` on a ``LinearOperator`` agrees with ``*``.

    The operator wraps ``self.A`` through explicit matvec/rmatvec/matmat
    callables.  Multiplying with a scalar through matmul must raise
    ``ValueError`` on either side.
    """
    if not TEST_MATMUL:
        pytest.skip("matmul is only tested in Python 3.5+")

    def matvec(x):
        return np.dot(self.A, x).reshape(self.A.shape[0])

    def rmatvec(x):
        return np.dot(self.A.T.conj(), x).reshape(self.A.shape[1])

    def matmat(x):
        return np.dot(self.A, x)

    linear_op = interface.LinearOperator(
        shape=self.A.shape, matvec=matvec, rmatvec=rmatvec, matmat=matmat)
    matrix = np.array([[1, 2, 3],
                       [4, 5, 6],
                       [7, 8, 9]])
    vector = matrix[0]

    assert_equal(operator.matmul(linear_op, vector), linear_op * vector)
    assert_equal(operator.matmul(linear_op, matrix), linear_op * matrix)
    assert_raises(ValueError, operator.matmul, linear_op, 2)
    assert_raises(ValueError, operator.matmul, 2, linear_op)
def test_geth_installation_as_function_call(monkeypatch, tmpdir, platform, version):
    """Install geth through ``install_geth()`` and check the installed version.

    The installation goes into a temporary directory selected through the
    ``GETH_BASE_INSTALL_PATH`` environment variable.  Skipped when
    ``platform`` is not the platform the test is running on.
    """
    if get_platform() != platform:
        pytest.skip("Wrong platform for install script")

    base_install_path = str(tmpdir.mkdir("temporary-dir"))
    monkeypatch.setenv('GETH_BASE_INSTALL_PATH', base_install_path)

    # sanity check that it's not already installed.
    executable_path = get_executable_path(version)
    assert not os.path.exists(executable_path)

    install_geth(identifier=version, platform=platform)

    assert os.path.exists(executable_path)
    # Point the version lookup at the freshly installed binary.
    monkeypatch.setenv('GETH_BINARY', executable_path)

    actual_version = get_geth_version()
    expected_version = semantic_version.Spec(version.lstrip('v'))

    assert actual_version in expected_version
def test_unsigned_to_eip155_signed_transaction(txn_fixture, transaction_class):
    """Sign an unsigned transaction with a chain id and check sender and chain id.

    Skipped when the fixture has no chain id or when ``transaction_class``
    has no ``chain_id`` attribute.
    """
    chain_id = txn_fixture['chainId']
    if chain_id is None:
        pytest.skip('No chain id for EIP155 signing')
    if not hasattr(transaction_class, 'chain_id'):
        pytest.skip('Transaction class is not chain aware')

    private_key = keys.PrivateKey(decode_hex(txn_fixture['key']))

    # An empty recipient is passed as an empty byte string.
    if txn_fixture['to']:
        recipient = to_canonical_address(txn_fixture['to'])
    else:
        recipient = b''

    unsigned = transaction_class.create_unsigned_transaction(
        nonce=txn_fixture['nonce'],
        gas_price=txn_fixture['gasPrice'],
        gas=txn_fixture['gas'],
        to=recipient,
        value=txn_fixture['value'],
        data=decode_hex(txn_fixture['data']),
    )
    signed = unsigned.as_signed_transaction(private_key, chain_id=txn_fixture['chainId'])

    assert is_same_address(signed.sender, private_key.public_key.to_canonical_address())
    assert signed.chain_id == txn_fixture['chainId']