def test_runtime_main_with_broken_runtime(self):
stub_stdouts(self)
working_set = mocks.WorkingSet({'calmjs.runtime': [
'broken = calmjs.tests.test_runtime:broken',
]})
with self.assertRaises(SystemExit):
runtime.main(
['-vvd', '-h'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
out = sys.stdout.getvalue()
err = sys.stderr.getvalue()
self.assertIn('broken', err)
self.assertIn('Traceback', err)
self.assertIn('a fake import error', err)
self.assertNotIn('broken', out)
python类WorkingSet()的实例源码
def test_runtime_group_not_runtime_reported(self):
stub_stdouts(self)
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.runtime]\n'
'bs = calmjs.testing.module3.runtime:fake_bootstrap\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
with self.assertRaises(SystemExit):
runtime.main(
['-h'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
self.assertIn(
"'calmjs.runtime' entry point "
"'bs = calmjs.testing.module3.runtime:fake_bootstrap' from "
"'example.package 1.0' invalid for instance of "
"'calmjs.runtime.Runtime': target not an instance of "
"'calmjs.runtime.BaseRuntime' or its subclass; not registering "
"invalid entry point", sys.stderr.getvalue()
)
def test_wrong_registry_type(self):
advice = AdviceRegistry('adv', _working_set=WorkingSet({}))
registries = {'adv': advice}
stub_item_attr_value(
self, calmjs_toolchain, 'get_registry', registries.get)
spec = {'calmjs_loaderplugin_registry_name': 'adv'}
with pretty_logging(stream=StringIO()) as s:
registry = spec_update_loaderplugin_registry(spec)
self.assertIn(
"object referenced in spec is not a valid", s.getvalue())
self.assertIsNot(registry, advice)
self.assertTrue(isinstance(registry, BaseLoaderPluginRegistry))
spec = {}
with pretty_logging(stream=StringIO()) as s:
registry = spec_update_loaderplugin_registry(spec, default='adv')
self.assertIn(
"provided default is not a valid loaderplugin registry",
s.getvalue())
self.assertIsNot(registry, advice)
self.assertTrue(isinstance(registry, BaseLoaderPluginRegistry))
def test_resolve_and_order(self):
fake = LoaderPluginRegistry('fake_registry', _working_set=WorkingSet({
'fake_registry': [
'foo = calmjs.tests.test_toolchain:MockLPHandler']}))
registries = {'fake_registry': fake}
stub_item_attr_value(
self, calmjs_toolchain, 'get_registry', registries.get)
spec = {'calmjs_loaderplugin_registry_name': 'fake_registry'}
with pretty_logging(stream=StringIO()) as s:
registry = spec_update_loaderplugin_registry(spec)
self.assertIn(
"using loaderplugin registry 'fake_registry'", s.getvalue())
self.assertIs(registry, fake)
spec = {
'calmjs_loaderplugin_registry_name': 'fake_registry',
'calmjs_loaderplugin_registry': BaseLoaderPluginRegistry('raw'),
}
def test_standard_toolchain_process(self):
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.toolchain.advice]\n'
'calmjs.toolchain:Toolchain = calmjs.tests.test_toolchain:dummy\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
reg = AdviceRegistry(CALMJS_TOOLCHAIN_ADVICE, _working_set=working_set)
toolchain = Toolchain()
spec = Spec()
with pretty_logging(stream=StringIO()) as s:
reg.process_toolchain_spec_package(
toolchain, spec, 'example.package')
self.assertEqual(spec, {'dummy': ['dummy']})
self.assertIn(
"found advice setup steps registered for package/requirement "
"'example.package' for toolchain 'calmjs.toolchain:Toolchain'",
s.getvalue(),
)
def test_standard_toolchain_advice_extras(self):
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.toolchain.advice]\n'
'calmjs.toolchain:NullToolchain = '
'calmjs.tests.test_toolchain:dummy\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
reg = AdviceRegistry(CALMJS_TOOLCHAIN_ADVICE, _working_set=working_set)
toolchain = NullToolchain()
spec = Spec()
with pretty_logging(stream=StringIO()) as s:
reg.process_toolchain_spec_package(
toolchain, spec, 'example.package[a,bc,d]')
self.assertEqual(spec['extras'], ['a', 'bc', 'd'])
self.assertIn(
"found advice setup steps registered for package/requirement "
"'example.package[a,bc,d]' for toolchain ", s.getvalue()
)
def test_toolchain_spec_prepare_loaderplugins_standard(self):
reg = LoaderPluginRegistry('simple', _working_set=WorkingSet({
'simple': [
'foo = calmjs.tests.test_toolchain:MockLPHandler',
'bar = calmjs.tests.test_toolchain:MockLPHandler',
],
}))
spec = Spec(
calmjs_loaderplugin_registry=reg,
loaderplugin_sourcepath_maps={
'foo': {'foo!thing': 'thing'},
'bar': {'bar!thing': 'thing'},
},
)
toolchain_spec_prepare_loaderplugins(
self.toolchain, spec, 'loaderplugin', 'loaders')
self.assertEqual({
'foo!thing': 'thing',
'bar!thing': 'thing',
}, spec['loaderplugin_sourcepath'])
self.assertEqual({
'foo': 'foo',
'bar': 'bar',
}, spec['loaders'])
def test_mismatched_ns(self):
# mismatch includes a package that doesn't actually have the
# directory created
d_egg_root = join(mkdtemp(self), 'dummyns')
make_dummy_dist(self, ((
'namespace_packages.txt',
'not_ns\n',
), (
'entry_points.txt',
'[dummyns]\n'
'dummyns = dummyns:attr\n',
),), 'dummyns', '1.0', working_dir=d_egg_root)
working_set = pkg_resources.WorkingSet([
d_egg_root,
self.ds_egg_root,
])
stub_item_attr_value(self, pkg_resources, 'working_set', working_set)
dummyns_ep = next(working_set.iter_entry_points('dummyns'))
p = indexer.resource_filename_mod_entry_point('dummyns', dummyns_ep)
self.assertEqual(normcase(p), normcase(self.dummyns_path))
def test_not_namespace(self):
d_egg_root = join(mkdtemp(self), 'dummyns')
make_dummy_dist(self, ((
'entry_points.txt',
'[dummyns]\n'
'dummyns = dummyns:attr\n',
),), 'dummyns', '1.0', working_dir=d_egg_root)
working_set = pkg_resources.WorkingSet([
d_egg_root,
self.ds_egg_root,
])
stub_item_attr_value(self, pkg_resources, 'working_set', working_set)
moddir = join(d_egg_root, 'dummyns')
os.makedirs(moddir)
dummyns_ep = next(working_set.iter_entry_points('dummyns'))
p = indexer.resource_filename_mod_entry_point('dummyns', dummyns_ep)
self.assertEqual(normcase(p), normcase(self.dummyns_path))
def test_nested_namespace(self):
self.called = None
def _exists(p):
self.called = p
return exists(p)
working_set = pkg_resources.WorkingSet([
self.ds_egg_root,
])
stub_item_attr_value(self, pkg_resources, 'working_set', working_set)
stub_item_attr_value(self, indexer, 'exists', _exists)
dummyns_ep = next(working_set.iter_entry_points('dummyns.submod'))
p = indexer.resource_filename_mod_entry_point(
'dummyns.submod', dummyns_ep)
self.assertEqual(p, self.called)
with open(join(p, 'data.txt')) as fd:
data = fd.read()
self.assertEqual(data, self.nested_data)
def setUp(self):
remember_cwd(self)
app = make_dummy_dist(self, (
('requires.txt', '\n'.join([])),
('package.json', json.dumps({
'dependencies': {'jquery': '~1.11.0'},
})),
), 'foo', '1.9.0')
working_set = WorkingSet()
working_set.add(app, self._calmjs_testing_tmpdir)
# Stub out the flatten_egginfo_json calls with one that uses our
# custom working_set here.
stub_item_attr_value(self, dist, 'default_working_set', working_set)
# Quiet stdout from distutils logs
stub_stdouts(self)
# Force auto-detected interactive mode to True, because this is
# typically executed within an interactive context.
stub_check_interactive(self, True)
def setUp(self):
remember_cwd(self)
app = make_dummy_dist(self, (
('requires.txt', '\n'.join([])),
('package.json', json.dumps({
'dependencies': {'jquery': '~1.11.0'},
})),
), 'foo', '1.9.0')
working_set = WorkingSet()
working_set.add(app, self._calmjs_testing_tmpdir)
# Stub out the flatten_egginfo_json calls with one that uses our
# custom working_set here.
stub_item_attr_value(self, dist, 'default_working_set', working_set)
# Quiet stdout from distutils logs
stub_stdouts(self)
# Force auto-detected interactive mode to True, because this is
# typically executed within an interactive context.
stub_check_interactive(self, True)
def tests_flatten_egginfo_json_missing_deps(self):
"""
Missing dependencies should not cause a hard failure.
"""
make_dummy_dist(self, (
('requires.txt', '\n'.join([
'uilib>=1.0',
])),
), 'app', '2.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
# Python dependency acquisition failures should fail hard.
with self.assertRaises(pkg_resources.DistributionNotFound):
calmjs_dist.flatten_egginfo_json(['app'], working_set=working_set)
def _namespace_package_path(fqname, pathnames, path=None):
"""
Return the __path__ for the python package in *fqname*.
This function uses setuptools metadata to extract information
about namespace packages from installed eggs.
"""
working_set = pkg_resources.WorkingSet(path)
path = list(pathnames)
for dist in working_set:
if dist.has_metadata('namespace_packages.txt'):
namespaces = dist.get_metadata(
'namespace_packages.txt').splitlines()
if fqname in namespaces:
nspath = os.path.join(dist.location, *fqname.split('.'))
if nspath not in path:
path.append(nspath)
return path
def _namespace_package_path(fqname, pathnames, path=None):
"""
Return the __path__ for the python package in *fqname*.
This function uses setuptools metadata to extract information
about namespace packages from installed eggs.
"""
working_set = pkg_resources.WorkingSet(path)
path = list(pathnames)
for dist in working_set:
if dist.has_metadata('namespace_packages.txt'):
namespaces = dist.get_metadata(
'namespace_packages.txt').splitlines()
if fqname in namespaces:
nspath = os.path.join(dist.location, *fqname.split('.'))
if nspath not in path:
path.append(nspath)
return path
def get_all_entry_points():
"""
Get all entry points related to ``ros2cli`` and any of its extensions.
:returns: mapping of entry point names to ``EntryPoint`` instances
:rtype: dict
"""
extension_points = get_entry_points(EXTENSION_POINT_GROUP_NAME)
entry_points = defaultdict(dict)
working_set = WorkingSet()
for dist in sorted(working_set):
entry_map = dist.get_entry_map()
for group_name in entry_map.keys():
# skip groups which are not registered as extension points
if group_name not in extension_points:
continue
group = entry_map[group_name]
for entry_point_name, entry_point in group.items():
entry_points[group_name][entry_point_name] = \
(dist, entry_point)
return entry_points
def check(request):
return [
{'name': distribution.project_name, 'version': distribution.version}
for distribution in WorkingSet()
]
def check(request):
package_name = settings.HEARTBEAT.get('package_name')
if not package_name:
raise ImproperlyConfigured(
'Missing package_name key from heartbeat configuration')
sys_path_distros = WorkingSet()
package_req = Requirement.parse(package_name)
distro = sys_path_distros.find(package_req)
if not distro:
return dict(error='no distribution found for {}'.format(package_name))
return dict(name=distro.project_name, version=distro.version)
def _directory_import(self):
"""
Import astropy_helpers from the given path, which will be added to
sys.path.
Must return True if the import succeeded, and False otherwise.
"""
# Return True on success, False on failure but download is allowed, and
# otherwise raise SystemExit
path = os.path.abspath(self.path)
# Use an empty WorkingSet rather than the man
# pkg_resources.working_set, since on older versions of setuptools this
# will invoke a VersionConflict when trying to install an upgrade
ws = pkg_resources.WorkingSet([])
ws.add_entry(path)
dist = ws.by_key.get(DIST_NAME)
if dist is None:
# We didn't find an egg-info/dist-info in the given path, but if a
# setup.py exists we can generate it
setup_py = os.path.join(path, 'setup.py')
if os.path.isfile(setup_py):
with _silence():
run_setup(os.path.join(path, 'setup.py'),
['egg_info'])
for dist in pkg_resources.find_distributions(path, True):
# There should be only one...
return dist
return dist
def _directory_import(self):
"""
Import astropy_helpers from the given path, which will be added to
sys.path.
Must return True if the import succeeded, and False otherwise.
"""
# Return True on success, False on failure but download is allowed, and
# otherwise raise SystemExit
path = os.path.abspath(self.path)
# Use an empty WorkingSet rather than the man
# pkg_resources.working_set, since on older versions of setuptools this
# will invoke a VersionConflict when trying to install an upgrade
ws = pkg_resources.WorkingSet([])
ws.add_entry(path)
dist = ws.by_key.get(DIST_NAME)
if dist is None:
# We didn't find an egg-info/dist-info in the given path, but if a
# setup.py exists we can generate it
setup_py = os.path.join(path, 'setup.py')
if os.path.isfile(setup_py):
with _silence():
run_setup(os.path.join(path, 'setup.py'),
['egg_info'])
for dist in pkg_resources.find_distributions(path, True):
# There should be only one...
return dist
return dist
def with_requires(*requirements):
"""Run a test case only when given requirements are satisfied.
.. admonition:: Example
This test case runs only when `numpy>=1.10` is installed.
>>> from cupy import testing
... class Test(unittest.TestCase):
... @testing.with_requires('numpy>=1.10')
... def test_for_numpy_1_10(self):
... pass
Args:
requirements: A list of string representing requirement condition to
run a given test case.
"""
ws = pkg_resources.WorkingSet()
try:
ws.require(*requirements)
skip = False
except pkg_resources.ResolutionError:
skip = True
msg = 'requires: {}'.format(','.join(requirements))
return unittest.skipIf(skip, msg)
def _directory_import(self):
"""
Import astropy_helpers from the given path, which will be added to
sys.path.
Must return True if the import succeeded, and False otherwise.
"""
# Return True on success, False on failure but download is allowed, and
# otherwise raise SystemExit
path = os.path.abspath(self.path)
# Use an empty WorkingSet rather than the man
# pkg_resources.working_set, since on older versions of setuptools this
# will invoke a VersionConflict when trying to install an upgrade
ws = pkg_resources.WorkingSet([])
ws.add_entry(path)
dist = ws.by_key.get(DIST_NAME)
if dist is None:
# We didn't find an egg-info/dist-info in the given path, but if a
# setup.py exists we can generate it
setup_py = os.path.join(path, 'setup.py')
if os.path.isfile(setup_py):
with _silence():
run_setup(os.path.join(path, 'setup.py'),
['egg_info'])
for dist in pkg_resources.find_distributions(path, True):
# There should be only one...
return dist
return dist
def test_runtime_entry_point_broken_at_main(self):
# try the above, but do this through main
stub_stdouts(self)
ep = pkg_resources.EntryPoint.parse('broken = some.broken:instance')
ep.load = fake_error(ImportError)
working_set = mocks.WorkingSet({'calmjs.runtime': [ep]})
with self.assertRaises(SystemExit):
runtime.main(
['-h'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
out = sys.stdout.getvalue()
err = sys.stderr.getvalue()
self.assertNotIn('broken', out)
self.assertIn('broken', err)
def test_runtime_entry_point_preparse_warning(self):
# see next test for the condition for warning to appear.
stub_stdouts(self)
working_set = mocks.WorkingSet({'calmjs.runtime': [
'deprecated = calmjs.tests.test_runtime:deprecated',
]})
with self.assertRaises(SystemExit):
runtime.main(
['deprecated'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
err = sys.stderr.getvalue()
self.assertNotIn('Traceback', err)
self.assertNotIn('this runtime is deprecated', err)
def test_runtime_entry_point_preparse_warning_verbose_logged(self):
stub_stdouts(self)
working_set = mocks.WorkingSet({'calmjs.runtime': [
'deprecated = calmjs.tests.test_runtime:deprecated',
]})
with self.assertRaises(SystemExit):
# use the verbose flag to increase the log level
runtime.main(
['-v', 'deprecated'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
err = sys.stderr.getvalue()
self.assertNotIn('Traceback', err)
self.assertIn('this runtime is deprecated', err)
self.assertNotIn('DeprecationWarning triggered at', err)
def test_runtime_entry_point_preparse_warning_verbose_debug_logged(self):
stub_stdouts(self)
working_set = mocks.WorkingSet({'calmjs.runtime': [
'deprecated = calmjs.tests.test_runtime:deprecated',
]})
with self.assertRaises(SystemExit):
# use the verbose flag to increase the log level
runtime.main(
['-vvv', 'deprecated'],
runtime_cls=lambda: runtime.Runtime(working_set=working_set)
)
err = sys.stderr.getvalue()
self.assertNotIn('Traceback', err)
self.assertIn('this runtime is deprecated', err)
self.assertIn('DeprecationWarning triggered at', err)
def test_spec_debugged_via_cmdline(self):
stub_item_attr_value(
self, mocks, 'dummy',
runtime.ToolchainRuntime(toolchain.NullToolchain()),
)
working_set = mocks.WorkingSet({
'calmjs.runtime': [
'tool = calmjs.testing.mocks:dummy',
],
})
rt = runtime.Runtime(working_set=working_set, prog='calmjs')
result = rt(['tool', '--export-target', 'dummy', '-d'])
self.assertEqual(result['debug'], 1)
def test_spec_optional_advice(self):
from calmjs.registry import _inst as root_registry
key = toolchain.CALMJS_TOOLCHAIN_ADVICE
stub_stdouts(self)
def cleanup_fake_registry():
# pop out the fake advice registry that was added.
root_registry.records.pop(key, None)
self.addCleanup(cleanup_fake_registry)
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.toolchain.advice]\n'
'calmjs.toolchain:Toolchain = calmjs.tests.test_toolchain:bad\n'
'calmjs.toolchain:NullToolchain = '
'calmjs.tests.test_toolchain:dummy\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
root_registry.records[key] = toolchain.AdviceRegistry(
key, _working_set=working_set)
tc = toolchain.NullToolchain()
rt = runtime.ToolchainRuntime(tc)
result = rt([
'--export-target', 'dummy',
'--optional-advice', 'example.package', '-vv',
])
self.assertEqual(result['dummy'], ['dummy', 'bad'])
err = sys.stderr.getvalue()
self.assertIn('prepare spec with optional advices from packages', err)
self.assertIn('example.package', err)
self.assertIn('failure encountered while setting up advices', err)
# Doing it normally should not result in that optional key.
result = rt(['--export-target', 'dummy'])
self.assertNotIn('dummy', result)
def test_spec_optional_advice_extras(self):
from calmjs.registry import _inst as root_registry
key = toolchain.CALMJS_TOOLCHAIN_ADVICE
stub_stdouts(self)
def cleanup_fake_registry():
# pop out the fake advice registry that was added.
root_registry.records.pop(key, None)
self.addCleanup(cleanup_fake_registry)
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.toolchain.advice]\n'
'calmjs.toolchain:NullToolchain = '
'calmjs.tests.test_toolchain:dummy\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
root_registry.records[key] = toolchain.AdviceRegistry(
key, _working_set=working_set)
tc = toolchain.NullToolchain()
rt = runtime.ToolchainRuntime(tc)
result = rt([
'--export-target', 'dummy',
'--optional-advice', 'example.package[extra1,extra2]', '-vv',
])
self.assertEqual(result['extras'], ['extra1', 'extra2'])
def test_spec_deferred_addition(self):
"""
This turns out to be critical - the advices provided by the
packages should NOT be added immediately, as it is executed
before a number of very important advices were added by the
toolchain itself.
"""
from calmjs.registry import _inst as root_registry
key = toolchain.CALMJS_TOOLCHAIN_ADVICE
stub_stdouts(self)
def cleanup_fake_registry():
# pop out the fake advice registry that was added.
root_registry.records.pop(key, None)
self.addCleanup(cleanup_fake_registry)
make_dummy_dist(self, ((
'entry_points.txt',
'[calmjs.toolchain.advice]\n'
'calmjs.toolchain:NullToolchain = '
'calmjs.testing.spec:advice_order\n'
),), 'example.package', '1.0')
working_set = pkg_resources.WorkingSet([self._calmjs_testing_tmpdir])
root_registry.records[key] = toolchain.AdviceRegistry(
key, _working_set=working_set)
tc = toolchain.NullToolchain()
rt = runtime.ToolchainRuntime(tc)
result = rt([
'--export-target', 'dummy',
'--optional-advice', 'example.package',
])
self.assertEqual(sys.stderr.getvalue(), '')
self.assertIsNotNone(result)