def test_importlib_importmodule_relative_ns_subpkg(self):
"""Verify that package is importable relatively"""
print_importers()
assert __package__
# import_module checks sys.modules by itself
# but the test is not reflecting anything if we use the already loaded module.
if sys.modules.get(__package__ + '.nspkg.subpkg'):
raise unittest.SkipTest("module previously loaded".format(__package__ + '.nspkg.subpkg'))
else:
test_pkg = importlib.import_module('.nspkg.subpkg', package=__package__)
self.assertTrue(test_pkg is not None)
self.assertTrue(test_pkg.TestClassInSubPkg is not None)
self.assertTrue(callable(test_pkg.TestClassInSubPkg))
# TODO : implement some differences and check we get them...
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(test_pkg)
else:
pass
python类SkipTest()的实例源码
test_filefinder2_importlib_importmodule.py 文件源码
项目:filefinder2
作者: asmodehn
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
test_filefinder2_importlib_importmodule.py 文件源码
项目:filefinder2
作者: asmodehn
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_importlib_importmodule_relative_ns_subpkg_bytecode(self):
"""Verify that package is importable relatively"""
print_importers()
assert __package__
# import_module checks sys.modules by itself
# but the test is not reflecting anything if we use the already loaded module.
if sys.modules.get(__package__ + '.nspkg.subpkg.bytecode'):
raise unittest.SkipTest("module previously loaded".format(__package__ + '.nspkg.subpkg.bytecode'))
else:
test_mod = importlib.import_module('.nspkg.subpkg.bytecode', package=__package__)
self.assertTrue(test_mod is not None)
self.assertTrue(test_mod.TestClassInBytecode is not None)
self.assertTrue(callable(test_mod.TestClassInBytecode))
# TODO : implement some differences and check we get them...
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(test_mod)
else:
pass
test_filefinder2_importlib_importmodule.py 文件源码
项目:filefinder2
作者: asmodehn
项目源码
文件源码
阅读 14
收藏 0
点赞 0
评论 0
def test_importlib_importmodule_class_from_relative_ns_subpkg(self):
"""Verify that test class is importable relatively"""
print_importers()
assert __package__
# import_module checks sys.modules by itself
# but the test is not reflecting anything if we use the already loaded module.
if sys.modules.get(__package__ + '.nspkg.subpkg'):
raise unittest.SkipTest("module previously loaded".format(__package__ + '.nspkg.subpkg'))
else:
nspkg_subpkg = importlib.import_module('.nspkg.subpkg', package=__package__)
test_class_in_subpkg = nspkg_subpkg.TestClassInSubPkg
self.assertTrue(test_class_in_subpkg is not None)
self.assertTrue(callable(test_class_in_subpkg))
# TODO : implement some differences and check we get them...
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(nspkg_subpkg)
else:
pass
test_filefinder2_importlib_importmodule.py 文件源码
项目:filefinder2
作者: asmodehn
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def test_importlib_importmodule_class_from_relative_ns_subpkg_submodule(self):
"""Verify that test class is importable relatively"""
print_importers()
assert __package__
# import_module checks sys.modules by itself
# but the test is not reflecting anything if we use the already loaded module.
if sys.modules.get(__package__ + '.nspkg.subpkg.submodule'):
raise unittest.SkipTest("module previously loaded".format(__package__ + '.nspkg.subpkg.submodule'))
else:
nspkg_subpkg_submodule = importlib.import_module('.nspkg.subpkg.submodule', package=__package__)
test_class_in_submodule = nspkg_subpkg_submodule.TestClassInSubModule
self.assertTrue(test_class_in_submodule is not None)
self.assertTrue(callable(test_class_in_submodule))
# TODO : implement some differences and check we get them...
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(nspkg_subpkg_submodule)
else:
pass
test_filefinder2_importlib_importmodule.py 文件源码
项目:filefinder2
作者: asmodehn
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_importlib_importmodule_class_from_relative_ns_subpkg_bytecode(self):
"""Verify that test class is importable relatively"""
print_importers()
assert __package__
# import_module checks sys.modules by itself
# but the test is not reflecting anything if we use the already loaded module.
if sys.modules.get(__package__ + '.nspkg.subpkg.bytecode'):
raise unittest.SkipTest("module previously loaded".format(__package__ + '.nspkg.subpkg.bytecode'))
else:
nspkg_subpkg_bytecode = importlib.import_module('.nspkg.subpkg.bytecode', package=__package__)
test_class_in_bytecode = nspkg_subpkg_bytecode.TestClassInBytecode
self.assertTrue(test_class_in_bytecode is not None)
self.assertTrue(callable(test_class_in_bytecode))
# TODO : implement some differences and check we get them...
if hasattr(importlib, 'reload'): # recent version of importlib
# attempting to reload
importlib.reload(nspkg_subpkg_bytecode)
else:
pass
def __init__(self,
conf_template,
udp_enabled=False):
if os.environ.get("INFLUXDB_PYTHON_SKIP_SERVER_TESTS", None) == 'True':
raise unittest.SkipTest(
"Skipping server test (INFLUXDB_PYTHON_SKIP_SERVER_TESTS)"
)
self.influxd_path = self.find_influxd_path()
errors = 0
while True:
try:
self._start_server(conf_template, udp_enabled)
break
# Happens when the ports are already in use.
except RuntimeError as e:
errors += 1
if errors > 2:
raise e
def find_influxd_path(self):
influxdb_bin_path = os.environ.get(
'INFLUXDB_PYTHON_INFLUXD_PATH',
None
)
if influxdb_bin_path is None:
influxdb_bin_path = distutils.spawn.find_executable('influxd')
if not influxdb_bin_path:
try:
influxdb_bin_path = subprocess.check_output(
['which', 'influxd']
).strip()
except subprocess.CalledProcessError:
# fallback on :
influxdb_bin_path = '/opt/influxdb/influxd'
if not os.path.isfile(influxdb_bin_path):
raise unittest.SkipTest("Could not find influxd binary")
version = subprocess.check_output([influxdb_bin_path, 'version'])
print("InfluxDB version: %s" % version, file=sys.stderr)
return influxdb_bin_path
def test_is_ancestor(self):
git = self.rorepo.git
if git.version_info[:3] < (1, 8, 0):
raise SkipTest("git merge-base --is-ancestor feature unsupported")
repo = self.rorepo
c1 = 'f6aa8d1'
c2 = '763ef75'
self.assertTrue(repo.is_ancestor(c1, c1))
self.assertTrue(repo.is_ancestor("master", "master"))
self.assertTrue(repo.is_ancestor(c1, c2))
self.assertTrue(repo.is_ancestor(c1, "master"))
self.assertFalse(repo.is_ancestor(c2, c1))
self.assertFalse(repo.is_ancestor("master", c1))
for i, j in itertools.permutations([c1, 'ffffff', ''], r=2):
self.assertRaises(GitCommandError, repo.is_ancestor, i, j)
def test_work_tree_unsupported(self, rw_dir):
git = Git(rw_dir)
if git.version_info[:3] < (2, 5, 1):
raise SkipTest("worktree feature unsupported")
rw_master = self.rorepo.clone(join_path_native(rw_dir, 'master_repo'))
rw_master.git.checkout('HEAD~10')
worktree_path = join_path_native(rw_dir, 'worktree_repo')
if Git.is_cygwin():
worktree_path = cygpath(worktree_path)
try:
rw_master.git.worktree('add', worktree_path, 'master')
except Exception as ex:
raise AssertionError(ex, "It's ok if TC not running from `master`.")
self.failUnlessRaises(InvalidGitRepositoryError, Repo, worktree_path)
disabled_test_heartbeat.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 14
收藏 0
点赞 0
评论 0
def setUpClass(cls):
""" Initialise the test class.
"""
warnings.simplefilter('ignore', ResourceWarning)
# Get a client to the local docker engine.
client = docker.from_env()
# Skip the tests in this class if not running from a manager node.
if not client.info()['Swarm']['ControlAvailable']:
raise unittest.SkipTest('This test must be run from a swarm '
'manager node.')
# client.swarm.init()
# Create a logging server
# FIXME(BM): This should not be needed to test heartbeat!
paas = Paas()
cls.logger = paas.run_service(
'logging_server',
'sip',
[logging.handlers.DEFAULT_TCP_LOGGING_PORT],
['python3', 'sip/common/logging_server.py'])
# Wait for the logging server to come online.
time.sleep(3)
test_paas_spark.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 14
收藏 0
点赞 0
评论 0
def setUpClass(cls):
""" Set up test fixture.
Skip the tests if it is not possible to connect to the Spark Master.
"""
paas = SparkPaaS()
try:
master_url = "http://{}:{}".format(
paas.spark_master['url'], paas.spark_master['master_port'])
req = requests.get(master_url)
if not req.ok:
raise unittest.SkipTest('No 200 response from Spark Master '
'@[{}], Skipping tests.'.
format(master_url))
except requests.exceptions.RequestException:
print('Exception thrown..')
raise unittest.SkipTest("Cannot connect to Spark Master @ [{}]. "
"Skipping tests.". format(master_url))
cls.paas = paas
def test_181_execute_gte_v10(self):
if self.client.server_version < pkg_resources.parse_version('10.0'):
raise unittest.SkipTest(
'Not applicable to Odoo version less then 10.0')
res = self.client.execute('res.partner', 'read', 1)
self.assertIsInstance(res, list)
self.assertEqual(len(res), 1)
self.assertIsInstance(res[0], dict)
self.assertEqual(res[0]['id'], 1)
res = self.client.execute('res.partner', 'read', [1])
self.assertIsInstance(res, list)
self.assertEqual(len(res), 1)
self.assertIsInstance(res[0], dict)
self.assertEqual(res[0]['id'], 1)
def requires(resource, msg=None):
"""Raise ResourceDenied if the specified resource is not available.
If the caller's module is __main__ then automatically return True. The
possibility of False being returned occurs when regrtest.py is
executing.
"""
if resource == 'gui' and not _is_gui_available():
raise unittest.SkipTest("Cannot use the 'gui' resource")
# see if the caller's module is __main__ - if so, treat as if
# the resource was set
if sys._getframe(1).f_globals.get("__name__") == "__main__":
return
if not is_resource_enabled(resource):
if msg is None:
msg = "Use of the %r resource not enabled" % resource
raise ResourceDenied(msg)
def bigaddrspacetest(f):
"""Decorator for tests that fill the address space."""
def wrapper(self):
if max_memuse < MAX_Py_ssize_t:
if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
raise unittest.SkipTest(
"not enough memory: try a 32-bit build instead")
else:
raise unittest.SkipTest(
"not enough memory: %.1fG minimum needed"
% (MAX_Py_ssize_t / (1024 ** 3)))
else:
return f(self)
return wrapper
#=======================================================================
# unittest integration.
def test_setup_class_install_environment_predefined_no_dir(self):
from calmjs.cli import PackageManagerDriver
from calmjs import cli
utils.stub_os_environ(self)
utils.stub_mod_call(self, cli)
cwd = mkdtemp(self)
# we have the mock_tempfile context...
self.assertEqual(self.mock_tempfile.count, 1)
os.chdir(cwd)
# a very common use case
os.environ['CALMJS_TEST_ENV'] = '.'
TestCase = type('TestCase', (unittest.TestCase,), {})
# the directory not there.
with self.assertRaises(unittest.SkipTest):
utils.setup_class_install_environment(
TestCase, PackageManagerDriver, [])
# temporary directory should not be created as the skip will
# also stop the teardown from running
self.assertEqual(self.mock_tempfile.count, 1)
# this is still set, but irrelevant.
self.assertEqual(TestCase._env_root, cwd)
# tmpdir not set.
self.assertFalse(hasattr(TestCase, '_cls_tmpdir'))
def test_linear_regression_numpy(self):
""" Test the linear regression using numpy (if installed). """
# test that numpy is installed
try:
import numpy
numpy.__name__
except ImportError:
raise unittest.SkipTest('cannot test as optional numpy is not installed')
# perform the test with numpy
from supvisors.utils import get_linear_regression, get_simple_linear_regression
xdata = [2, 4, 6, 8, 10, 12]
ydata = [3, 4, 5, 6, 7, 8]
# test linear regression
a, b = get_linear_regression(xdata, ydata)
self.assertAlmostEqual(0.5, a)
self.assertAlmostEqual(2.0, b)
# test simple linear regression
a, b = get_simple_linear_regression(ydata)
self.assertAlmostEqual(1.0, a)
self.assertAlmostEqual(3.0, b)
def test_ipv4(self):
""" Test the ipv4 method. """
# complex to test as it depends on the network configuration of the operating system
# check that there is at least one entry looking like an IP address
from supvisors.addressmapper import AddressMapper
# test that netifaces is installed
try:
import netifaces
netifaces.__name__
except ImportError:
raise unittest.SkipTest('cannot test as optional netifaces is not installed')
# test function
ip_list = AddressMapper.ipv4()
self.assertTrue(ip_list)
for ip in ip_list:
self.assertRegexpMatches(ip, r'^\d{1,3}(.\d{1,3}){3}$')
def skip(reason):
"""A decorator to unconditionally skip a test:
.. code-block:: python
@datatest.skip('Not finished collecting raw data.')
class TestSumTotals(datatest.DataTestCase):
def test_totals(self):
...
"""
def decorator(test_item):
if not isinstance(test_item, type):
orig_item = test_item # <- Not in unittest.skip()
@functools.wraps(test_item)
def skip_wrapper(*args, **kwargs):
raise unittest.SkipTest(reason)
test_item = skip_wrapper
test_item._wrapped = orig_item # <- Not in unittest.skip()
test_item.__unittest_skip__ = True
test_item.__unittest_skip_why__ = reason
return test_item
return decorator
def test_skiptest_in_setupclass(self):
class Test(unittest.TestCase):
@classmethod
def setUpClass(cls):
raise unittest.SkipTest('foo')
def test_one(self):
pass
def test_two(self):
pass
result = self.runTests(Test)
self.assertEqual(result.testsRun, 0)
self.assertEqual(len(result.errors), 0)
self.assertEqual(len(result.skipped), 1)
skipped = result.skipped[0][0]
self.assertEqual(str(skipped), 'setUpClass (%s.Test)' % __name__)
def test_skiptest_in_setupmodule(self):
class Test(unittest.TestCase):
def test_one(self):
pass
def test_two(self):
pass
class Module(object):
@staticmethod
def setUpModule():
raise unittest.SkipTest('foo')
Test.__module__ = 'Module'
sys.modules['Module'] = Module
result = self.runTests(Test)
self.assertEqual(result.testsRun, 0)
self.assertEqual(len(result.errors), 0)
self.assertEqual(len(result.skipped), 1)
skipped = result.skipped[0][0]
self.assertEqual(str(skipped), 'setUpModule (Module)')