def sources():
    """Collect the ``source()`` result of every provider module.

    Each name in the module-level ``__all__`` is treated as a sub-directory
    next to this file. Every non-package module found there is loaded and
    the value returned by its ``source()`` callable is recorded.

    Modules are loaded with ``find_spec``/``exec_module``. The legacy
    ``find_module``/``load_module`` finder API is gone from the file finders
    of current Python 3 releases, which made every single load fail.

    Returns:
        list: ``(module_name, module.source())`` tuples in discovery order,
        or an empty list when discovery itself fails.
    """
    import importlib.util
    import sys

    try:
        source_dict = []
        for i in __all__:
            search_dir = os.path.join(os.path.dirname(__file__), i)
            for finder, module_name, is_pkg in pkgutil.walk_packages([search_dir]):
                if is_pkg:
                    continue
                try:
                    spec = finder.find_spec(module_name)
                    if spec is None or spec.loader is None:
                        raise ImportError('no loader found for "%s"' % module_name)
                    module = importlib.util.module_from_spec(spec)
                    # load_module() used to register the module; keep doing so.
                    sys.modules[module_name] = module
                    try:
                        spec.loader.exec_module(module)
                    except BaseException:
                        # Do not leave a half-initialised module behind.
                        sys.modules.pop(module_name, None)
                        raise
                    source_dict.append((module_name, module.source()))
                except Exception as e:
                    log_utils.log('Could not load "%s": %s' % (module_name, e), log_utils.LOGDEBUG)
        return source_dict
    except Exception:
        # Best effort: any failure of the discovery itself yields no sources.
        return []
# python类walk_packages()的实例源码 (example source code using Python's walk_packages())
def main():
    """Run nose with coverage over the ``girlfriend.testing`` modules.

    Every non-package module found below ``girlfriend.testing`` is passed to
    nose as a test target, except for names listed in ``excludes``.
    """
    cover_option = (
        "--cover-package="
        "girlfriend.workflow,girlfriend.data,girlfriend.util"
    )
    argv = ["", "--with-coverage", cover_option]
    discovered = pkgutil.walk_packages(
        path=girlfriend.testing.__path__,
        prefix="girlfriend.testing.",
    )
    # Packages themselves are not test targets; only plain modules are.
    argv.extend(
        modname
        for _importer, modname, ispkg in discovered
        if not ispkg and modname not in excludes
    )
    nose.run(argv=argv)
def get_package_libraries(pkg):
    """
    Yield the dotted name of every submodule of ``pkg`` (searched
    recursively) that defines a ``register`` attribute.

    Raises ``InvalidTemplateLibrary`` when importing a submodule fails with
    ``ImportError``.
    """
    search_path = pkg.__path__
    dotted_prefix = pkg.__name__ + '.'
    for _finder, module_name, _is_pkg in walk_packages(search_path, dotted_prefix):
        try:
            candidate = import_module(module_name)
        except ImportError as e:
            raise InvalidTemplateLibrary(
                "Invalid template library specified. ImportError raised when "
                "trying to load '%s': %s" % (module_name, e)
            )
        if not hasattr(candidate, 'register'):
            continue
        yield module_name
def import_submodules(package, recursive=True):
    """
    Import all submodules of a package, optionally descending into
    subpackages.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    ``pkgutil.iter_modules`` is used for each level because the recursion is
    done here. Un-prefixed ``walk_packages`` would additionally try to import
    each subpackage by its bare name and could list modules of an unrelated
    top-level package with the same name.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: also import the contents of subpackages
    :type recursive: bool
    :returns: mapping of fully qualified module name to imported module
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil

    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    for _loader, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    # The mapping was previously built but never handed back to the caller.
    return results
def init_drivers(self):
    """
    Searches for and initializes all controller drivers.
    See __init__.py in scc.drivers.

    Resets ``self._to_start`` and fills it with the ``start`` callable of
    every driver module whose ``init(self, cfg)`` returned a truthy value.
    """
    log.debug("Initializing drivers...")
    cfg = Config()
    self._to_start = set()  # del-eted later by start_drivers
    found = pkgutil.walk_packages(path=drivers.__path__, onerror=lambda x: None)
    for _importer, modname, ispkg in found:
        if ispkg or modname == "driver":
            continue
        # 'usb' driver has to be always active
        if modname != "usb" and not cfg["drivers"].get(modname):
            log.debug("Skipping disabled driver '%s'", modname)
            continue
        mod = getattr(__import__('scc.drivers.%s' % (modname,)).drivers, modname)
        if not hasattr(mod, "init"):
            continue
        if mod.init(self, cfg) and hasattr(mod, "start"):
            self._to_start.add(mod.start)
def test_packages(self):
    """
    Tests that every package found under ``scc`` is listed in the
    ``packages`` value exported by setup.py.
    """
    try:
        import gi
        # Pin the GObject introspection versions; skipped when gi is missing.
        for namespace, version in (('Gtk', '3.0'), ('GdkX11', '3.0'), ('Rsvg', '2.0')):
            gi.require_version(namespace, version)
    except ImportError:
        pass
    from setup import packages
    walker = pkgutil.walk_packages(path=scc.__path__, prefix="scc.", onerror=lambda x: None)
    for _importer, modname, ispkg in walker:
        if not ispkg:
            continue
        assert modname in packages, "Package '%s' is not being installed by setup.py" % (modname,)
def get_plugins(cls):
    """
    Dynamically loads all the plugins in the plugin folders and sorts
    them by the PRIORITY key. If no PRIORITY is defined for a given
    plugin, a priority of 0 is assumed.

    Modules are loaded with ``find_spec``/``exec_module``. The legacy
    ``find_module``/``load_module`` finder API is gone from the file finders
    of current Python 3 releases, which caused every plugin to be skipped.

    Returns a tuple ``(plugins, exclude_plugins)``: all loaded modules that
    define ``WORDS`` (highest priority first), and the subset of them named
    in the third-party exclusion list.
    """
    import importlib.util
    import sys

    logger = logging.getLogger(__name__)
    locations = [
        dingdangpath.PLUGIN_PATH,
        dingdangpath.CONTRIB_PATH,
        dingdangpath.CUSTOM_PATH
    ]
    logger.debug("Looking for plugins in: %s",
                 ', '.join(["'%s'" % location for location in locations]))
    plugins = []
    exclude_plugins = []
    # plugins that are not allowed to be called via Wechat or Email
    thirdparty_exclude_plugins = ['NetEaseMusic']
    for finder, name, ispkg in pkgutil.walk_packages(locations):
        try:
            spec = finder.find_spec(name)
            if spec is None or spec.loader is None:
                raise ImportError("No loader found for plugin '%s'" % name)
            mod = importlib.util.module_from_spec(spec)
            # load_module() used to register the module; keep doing so.
            sys.modules[name] = mod
            try:
                spec.loader.exec_module(mod)
            except BaseException:
                # Do not leave a half-initialised module behind.
                sys.modules.pop(name, None)
                raise
        except Exception:
            logger.warning("Skipped plugin '%s' due to an error.", name,
                           exc_info=True)
        else:
            if hasattr(mod, 'WORDS'):
                logger.debug("Found plugin '%s' with words: %r", name,
                             mod.WORDS)
                plugins.append(mod)
                if name in thirdparty_exclude_plugins:
                    exclude_plugins.append(mod)
            else:
                logger.warning("Skipped plugin '%s' because it misses " +
                               "the WORDS constant.", name)
    plugins.sort(key=lambda mod: getattr(mod, 'PRIORITY', 0), reverse=True)
    return (plugins, exclude_plugins)
def import_submodules(package, recursive=True):
    """ Import all submodules of a package, recursively, including subpackages

    Arguments:
    1. package    = (string) name of the package
                    (module) the already imported package
    2. recursive  = (bool) True = load packages and modules from all sub-packages as well.
                    (bool) False = load only first level of packages and modules, do not load modules from sub packages

    Returns a dict mapping each fully qualified module name to the imported
    module.

    Each level is listed with ``pkgutil.iter_modules`` since the recursion is
    done here. Un-prefixed ``walk_packages`` also tries to import every
    subpackage by its bare name and may then list modules of an unrelated
    top-level package with the same name.
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def list_modules_in_package_path(self, package_path, prefix):
    """List every plain module below ``package_path``, including those in
    nested packages.

    Directories are visited with an explicit stack, so each level is listed
    with ``pkgutil.iter_modules``. ``walk_packages`` would additionally try
    to import each sub-package by its bare name and, when that succeeds,
    yield that package's modules too, producing bogus or duplicate entries.

    :param package_path: directory to start from
    :param prefix: dotted name corresponding to ``package_path``
    :returns: list of ``(module_name, dotted_name)`` tuples
    """
    modules = []
    pending = [(package_path, prefix)]
    while pending:
        curr_path, curr_prefix = pending.pop()
        for _importer, name, ispkg in pkgutil.iter_modules([curr_path]):
            dotted = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Visit the sub-package directory later.
                pending.append(('%s/%s/' % (curr_path, name), dotted))
            else:
                modules.append((name, dotted))
    return modules
def list_modules_in_package_path(self, package_path, prefix):
    """Return ``(module_name, dotted_name)`` for every non-package module
    found under ``package_path``, descending into sub-packages.

    Each directory is listed with ``pkgutil.iter_modules`` because the
    descent is handled by the explicit work stack below. The previous
    ``walk_packages`` call also imported sub-packages by bare name and could
    add entries belonging to an unrelated, identically named package.

    :param package_path: directory to scan
    :param prefix: dotted prefix that names ``package_path``
    :returns: list of ``(module_name, dotted_name)`` tuples
    """
    modules = []
    work_stack = [(package_path, prefix)]
    while work_stack:
        curr_path, curr_prefix = work_stack.pop()
        for _importer, name, ispkg in pkgutil.iter_modules([curr_path]):
            qualified = '%s.%s' % (curr_prefix, name)
            if not ispkg:
                modules.append((name, qualified))
                continue
            # Sub-package: schedule its directory for a later pass.
            work_stack.append(('%s/%s/' % (curr_path, name), qualified))
    return modules
def sources():
    """Return ``(module_name, module.source())`` for every provider module.

    Each entry of the module-level ``__all__`` names a directory next to this
    file. All non-package modules in those directories are loaded and their
    ``source()`` callable is invoked.

    Loading goes through ``find_spec``/``exec_module`` because the legacy
    ``find_module``/``load_module`` finder API is no longer available on the
    file finders of current Python 3 releases.

    Returns:
        list: the collected tuples, or ``[]`` if discovery itself fails.
    """
    import importlib.util
    import sys

    try:
        source_dict = []
        for i in __all__:
            folder = os.path.join(os.path.dirname(__file__), i)
            for finder, module_name, is_pkg in pkgutil.walk_packages([folder]):
                if is_pkg:
                    continue
                try:
                    spec = finder.find_spec(module_name)
                    if spec is None or spec.loader is None:
                        raise ImportError('no loader found for "%s"' % module_name)
                    module = importlib.util.module_from_spec(spec)
                    # Keep the module registered, as load_module() used to do.
                    sys.modules[module_name] = module
                    try:
                        spec.loader.exec_module(module)
                    except BaseException:
                        # Drop the half-initialised module again.
                        sys.modules.pop(module_name, None)
                        raise
                    source_dict.append((module_name, module.source()))
                except Exception as e:
                    log_utils.log('Could not load "%s": %s' % (module_name, e), log_utils.LOGDEBUG)
        return source_dict
    except Exception:
        # Best effort: a failing discovery simply yields no sources.
        return []
def sources():
    """Load all provider modules and gather what their ``source()`` returns.

    The directories searched are the entries of the module-level ``__all__``,
    resolved relative to this file. Packages are skipped; only plain modules
    are loaded.

    ``find_spec``/``exec_module`` replace the legacy
    ``find_module``/``load_module`` calls, which the file finders of current
    Python 3 releases no longer provide.

    Returns:
        list: ``(module_name, module.source())`` tuples, or ``[]`` when the
        discovery itself raises.
    """
    import importlib.util
    import sys

    try:
        source_dict = []
        for i in __all__:
            provider_dir = os.path.join(os.path.dirname(__file__), i)
            for finder, module_name, is_pkg in pkgutil.walk_packages([provider_dir]):
                if is_pkg:
                    continue
                try:
                    spec = finder.find_spec(module_name)
                    if spec is None or spec.loader is None:
                        raise ImportError('no loader found for "%s"' % module_name)
                    module = importlib.util.module_from_spec(spec)
                    # Mirror load_module(): register before executing.
                    sys.modules[module_name] = module
                    try:
                        spec.loader.exec_module(module)
                    except BaseException:
                        # Remove the half-initialised module on failure.
                        sys.modules.pop(module_name, None)
                        raise
                    source_dict.append((module_name, module.source()))
                except Exception as e:
                    log_utils.log('Could not load "%s": %s' % (module_name, e), log_utils.LOGDEBUG)
        return source_dict
    except Exception:
        # Best effort: discovery problems result in an empty list.
        return []
def list_modules_in_package_path(package_path, prefix):
    """List every plain module below ``package_path``, including those in
    nested packages.

    Directories are visited with an explicit stack, so each level is listed
    with ``pkgutil.iter_modules``. ``walk_packages`` would additionally try
    to import each sub-package by its bare name and, when that succeeds,
    yield that package's modules too, producing bogus or duplicate entries.

    :param package_path: directory to start from
    :param prefix: dotted name corresponding to ``package_path``
    :returns: list of ``(module_name, dotted_name)`` tuples
    """
    modules = []
    pending = [(package_path, prefix)]
    while pending:
        curr_path, curr_prefix = pending.pop()
        for _importer, name, ispkg in pkgutil.iter_modules([curr_path]):
            dotted = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Visit the sub-package directory later.
                pending.append(('%s/%s/' % (curr_path, name), dotted))
            else:
                modules.append((name, dotted))
    return modules
def _load_from_package(self, pkg ):
'''???python???????
:param pkg: Python?
:type pkg: ModuleType
:returns list - ????????
'''
tests = []
for _, modulename, ispkg in pkgutil.walk_packages(pkg.__path__, pkg.__name__+'.', onerror=self._walk_package_error):
if ispkg:
continue
try:
__import__(modulename)
tests += self._load_from_module(sys.modules[modulename])
except:
self._module_errs[modulename] = traceback.format_exc()
return tests
def import_submodules(package, recursive=True):
    """
    Import all submodules of a module, recursively, including subpackages

    The direct children are listed with ``pkgutil.iter_modules``; descending
    into subpackages is done by the recursive call. Un-prefixed
    ``walk_packages`` would also import each subpackage by its bare name and
    could report modules of an unrelated top-level package.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: whether to descend into subpackages
    :type recursive: bool
    :returns: fully qualified module name -> imported module
    :rtype: dict[str, types.ModuleType]
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def import_submodules(package, recursive=True):
    """ Import all submodules of a module, recursively, including subpackages

    Only the direct children of ``package`` are listed here (via
    ``pkgutil.iter_modules``); subpackages are handled by recursing. The
    earlier un-prefixed ``walk_packages`` call tried to import subpackages
    by bare name, which can pull in an unrelated top-level package and then
    fail on names that do not exist below ``package``.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: whether to descend into subpackages
    :type recursive: bool
    :returns: fully qualified module name -> imported module
    :rtype: dict[str, types.ModuleType]
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def get_package_libraries(pkg):
    """
    Recursively walk the submodules of ``pkg`` and yield the dotted name of
    each one that exposes a ``register`` attribute.

    An ``ImportError`` while importing a submodule is re-raised as
    ``InvalidTemplateLibrary``.
    """
    for module_info in walk_packages(pkg.__path__, pkg.__name__ + '.'):
        library_name = module_info[1]
        try:
            library = import_module(library_name)
        except ImportError as e:
            raise InvalidTemplateLibrary(
                "Invalid template library specified. ImportError raised when "
                "trying to load '%s': %s" % (library_name, e)
            )
        if hasattr(library, 'register'):
            yield library_name
def import_submodules(package, recursive=True):
    """Import all submodules of a module, recursively, including subpackages

    Every imported module's optional ``TRAITS`` sequence is passed, item by
    item, to ``symbolize`` together with the module name. Modules whose name
    contains ``TEST_DIR`` are skipped.

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: call this function again for each subpackage
    :type recursive: bool
    """
    if isinstance(package, str):
        package = importlib.import_module(package)
    walker = pkgutil.walk_packages(package.__path__, package.__name__ + '.')
    for _finder, name, is_pkg in walker:
        if TEST_DIR in name:
            continue
        module = importlib.import_module(name)
        for prop in getattr(module, "TRAITS", []):
            symbolize(name, prop)
        # NOTE(review): prefixed walk_packages() already descends into
        # subpackages, so this recursion visits their modules again --
        # confirm symbolize() is safe to call repeatedly.
        if is_pkg and recursive:
            import_submodules(name)
# This is where the names defined in submodules are imported
def import_submodules(package, recursive=True):
    """Import and reload all submodules of a module, including subpackages

    The package itself and every submodule found are passed through
    ``importlib.reload`` after being imported.

    Direct children are listed with ``pkgutil.iter_modules``; subpackages
    are handled by the recursive call. Un-prefixed ``walk_packages`` would
    also import each subpackage by its bare name and could report modules of
    an unrelated top-level package with the same name.

    Arguments:

    * `package` -- package (name or actual module); ``None`` yields ``{}``

    Keyword arguments:

    * `recursive` -- import modules recursively

    Returns a dict mapping fully qualified module names to modules.
    """
    if package is None:
        return {}
    if isinstance(package, str):
        package = importlib.import_module(package)
    importlib.reload(package)
    results = {}
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        importlib.reload(results[full_name])
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    return results
def import_submodules(context: dict, root_module: str, path: str):
    """
    Import every submodule found on ``path`` and copy its public names into
    ``context``.

    For each module the names in ``__all__`` are copied or, when ``__all__``
    is absent, every attribute not starting with an underscore. Afterwards
    each module is also stored in ``context`` under its dotted name.

    NOTE(review): ``path`` is handed to ``pkgutil.walk_packages`` unchanged,
    so it presumably has to be a list of directories (as in the example
    below) rather than the single ``str`` the annotation states -- confirm.

    >>> import_submodules(locals(), __name__, __path__)
    """
    imported = {}
    for _finder, module_name, _is_pkg in pkgutil.walk_packages(path, root_module + '.'):
        # Loading through the finder's own loader caused a runtime error with
        # model conflicts, hence the regular import machinery is used.
        module = __import__(module_name, globals(), locals(), ['__name__'])
        exported = getattr(module, '__all__', None)
        if exported is None:
            exported = [attr for attr in vars(module) if not attr.startswith('_')]
        context.update((attr, getattr(module, attr, None)) for attr in exported)
        imported[module_name] = module
    # maintain existing module namespace import with priority
    context.update(imported)
def _compile_modules(self):
    """Map the absolute file name of every discoverable module to its name.

    All modules reported by ``pkgutil.walk_packages()`` are considered. The
    file name is taken from the module's loader; if that fails with
    ``AttributeError`` or ``ImportError``, ``get_module_file_attribute`` is
    tried instead. Modules for which both lookups fail are left out.

    Returns:
        dict: absolute file path -> module name.
    """
    handled = (AttributeError, ImportError)
    modules = {}
    for module_finder, name, ispkg in pkgutil.walk_packages():
        try:
            # First choice: ask the loader found through the finder's spec.
            filename = module_finder.find_spec(name).loader.get_filename(name)
        except handled:
            try:
                filename = get_module_file_attribute(name)
            except handled:
                continue
        modules[os.path.abspath(filename)] = name
    return modules
def _load_all_functions(self):
    """Populate ``self.all_functions`` the first time it is needed.

    Does nothing when ``self.all_functions`` is already set. Otherwise every
    non-package module whose name starts with ``hq.hquery.functions.`` is
    loaded, and the objects named in its ``exports`` attribute are added to
    ``self.all_functions``, keyed by the export name with trailing
    underscores removed.

    Raises:
        RuntimeError: if a module exports something that is neither a class
            nor a function.
    """
    if self.all_functions is None:
        self.all_functions = dict()
        my_package = '{0}/__init__.py'.format(os.path.dirname(__file__))
        verbose_print('FunctionSupport loading all function modules near {0}.'.format(my_package),
                      indent_after=True)
        # NOTE(review): walk_packages() documents ``path`` as None or a list
        # of directories, yet a single string naming an ``__init__.py`` file
        # is passed here. Recent Python versions appear to reject a str with
        # ValueError -- confirm the intended search root before changing it.
        for importer, modname, ispkg in walk_packages(my_package):
            if not ispkg and modname.startswith('hq.hquery.functions.'):
                verbose_print('Found candidate module {0} -- loading.'.format(modname))
                # NOTE(review): find_module()/load_module() are the legacy
                # finder API -- verify they exist on the supported Pythons.
                module = importer.find_module(modname).load_module(modname)
                if hasattr(module, 'exports'):
                    # Trailing underscores are stripped from the export names.
                    exports = {name.rstrip('_'): getattr(module, name) for name in getattr(module, 'exports')}
                    verbose_print('Module {0} exports are: {1}'.format(modname, exports.keys()))
                    if any(not (isclass(obj) or isfunction(obj)) for obj in exports.values()):
                        raise RuntimeError('Non-class/function export(s) loaded from module {0}'.format(modname))
                    self.all_functions.update(exports)
                else:
                    verbose_print('Module {0} defined no exports.'.format(modname))
        verbose_print('Finished loading function modules.', outdent_before=True)
def list_modules_in_package_path(self, package_path, prefix):
    """Enumerate the non-package modules under ``package_path``.

    Sub-packages are followed through an explicit stack, and each directory
    is listed with ``pkgutil.iter_modules``. ``walk_packages`` is avoided
    because it imports sub-packages by bare name and, when such an import
    succeeds, yields that package's modules as well, which produced bogus or
    duplicate entries.

    :param package_path: directory to scan
    :param prefix: dotted name that corresponds to ``package_path``
    :returns: list of ``(module_name, dotted_name)`` tuples
    """
    modules = []
    to_visit = [(package_path, prefix)]
    while to_visit:
        curr_path, curr_prefix = to_visit.pop()
        for _importer, name, ispkg in pkgutil.iter_modules([curr_path]):
            dotted_name = '%s.%s' % (curr_prefix, name)
            if ispkg:
                # Descend into the sub-package on a later iteration.
                to_visit.append(('%s/%s/' % (curr_path, name), dotted_name))
            else:
                modules.append((name, dotted_name))
    return modules
def load_notifiers():
    """Build the notifier registry from the modules next to this file.

    Every module found is imported. A module with a ``register`` callable
    fills the registry itself. Otherwise its ``notify_factory`` or, failing
    that, its ``notify`` wrapped by ``dummy_notify_factory`` is stored under
    the module's ``NAME`` (defaulting to the last part of the module name).

    Returns:
        dict: the populated registry.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    # Dotted prefix of the enclosing package, including the trailing dot;
    # empty when this module is not inside a package.
    prefix = ''.join(__name__.rpartition('.')[:2])
    registry = {}
    for _finder, modname, _is_pkg in pkgutil.walk_packages([here], prefix):
        submodule = importlib.import_module(modname, __name__)
        if hasattr(submodule, 'register'):
            submodule.register(registry)
            continue
        key = getattr(submodule, 'NAME', modname.rsplit('.', 1)[-1])
        if hasattr(submodule, 'notify_factory'):
            registry[key] = submodule.notify_factory
        elif hasattr(submodule, 'notify'):
            registry[key] = dummy_notify_factory(submodule.notify)
    return registry
def get_package_libraries(pkg):
    """
    Generate the names of all template tag libraries below ``pkg``: the
    submodules, at any depth, that have a ``register`` attribute.

    Raises ``InvalidTemplateLibrary`` if a submodule cannot be imported
    because of an ``ImportError``.
    """
    def _load(dotted_name):
        # Import one submodule, translating ImportError for the caller.
        try:
            return import_module(dotted_name)
        except ImportError as e:
            raise InvalidTemplateLibrary(
                "Invalid template library specified. ImportError raised when "
                "trying to load '%s': %s" % (dotted_name, e)
            )

    for _finder, dotted_name, _is_pkg in walk_packages(pkg.__path__, pkg.__name__ + '.'):
        if hasattr(_load(dotted_name), 'register'):
            yield dotted_name
def get_package_libraries(pkg):
    """
    Recursively inspect the submodules of ``pkg`` and yield the dotted name
    of every one that defines ``register``.

    ``ImportError`` raised while importing a submodule is converted into
    ``InvalidTemplateLibrary``.
    """
    walker = walk_packages(pkg.__path__, pkg.__name__ + '.')
    for _finder, name, _is_pkg in walker:
        try:
            imported = import_module(name)
        except ImportError as e:
            message = (
                "Invalid template library specified. ImportError raised when "
                "trying to load '%s': %s" % (name, e)
            )
            raise InvalidTemplateLibrary(message)
        if hasattr(imported, 'register'):
            yield name
def get_routes(package):
    """Collect the routable API classes defined below ``package``.

    Every non-package module under ``package`` is imported. A public
    module-level class is collected when it subclasses ``BaseAPI`` and has a
    truthy ``route`` attribute. Errors raised while walking the package are
    ignored.

    Returns:
        list: the matching classes in discovery order.
    """
    routes = []
    walker = pkgutil.walk_packages(
        path=package.__path__,
        prefix=package.__name__ + '.',
        onerror=lambda x: None)
    for _finder, modname, ispkg in walker:
        if ispkg:
            continue
        module = import_module(modname)
        for attr_name, candidate in vars(module).items():
            if attr_name.startswith("_"):
                continue
            if not isinstance(candidate, six.class_types):
                continue
            if issubclass(candidate, BaseAPI) and getattr(candidate, "route", False):
                routes.append(candidate)
    return routes
# monkeypatch to force application json on raised exceptions
def get_spiders_classes(entry_point_name=None):
    """Discover spider classes below a package.

    Every submodule of ``entry_point_name`` is imported, descending into
    subpackages. Each module member accepted by ``is_spider`` and defined in
    that very module is recorded under its ``name`` attribute (``None`` when
    the attribute is missing).

    Each package level is listed with ``pkgutil.iter_modules`` because the
    descent is done here. Un-prefixed ``walk_packages`` would additionally
    import subpackages by bare name and could yield modules of an unrelated
    top-level package. ``importlib`` is imported explicitly instead of being
    reached through the undocumented ``inspect.importlib`` attribute.

    :param entry_point_name: dotted name of the package to scan; defaults to
        the top-level package of this module
    :returns: dict mapping spider name to spider class
    :raises Exception: if two spiders share the same name
    """
    import importlib

    discovered = dict()
    if not entry_point_name:
        entry_point_name = __name__.split(".")[0]

    def load_module(module_name):
        # Import one package and register the spiders of all its children.
        module = importlib.import_module(module_name)
        for _importer, modname, ispkg in pkgutil.iter_modules(module.__path__):
            mod = importlib.import_module(module.__name__ + "." + modname)
            if ispkg:
                load_module(mod.__name__)
                continue
            for _name, obj in inspect.getmembers(mod):
                # Skip spiders merely imported into this module.
                if not (is_spider(obj) and mod.__name__ == obj.__module__):
                    continue
                try:
                    sn = getattr(obj, "name", None)
                    if sn in discovered:
                        raise Exception("Duplicate spider '{}': {} and {}".format(sn, obj, discovered[sn]))
                    discovered[sn] = obj
                except AttributeError:
                    pass

    load_module(module_name=entry_point_name)
    return discovered
def get_reports():
    """
    Compile a list of all reports available across all modules in the reports path. Returns a list of tuples:

    [
        (module_name, [report, report, report, ...]),
        (module_name, [report, report, report, ...]),
        ...
    ]
    """
    def _instantiate_reports(module_name):
        # Import one user-created report module and instantiate each member
        # accepted by ``is_report``.
        module = importlib.import_module('reports.{}'.format(module_name))
        return [member() for _name, member in inspect.getmembers(module, is_report)]

    # Iterate through all modules within the reports path. These are the
    # user-created files in which reports are defined.
    return [
        (info[1], _instantiate_reports(info[1]))
        for info in pkgutil.walk_packages([settings.REPORTS_ROOT])
    ]
def list_modules_in_package_path(package_path, prefix):
    """Return ``(module_name, dotted_name)`` for every non-package module
    found under ``package_path``, descending into sub-packages.

    Each directory is listed with ``pkgutil.iter_modules`` because the
    descent is handled by the explicit work stack below. The previous
    ``walk_packages`` call also imported sub-packages by bare name and could
    add entries belonging to an unrelated, identically named package.

    :param package_path: directory to scan
    :param prefix: dotted prefix that names ``package_path``
    :returns: list of ``(module_name, dotted_name)`` tuples
    """
    modules = []
    work_stack = [(package_path, prefix)]
    while work_stack:
        curr_path, curr_prefix = work_stack.pop()
        for _importer, name, ispkg in pkgutil.iter_modules([curr_path]):
            qualified = '%s.%s' % (curr_prefix, name)
            if not ispkg:
                modules.append((name, qualified))
                continue
            # Sub-package: schedule its directory for a later pass.
            work_stack.append(('%s/%s/' % (curr_path, name), qualified))
    return modules