def get_action(driver, keyword):
"""get action class corresponding to the keyword in the driver
"""
drvmod = 'ProductDrivers.' + driver
drvmodobj = importlib.import_module(drvmod)
drvfile_methods = inspect.getmembers(drvmodobj, inspect.isroutine)
main_method = [item[1] for item in drvfile_methods if item[0] == 'main'][0]
main_src = inspect.getsource(main_method)
pkglstmatch = re.search(r'package_list.*=.*\[(.*)\]', main_src, re.MULTILINE | re.DOTALL)
pkglst = pkglstmatch.group(1).split(',')
for pkg in pkglst:
pkgobj = importlib.import_module(pkg)
pkgdir = os.path.dirname(pkgobj.__file__)
action_modules = [pkg+'.'+name for _, name, _ in pkgutil.iter_modules([pkgdir])]
action_module_objs = [importlib.import_module(action_module) for action_module in action_modules]
for action_module_obj in action_module_objs:
for action_class in inspect.getmembers(action_module_obj, inspect.isclass):
for func_name in inspect.getmembers(action_class[1], inspect.isroutine):
if keyword == func_name[0]:
return action_class[1]
return None
python类iter_modules()的实例源码
def _install_modules(command_table):
for cmd in command_table:
command_table[cmd].load_arguments()
try:
mods_ns_pkg = import_module('azure.cli.command_modules')
installed_command_modules = [modname for _, modname, _ in
pkgutil.iter_modules(mods_ns_pkg.__path__)
if modname not in BLACKLISTED_MODS]
except ImportError:
pass
for mod in installed_command_modules:
try:
mod = import_module('azure.cli.command_modules.' + mod)
mod.load_params(mod)
mod.load_commands()
except Exception: # pylint: disable=broad-except
print("Error loading: {}".format(mod), file=stderr)
traceback.print_exc(file=stderr)
_update_command_definitions(command_table)
def loadplugins(plugindir=None, namespace="plugins"):
""" Load all of the modules from the plugins directory and instantiate the
classes within. """
if plugindir == None:
plugindir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "plugins")
analytics = []
for _, x, _ in pkgutil.iter_modules([plugindir]):
m1 = __import__('supremm.' + namespace + '.' + x)
m = getattr(m1, namespace)
m = getattr(m, x)
c = getattr(m, x)
if issubclass(c, Plugin) or issubclass(c, PreProcessor):
analytics.append(c)
else:
del m1
return analytics
def submodules(root_module):
mod_path = root_module.__file__
fn = os.path.basename(mod_path)
pathname = os.path.dirname(mod_path)
if fn not in ("__init__.py", "__init__.pyc"):
return None
rst = {}
for imp, name, _ in pkgutil.iter_modules([pathname]):
loader = imp.find_module(name)
mod = loader.load_module(root_module.__name__ + "." + name)
rst[name] = mod
return rst
def _iter_all_modules(package, prefix=''):
"""
Iterates over the names of all modules that can be found in the given
package, recursively.
Example:
_iter_all_modules(_pytest) ->
['_pytest.assertion.newinterpret',
'_pytest.capture',
'_pytest.core',
...
]
"""
import os
import pkgutil
if type(package) is not str:
path, prefix = package.__path__[0], package.__name__ + '.'
else:
path = package
for _, name, is_package in pkgutil.iter_modules([path]):
if is_package:
for m in _iter_all_modules(os.path.join(path, name), prefix=name + '.'):
yield prefix + m
else:
yield prefix + name
def _sub_modules_dict(self):
"""
Lists modules in the directory of this module (if this module is a
package).
"""
path = self._module.path
names = {}
if path is not None and path.endswith(os.path.sep + '__init__.py'):
mods = pkgutil.iter_modules([os.path.dirname(path)])
for module_loader, name, is_pkg in mods:
fake_n = helpers.FakeName(name)
# It's obviously a relative import to the current module.
imp = helpers.FakeImport(fake_n, self, level=1)
fake_n.parent = imp
names[name] = [fake_n]
# TODO add something like this in the future, its cleaner than the
# import hacks.
# ``os.path`` is a hardcoded exception, because it's a
# ``sys.modules`` modification.
#if str(self.name) == 'os':
# names.append(helpers.FakeName('path', parent=self))
return names
def _get_module_names(self, search_path=None):
"""
Get the names of all modules in the search_path. This means file names
and not names defined in the files.
"""
names = []
# add builtin module names
if search_path is None:
names += [self._generate_name(name) for name in sys.builtin_module_names]
if search_path is None:
search_path = self.sys_path_with_modifications()
for module_loader, name, is_pkg in pkgutil.iter_modules(search_path):
names.append(self._generate_name(name))
return names
def _sub_modules_dict(self):
"""
Lists modules in the directory of this module (if this module is a
package).
"""
path = self._module.path
names = {}
if path is not None and path.endswith(os.path.sep + '__init__.py'):
mods = pkgutil.iter_modules([os.path.dirname(path)])
for module_loader, name, is_pkg in mods:
fake_n = helpers.FakeName(name)
# It's obviously a relative import to the current module.
imp = helpers.FakeImport(fake_n, self, level=1)
fake_n.parent = imp
names[name] = [fake_n]
# TODO add something like this in the future, its cleaner than the
# import hacks.
# ``os.path`` is a hardcoded exception, because it's a
# ``sys.modules`` modification.
#if str(self.name) == 'os':
# names.append(helpers.FakeName('path', parent=self))
return names
def _get_module_names(self, search_path=None):
"""
Get the names of all modules in the search_path. This means file names
and not names defined in the files.
"""
names = []
# add builtin module names
if search_path is None:
names += [self._generate_name(name) for name in sys.builtin_module_names]
if search_path is None:
search_path = self.sys_path_with_modifications()
for module_loader, name, is_pkg in pkgutil.iter_modules(search_path):
names.append(self._generate_name(name))
return names
def install_modules():
for cmd in CMD_TABLE:
CMD_TABLE[cmd].load_arguments()
try:
mods_ns_pkg = import_module('azure.cli.command_modules')
installed_command_modules = [modname for _, modname, _ in
pkgutil.iter_modules(mods_ns_pkg.__path__)
if modname not in BLACKLISTED_MODS]
except ImportError:
pass
for mod in installed_command_modules:
try:
mod = import_module('azure.cli.command_modules.' + mod)
mod.load_params(mod)
mod.load_commands()
except Exception: # pylint: disable=broad-except
print("Error loading: {}".format(mod))
_update_command_definitions(CMD_TABLE)
def get_modules():
"""
Returns:
return a list consist of module name.
Raises:
SwarmModuleException: An error occurred when try to get modules or no available module.
"""
try:
s=os.path.dirname(modules.__file__)
ret=[name for _, name, _ in pkgutil.iter_modules([s])]
except Exception as e:
raise SwarmModuleException('an error occurred when try to get modules, please check'
' modules of swarm')
# check available module
if len(ret)==0:
raise SwarmModuleException('no available module')
return ret
def find_components(package, directory, base_class):
components = OrderedDict()
for module_loader, module_name, ispkg in pkgutil.iter_modules([directory]):
full_module_name = "%s.%s" % (package, module_name)
if full_module_name not in sys.modules and not ispkg:
module = importlib.import_module(full_module_name)
for member_name, obj in inspect.getmembers(module):
if inspect.isclass(
obj) and base_class in obj.__bases__:
# TODO test if the obj implements the interface
# Keep in mind that this only instantiates the ensemble_wrapper,
# but not the real target classifier
classifier = obj
components[module_name] = classifier
return components
def run_excalibot():
import json, pkgutil
from . import Excalibot, db, log, config
log.debug('Creating bot')
bot = Excalibot(config.bot_config)
log.debug('Loading extensions')
for _, cog, _ in pkgutil.iter_modules(['excalibot/cogs']):
if cog == 'utils': continue
log.debug('Loading extension %s', cog)
bot.load_extension('excalibot.cogs.' + cog)
db.DatabaseObject.metadata.create_all()
bot.run()
bot.loop.close()
log.shutdown()
def update_plugins():
"""Looks for nodes in the plugin paths."""
# This has to be imported here in order to avoid circular imports.
from urban_journey.ujml.node_base import NodeBase
# Iterate through all modules in the plugin paths.
for loader, module_name, is_pkg in pkgutil.iter_modules(plugin_paths):
module = loader.find_module(module_name).load_module(module_name)
# Looping through all members of each module to find the module classes.
for member_name, member in inspect.getmembers(module):
# Ignore all private members
if member_name.startswith('__'):
continue
# Add the member to the node register if it's a node.
if isinstance(member, type):
if issubclass(member, NodeBase):
node_register[member_name] = member
def getPackageModules(self, package):
packageModules = []
for importer, moduleName, isPackage in pkgutil.iter_modules(package.__path__):
fullModuleName = "{0}.{1}".format(package.__name__, moduleName)
if isPackage:
subpackageObject = importlib.import_module(fullModuleName, package=package.__path__)
subpackageObjectDirectory = dir(subpackageObject)
if "Plugin" in subpackageObjectDirectory:
packageModules.append((subpackageObject, moduleName))
continue
subPackageModules = self.getPackageModules(subpackageObject)
packageModules = packageModules + subPackageModules
else:
packageModules.append(fullModuleName)
return packageModules
def _sub_modules_dict(self):
"""
Lists modules in the directory of this module (if this module is a
package).
"""
path = self._path
names = {}
if path is not None and path.endswith(os.path.sep + '__init__.py'):
mods = pkgutil.iter_modules([os.path.dirname(path)])
for module_loader, name, is_pkg in mods:
# It's obviously a relative import to the current module.
names[name] = imports.SubModuleName(self, name)
# TODO add something like this in the future, its cleaner than the
# import hacks.
# ``os.path`` is a hardcoded exception, because it's a
# ``sys.modules`` modification.
# if str(self.name) == 'os':
# names.append(Name('path', parent_context=self))
return names
def _get_module_names(self, search_path=None, in_module=None):
"""
Get the names of all modules in the search_path. This means file names
and not names defined in the files.
"""
names = []
# add builtin module names
if search_path is None and in_module is None:
names += [self._generate_name(name) for name in sys.builtin_module_names]
if search_path is None:
search_path = self.sys_path_with_modifications()
for module_loader, name, is_pkg in pkgutil.iter_modules(search_path):
names.append(self._generate_name(name, in_module=in_module))
return names
def find_mods():
""" Find and import modules from the module directories """
global mod_lib
mod_lib = []
print('~ Looking for modules in:', settings.MOD_DIRS)
for finder, name, _ in pkgutil.iter_modules(settings.MOD_DIRS):
try:
mod = finder.find_module(name).load_module(name)
for member in dir(mod):
obj = getattr(mod, member)
if inspect.isclass(obj):
for parent in obj.__bases__:
if 'Module' is parent.__name__:
mod_lib.append(obj())
except Exception as e:
print(traceback.format_exc())
print('\n~ Error loading \''+name+'\' '+str(e))
mod_lib.sort(key=lambda mod: mod.priority, reverse=True)
def find_apis():
""" Find APIs """
global api_lib
api_lib = {}
print('~ Looking for APIs in:', settings.API_DIRS)
for finder, name, _ in pkgutil.iter_modules(settings.API_DIRS):
try:
file = finder.find_module(name).load_module(name)
for member in dir(file):
obj = getattr(file, member)
if inspect.isclass(obj):
for parent in obj.__bases__:
if 'Api' is parent.__name__:
api = obj()
api_lib[api.key] = api
except Exception as e:
print(traceback.format_exc())
print('\n~ Error loading \''+name+'\' '+str(e))
def install_aria_extensions(strict=True):
"""
Loads all Python packages with names beginning with ``aria_extension_`` and calls their
``aria_extension`` initialization entry points if they have them.
:param strict: if ``True`` tries to load extensions while taking into account the versions
of their dependencies, otherwise ignores versions
:type strict: bool
"""
for loader, module_name, _ in iter_modules():
if module_name.startswith('aria_extension_'):
loader.find_module(module_name).load_module(module_name)
for entry_point in pkg_resources.iter_entry_points(group='aria_extension'):
# It should be possible to enable non strict loading - use the package that is already
# installed inside the environment, and forgo the version demand
if strict:
entry_point.load()
else:
entry_point.resolve()
extension.init()
def gen_parser(caller, commands, parser, subparsers):
"""
Run setup() for all submodules of sub_commands
:param caller: Module calling this module
:type caller: string
:param commands: Sub-module relative to caller with commands
:type commands: string
:param parser: Argparse object
:type parser: argparse.ArgumentParser
:param subparsers: Subparsers object
:type subparsers: argparse._SubParsersAction
"""
package = importlib.import_module('{}.{}'.format(caller, commands))
for importer, modname, ispkg in \
pkgutil.iter_modules(package.__path__,
prefix='{}.{}.'.format(caller, commands)):
if not ispkg:
found_module = importlib.import_module('{}'.format(modname))
found_module.setup(parser, subparsers)
# vim:et:fdm=marker:sts=4:sw=4:ts=4
def silence_ibapi_logging(levels=["DEBUG", "INFO"]):
"""
Silences the excessive ibapi logging to the root logger.
"""
levels = levels or ["DEBUG", "INFO"]
for level in levels:
if level not in ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"):
raise ValueError("unknown log level: {0}".format(level))
for _, module_name, _ in pkgutil.iter_modules(ibapi.__path__):
module = __import__("ibapi.{0}".format(module_name), fromlist="ibapi")
if not hasattr(module, "logging"):
continue
for level in levels:
setattr(module.logging, level.lower(), noop)
def __get_module(self, name):
mod_info = {}
for module_loader, module_name, is_pkg in pkgutil.iter_modules(modules.__path__):
if module_name == name:
module = module_loader.find_module(module_name).load_module(module_name)
if hasattr(module, '__class_name__'):
if hasattr(module, '__client_mode__'):
if module.__client_mode__:
class_name = module.__class_name__
class_ = getattr(module, class_name)
functions = self.__get_functions(class_)
mod_info['name'] = module_name
mod_info['class'] = class_name
mod_info['type'] = module.__module_type__
mod_info['functions'] = functions
return mod_info
return False
def get_module_list(self, out_format='raw'):
module_list = []
for module_loader, name, is_pkg in pkgutil.iter_modules(modules.__path__):
module = self.__get_module(name)
if module:
for function_name in module['functions']:
mod_info = {
'module': module['class'],
'type': module['type'],
'function': function_name
}
module_list.append(mod_info)
if out_format == 'raw':
return module_list
elif out_format == 'select':
count = 0
for item in module_list:
item['choice'] = count
count += 1
return module_list
def _iter_all_modules(package, prefix=''):
"""
Iterates over the names of all modules that can be found in the given
package, recursively.
Example:
_iter_all_modules(_pytest) ->
['_pytest.assertion.newinterpret',
'_pytest.capture',
'_pytest.core',
...
]
"""
import os
import pkgutil
if type(package) is not str:
path, prefix = package.__path__[0], package.__name__ + '.'
else:
path = package
for _, name, is_package in pkgutil.iter_modules([path]):
if is_package:
for m in _iter_all_modules(os.path.join(path, name), prefix=name + '.'):
yield prefix + m
else:
yield prefix + name
def discover_handler_classes(handlers_package):
"""
Looks for handler classes within handler path module.
Currently it's not looking deep into nested module.
:param handlers_package: module path to handlers
:type handlers_package: string
:return: list of handler classes
"""
if handlers_package is None:
return
# Add working directory into PYTHONPATH to import developer packages
sys.path.insert(0, os.getcwd())
package = import_module(handlers_package)
# Continue searching for module if package is not a module
if hasattr(package, '__path__'):
for _, modname, _ in pkgutil.iter_modules(package.__path__):
import_module('{package}.{module}'.format(package=package.__name__, module=modname))
return registered_handlers
def _iter_all_modules(package, prefix=''):
"""
Iterates over the names of all modules that can be found in the given
package, recursively.
Example:
_iter_all_modules(_pytest) ->
['_pytest.assertion.newinterpret',
'_pytest.capture',
'_pytest.core',
...
]
"""
import os
import pkgutil
if type(package) is not str:
path, prefix = package.__path__[0], package.__name__ + '.'
else:
path = package
for _, name, is_package in pkgutil.iter_modules([path]):
if is_package:
for m in _iter_all_modules(os.path.join(path, name), prefix=name + '.'):
yield prefix + m
else:
yield prefix + name
def _iter_all_modules(package, prefix=''):
"""
Iterates over the names of all modules that can be found in the given
package, recursively.
Example:
_iter_all_modules(_pytest) ->
['_pytest.assertion.newinterpret',
'_pytest.capture',
'_pytest.core',
...
]
"""
import os
import pkgutil
if type(package) is not str:
path, prefix = package.__path__[0], package.__name__ + '.'
else:
path = package
for _, name, is_package in pkgutil.iter_modules([path]):
if is_package:
for m in _iter_all_modules(os.path.join(path, name), prefix=name + '.'):
yield prefix + m
else:
yield prefix + name
def load_all_modules_from_dir(self, dirname):
log.debug('Loading modules from "%s"' % dirname)
for importer, package_name, _ in pkgutil.iter_modules([dirname]):
self.module_path = "%s/%s" % (dirname, package_name)
log.debug("Importing '%s'" % package_name)
try:
importer.find_module(package_name).load_module(package_name)
except Exception as e:
log.critical('Could not load `%s`. Error follows.' % package_name)
log.critical(e, exc_info=1)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback_string = StringIO()
traceback.print_exception(exc_type, exc_value, exc_traceback,
file=traceback_string)
self.send(
'Could not load `%s` from %s.' % (package_name, dirname),
DEBUG_CHANNEL)
self.send(traceback_string.getvalue(), DEBUG_CHANNEL)
def test_random_config_synthetic(self):
path = os.path.dirname(hpolib.benchmarks.synthetic_functions.__file__)
for _, pkg, _ in pkgutil.iter_modules([path, ]):
pkg_name = "hpolib.benchmarks.synthetic_functions.{:s}".format(pkg)
importlib.import_module(pkg_name)
mod_name = sys.modules[pkg_name]
for name, obj in inspect.getmembers(mod_name, inspect.isclass):
if issubclass(obj, AbstractBenchmark) and "Abstract" not in name:
b = getattr(mod_name, name)()
cfg = b.get_configuration_space()
for i in range(100):
c = cfg.sample_configuration()
res = b.objective_function(c)
self.assertTrue(np.isfinite(res['function_value']))