def driver_dict_from_config(named_driver_config, *args, **kwargs):
    """Build a mapping of driver type to an instantiated driver.

    Each entry of ``named_driver_config`` is a string of the form
    ``"<type>=<import path>"``. The text after the first ``=`` is imported
    with ``importutils.import_class`` and called with ``*args`` and
    ``**kwargs``; the resulting object is stored under the text before
    the ``=``.

    :param named_driver_config: iterable of ``type=class.path`` strings.
    :returns: dict keyed by driver type holding the created objects.
    """
    drivers = {}
    for spec in named_driver_config:
        # partition() splits on the first '=' only; everything after it is
        # treated as the import path.
        kind, _eq, class_path = spec.partition('=')
        factory = importutils.import_class(class_path)
        drivers[kind] = factory(*args, **kwargs)
    return drivers
python类import_class()的实例源码
def _import_factory(self, local_conf):
    """Import the app/filter class named in the PasteDeploy local conf.

    The class path is read from ``local_conf`` under ``self.KEY``. Any
    ``:`` in it is replaced by ``.`` and surrounding whitespace is removed
    before the path is handed to ``importutils.import_class``. The
    imported class can then serve as an app or filter factory.

    :param local_conf: mapping that contains ``self.KEY``.
    :returns: the imported class.
    """
    raw_path = local_conf[self.KEY]
    dotted_path = raw_path.replace(':', '.').strip()
    return importutils.import_class(dotted_path)
def _get_client_class_and_version(version):
    """Return the API version object and the matching client class.

    A value that is not already an ``api_versions.APIVersion`` is
    converted with ``api_versions.get_api_version``; an existing
    ``APIVersion`` is passed to ``api_versions.check_major_version``.

    :param version: an ``APIVersion`` instance or a value accepted by
        ``api_versions.get_api_version``.
    :returns: tuple ``(version, client_class)`` where the class is
        imported from ``zunclient.v<major>.client.Client``.
    :raises exceptions.UnsupportedVersion: if ``version.is_latest()`` is
        true.
    """
    if isinstance(version, api_versions.APIVersion):
        api_versions.check_major_version(version)
    else:
        version = api_versions.get_api_version(version)

    if version.is_latest():
        raise exceptions.UnsupportedVersion(
            _('The version should be explicit, not latest.'))

    client_path = 'zunclient.v%s.client.Client' % version.ver_major
    return version, importutils.import_class(client_path)
def load_config(self):
    """Load the Ironic settings and set up the client.

    Imports the client class named by ``CONF.IRONIC.ironic_client``,
    builds the client through ``self._get_client()`` and stores the
    result of ``self._parse_ironic_ipam_strategies()`` on the instance.
    """
    LOG.info("Loading Ironic settings.")
    self._client_cls = importutils.import_class(CONF.IRONIC.ironic_client)
    self._client = self._get_client()
    self._ipam_strategies = self._parse_ironic_ipam_strategies()
    # Hand the argument to the logger instead of pre-formatting with '%',
    # so interpolation only happens when the record is actually emitted.
    LOG.info("Ironic Driver config loaded. Client: %s",
             CONF.IRONIC.endpoint_url)
def get_client_class(version):
    """Import the client class registered for ``version``.

    :param version: client version; it is looked up by its ``str()`` form.
    :returns: the class imported by ``importutils.import_class``.
    :raises exceptions.UnsupportedVersion: if the version is not one of
        the known keys.
    """
    known_clients = {
        '1.1': 'dhclient.v2.client.Client',
        '2': 'dhclient.v2.client.Client',
        '3': 'dhclient.v2.client.Client',
    }
    try:
        client_path = known_clients[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid client version '%(version)s'. must be one of: "
                     "%(keys)s")
        supported = ', '.join(known_clients)
        msg = template % {'version': version, 'keys': supported}
        raise exceptions.UnsupportedVersion(msg)
    else:
        return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map):
    """Import the client class that ``version_map`` lists for ``version``.

    :param api_name: API name, used only in the error message.
    :param version: requested version; looked up by its ``str()`` form.
    :param version_map: mapping of version string to dotted class path.
    :returns: the class imported by ``importutils.import_class``.
    :raises exceptions.UnsupportedVersion: if the lookup fails.
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. must be "
                     "one of: %(map_keys)s")
        details = {
            'api_name': api_name,
            'version': version,
            'map_keys': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_client_class(version):
    """Import the client class registered for ``version``.

    :param version: client version; it is looked up by its ``str()`` form.
    :returns: the class imported by ``importutils.import_class``.
    :raises exceptions.UnsupportedVersion: if the version is not one of
        the known keys.
    """
    known_clients = {
        '1.1': 'icgwclient.v2.client.Client',
        '2': 'icgwclient.v2.client.Client',
        '3': 'icgwclient.v2.client.Client',
    }
    try:
        client_path = known_clients[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid client version '%(version)s'. must be one of: "
                     "%(keys)s")
        supported = ', '.join(known_clients)
        msg = template % {'version': version, 'keys': supported}
        raise exceptions.UnsupportedVersion(msg)
    else:
        return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map):
    """Import the client class that ``version_map`` lists for ``version``.

    :param api_name: API name, used only in the error message.
    :param version: requested version; looked up by its ``str()`` form.
    :param version_map: mapping of version string to dotted class path.
    :returns: the class imported by ``importutils.import_class``.
    :raises exceptions.UnsupportedVersion: if the lookup fails.
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        supported = ', '.join(version_map.keys())
        msg = "Invalid %s client version '%s'. must be one of: %s" % (
            api_name, version, supported)
        raise exceptions.UnsupportedVersion(msg)
    else:
        return importutils.import_class(client_path)
def main(manager='rock.extension_manager.ExtensionManager'):
    """Start the rock monitor service.

    Registers options, prepares logging for ``rock-mon``, imports the
    manager class named by ``manager`` and starts data collection with
    the ``extensions`` directory located next to this file.

    :param manager: dotted path of the manager class to import.
    """
    utils.register_all_options()
    utils.prepare_log(service_name='rock-mon')
    logging.getLogger(__name__).info('Start rock monitor.')

    manager_cls = importutils.import_class(manager)
    here = os.path.dirname(os.path.abspath(__file__))
    extension_manager = manager_cls(here + '/extensions')
    extension_manager.start_collect_data()
def data_get_by_obj_time(obj_name, delta):
    """Fetch the records of one model created within the last ``delta`` seconds.

    The model class is imported from
    ``rock.db.sqlalchemy.model_<obj_name>``; its class name is produced by
    ``rule_utils.underline_to_camel('model_<obj_name>')``.

    :param obj_name: suffix of the model module name.
    :param delta: length of the time window, in seconds.
    :returns: whatever ``db_api.get_period_records`` returns for the
        window, requested with ``sort_key='created_at'``.
    """
    model_name = 'model_' + obj_name
    class_name = rule_utils.underline_to_camel(model_name)
    model = importutils.import_class(
        'rock.db.sqlalchemy.%s.%s' % (model_name, class_name))

    # Read the clock once so both ends of the window derive from the same
    # instant and the window is exactly ``delta`` seconds wide.
    now = timeutils.utcnow()
    window = datetime.timedelta(seconds=delta)
    return db_api.get_period_records(model,
                                     now - window,
                                     end_time=now,
                                     sort_key='created_at')
def get_tasks_objects(task_cls_name):
    """Instantiate every task class named in ``task_cls_name``.

    :param task_cls_name: iterable of dotted class paths.
    :returns: list with one no-argument instance per path, in input order.
    """
    return [importutils.import_class(path)() for path in task_cls_name]
def main(manager='rock.rules.rule_manager.RuleManager'):
    """Start the rock engine service.

    Registers options, prepares logging for ``rock-engine``, creates the
    manager for ``/etc/rock/cases``, runs ``check_and_run()`` and finally
    calls the manager's ``after_start()``.

    :param manager: dotted path of the manager class to import.
    """
    utils.register_all_options()
    utils.prepare_log(service_name='rock-engine')
    logging.getLogger(__name__).info('Start rock engine')

    manager_cls = importutils.import_class(manager)
    rule_manager = manager_cls('/etc/rock/cases')

    # Imported at call time, after options and logging are set up and the
    # manager has been created.
    from rock.tasks.check_and_run import check_and_run
    check_and_run()
    rule_manager.after_start()
def _get_volume_drivers(self):
    """Instantiate the volume drivers listed in ``dpm_volume_drivers``.

    Each entry has the form ``"<type>=<import path>"``. The class is
    imported and called with ``self._host``.

    :returns: dict mapping driver type to the created driver object.
    """
    registry = {}
    for entry in dpm_volume_drivers:
        # Only the first '=' separates the type from the import path.
        kind, _eq, class_path = entry.partition('=')
        factory = importutils.import_class(class_path)
        registry[kind] = factory(self._host)
    return registry
def __init__(self):
    """Set up the scheduler query and report clients.

    Both client classes are imported here and wrapped in ``LazyLoader``
    (presumably so instantiation is deferred until first use; confirm
    against ``LazyLoader``).
    """
    query_cls = importutils.import_class(
        'mogan.scheduler.client.query.SchedulerQueryClient')
    self.queryclient = LazyLoader(query_cls)

    report_cls = importutils.import_class(
        'mogan.scheduler.client.report.SchedulerReportClient')
    self.reportclient = LazyLoader(report_cls)
def monkey_patch():
    """Wrap the functions and methods of configured modules with a decorator.

    Nothing happens unless ``CONF.monkey_patch`` is true. Each entry of
    ``CONF.monkey_patch_modules`` has the format
    "Module path:Decorator function". The decorator is called with two
    arguments and its return value replaces the original attribute:

        name - dotted name of the function being wrapped
        function - the function object itself
    """
    if not CONF.monkey_patch:
        return

    def is_method(obj):
        # Unbound methods became regular functions on Python 3, so plain
        # functions found on a class have to be accepted there as well.
        if six.PY2:
            return inspect.ismethod(obj)
        return inspect.ismethod(obj) or inspect.isfunction(obj)

    for entry in CONF.monkey_patch_modules:
        module_name, decorator_path = entry.split(':')
        wrap = importutils.import_class(decorator_path)
        __import__(module_name)

        # pyclbr describes the classes and functions defined in the module.
        for attr_name, descriptor in pyclbr.readmodule_ex(module_name).items():
            if isinstance(descriptor, pyclbr.Class):
                # Decorate every method of the class.
                klass = importutils.import_class(
                    "%s.%s" % (module_name, attr_name))
                for member_name, member in inspect.getmembers(klass,
                                                              is_method):
                    setattr(klass, member_name,
                            wrap("%s.%s.%s" % (module_name, attr_name,
                                               member_name), member))

            if isinstance(descriptor, pyclbr.Function):
                # Decorate the module-level function.
                target = importutils.import_class(
                    "%s.%s" % (module_name, attr_name))
                setattr(sys.modules[module_name], attr_name,
                        wrap("%s.%s" % (module_name, attr_name), target))
def API():
    """Return a new instance of ``masakari.compute.nova.API``.

    The class is imported by dotted path at call time.
    """
    return importutils.import_class("masakari.compute.nova.API")()
def include_var(name, rawtext, text, lineno, inliner, options=None,
                content=None):
    """Render the value of an importable variable as document text.

    ``text`` is imported with ``importutils.import_class``. A tuple or
    list is joined with ``", "``, a dict is dumped as indented JSON, and
    anything else is converted with ``str()``.

    :param name: The local name of the interpreted role, the role name
                 actually used in the document.
    :param rawtext: A string containing the entire interpreted text input,
                    including the role and markup. Return it as a
                    problematic node linked to a system message if a
                    problem is encountered.
    :param text: The interpreted text content; here, the dotted path of
                 the object to import.
    :param lineno: The line number where the interpreted text begins.
    :param inliner: The docutils.parsers.rst.states.Inliner object that
                    called include_var. It contains the several attributes
                    useful for error reporting and document tree access.
    :param options: A dictionary of directive options for customization
                    (from the 'role' directive), to be interpreted by the
                    role function. Used for additional attributes for the
                    generated elements and other functionality.
    :param content: A list of strings, the directive content for
                    customization (from the 'role' directive). To be
                    interpreted by the role function.
    :return: a two-item tuple: a list holding one ``nodes.Text`` with the
             rendered value, and an empty list.
    """
    obj = importutils.import_class(text)
    if isinstance(obj, (tuple, list)):
        obj = ", ".join(obj)
    elif isinstance(obj, dict):
        # Serialise the imported value itself; passing the builtin ``dict``
        # type here made json.dumps raise TypeError for every dict value.
        obj = json.dumps(obj, indent=4)
    else:
        obj = str(obj)
    return [nodes.Text(obj)], []
__init__.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self):
    """Set up the scheduler query and report clients.

    Both client classes are imported here and wrapped in ``LazyLoader``
    (presumably so instantiation is deferred until first use; confirm
    against ``LazyLoader``).
    """
    query_cls = importutils.import_class(
        'nova.scheduler.client.query.SchedulerQueryClient')
    self.queryclient = LazyLoader(query_cls)

    report_cls = importutils.import_class(
        'nova.scheduler.client.report.SchedulerReportClient')
    self.reportclient = LazyLoader(report_cls)
__init__.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def API():
    """Return a new instance of the class named by ``CONF.keymgr.api_class``.

    The class is imported by dotted path at call time.
    """
    return importutils.import_class(CONF.keymgr.api_class)()
__init__.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def API(skip_policy_check=False):
    """Create the network API object for the active networking backend.

    When ``is_neutron()`` returns ``None`` the class path comes from
    ``oslo_config.cfg.CONF.network_api_class``; otherwise a true result
    selects ``NEUTRON_NET_API`` and a false one ``NOVA_NET_API``.

    :param skip_policy_check: forwarded to the API class constructor.
    :returns: an instance of the selected network API class.
    """
    if is_neutron() is None:
        api_path = oslo_config.cfg.CONF.network_api_class
    else:
        api_path = NEUTRON_NET_API if is_neutron() else NOVA_NET_API
    api_cls = importutils.import_class(api_path)
    return api_cls(skip_policy_check=skip_policy_check)