def get_config_hash(file_dir, resource_mapping, exts=('conf',)):
    """Build a merged configuration dict from the mapped files of a directory.

    Every entry of ``file_dir`` whose extension is listed in ``exts`` and
    whose name is a key of ``resource_mapping`` is loaded through its driver
    class, and the driver's ``to_config_dict()`` result is deep-merged into
    the returned dict.

    :param file_dir: directory to scan for configuration files
    :param resource_mapping: dict keyed by file name; each value provides a
        mandatory ``'resource'`` entry and an optional ``'driver'`` entry
        (path handed to ``importutils.import_class``)
    :param exts: extensions, without the leading dot, that mark a
        configuration file
    :returns: the merged dict; an empty dict when ``file_dir`` is not a
        directory
    """
    if not os.path.isdir(file_dir):
        logger.debug(
            "Directory %s not found. Returning emty dict", file_dir)
        return {}

    res = {}
    for conf_file in os.listdir(file_dir):
        # A name without a dot has no extension; splitting on '.' alone
        # would hand back the whole name and wrongly accept a file that is
        # literally called e.g. "conf".
        _base, dot, ext = conf_file.rpartition('.')
        if not dot or ext not in exts:
            continue
        if conf_file not in resource_mapping:
            continue

        mapping = resource_mapping[conf_file]
        drv = mapping.get(
            'driver',
            'fuel_external_git.drivers.openstack_config.OpenStackConfig'
        )
        drv_class = importutils.import_class(drv)
        config = drv_class(
            os.path.join(file_dir, conf_file),
            mapping['resource']
        )
        deep_merge(res, config.to_config_dict())
    return res
python类import_class()的实例源码
def __init__(self, host, binary, topic, manager, report_interval=None,
             periodic_interval=None, periodic_fuzzy_delay=None,
             service_name=None, *args, **kwargs):
    """Set up the service and instantiate its manager.

    ``manager`` is the path of the manager class; it is imported and
    constructed with ``host``, ``service_name`` and the extra positional and
    keyword arguments, which are also remembered on the instance.
    """
    super(Service, self).__init__()
    # Initialise RPC only if that has not happened yet.
    if not rpc.initialized():
        rpc.init(CONF)

    self.host, self.binary, self.topic = host, binary, topic
    self.manager_class_name = manager

    manager_cls = importutils.import_class(self.manager_class_name)
    self.manager = manager_cls(host=self.host,
                               service_name=service_name,
                               *args, **kwargs)

    # Timing settings are stored as given.
    self.report_interval = report_interval
    self.periodic_interval = periodic_interval
    self.periodic_fuzzy_delay = periodic_fuzzy_delay

    self.saved_args = args
    self.saved_kwargs = kwargs
    self.timers = []
def _get_manager(self):
    """Build the Manager configured for this service, if there is one.

    The option named ``<service name>_manager`` is looked up in the
    configuration; its value is the path of the Manager class to import and
    instantiate without arguments.

    :returns: a Manager instance, or None when the option is missing or
        empty.
    """
    option = '%s_manager' % self.name
    class_path = CONF.get(option, None) if option in CONF else None
    if class_path:
        return importutils.import_class(class_path)()
    return None
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_matching_classes(self, loadable_class_names):
    """Resolve a list of names into loadable classes.

    Each name is imported. An object accepted by ``_is_correct_class`` is
    collected as-is; a plain function is called and everything it yields is
    collected instead, which lets one name stand for a default list of
    classes.

    :param loadable_class_names: iterable of names to import
    :returns: list of collected classes, in input order
    :raises exception.ClassNotFound: when an imported object is neither an
        accepted class nor a function
    """
    matched = []
    for cls_name in loadable_class_names:
        candidate = importutils.import_class(cls_name)
        if self._is_correct_class(candidate):
            matched.append(candidate)
            continue
        if not inspect.isfunction(candidate):
            error_str = 'Not a class of the correct type'
            raise exception.ClassNotFound(class_name=cls_name,
                                          exception=error_str)
        # The function returns the classes to use.
        matched.extend(candidate())
    return matched
def get_client_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails; the message lists the known versions sorted
        numerically, component by component
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        def _numeric_parts(ver):
            # '2.10' -> [2, 10], so that it sorts after '2.9'.
            return [int(part) for part in ver.split('.')]

        sorted_versions = sorted(version_map.keys(), key=_numeric_parts)
        template = _(
            "Invalid %(api_name)s client version '%(version)s'. "
            "must be one of: %(version_map)s"
        )
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(sorted_versions),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_client_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. must be "
                     "one of: %(map_keys)s")
        details = {
            'api_name': api_name,
            'version': version,
            'map_keys': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def get_class(api_name, version, version_map):
    """Look up and import the client class for an API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc
    :param version: the requested API version; it is converted with
        ``str()`` before the lookup
    :param version_map: a dict keyed by version whose values are handed to
        ``importutils.import_class``
    :rtype: a client class for the requested API version
    :raises exceptions.UnsupportedVersion: when the lookup in
        ``version_map`` fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "Must be one of: %(version_map)s")
        details = {
            'api_name': api_name,
            'version': version,
            'version_map': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)
    else:
        return importutils.import_class(client_path)
def __init__(self, host, binary, topic, manager,
             periodic_enable=None, periodic_fuzzy_delay=None,
             periodic_interval_max=None):
    """Set up the service and instantiate its manager.

    ``manager`` is the path of the manager class; it is imported and
    constructed with ``host`` only. The RPC server attribute starts out as
    None.
    """
    super(Service, self).__init__()
    # Initialise RPC only if that has not happened yet.
    if not rpc.initialized():
        rpc.init(CONF)

    self.host, self.binary, self.topic = host, binary, topic
    self.manager_class_name = manager

    manager_cls = importutils.import_class(self.manager_class_name)
    self.rpcserver = None
    self.manager = manager_cls(host=self.host)

    # Periodic task settings are stored as given.
    self.periodic_enable = periodic_enable
    self.periodic_fuzzy_delay = periodic_fuzzy_delay
    self.periodic_interval_max = periodic_interval_max
def load_extension(self, ext_factory):
    """Run an extension factory against this extension manager.

    ``ext_factory`` is either a callable or the name of one; a name is
    imported first. The factory is then called with a single argument, the
    extension manager itself, and is expected to call the register() method
    at least once.
    """
    LOG.debug("Loading extension %s", ext_factory)

    # A string has to be resolved to the callable it names; anything else
    # is used as the factory directly.
    factory = (importutils.import_class(ext_factory)
               if isinstance(ext_factory, six.string_types)
               else ext_factory)

    LOG.debug("Calling extension factory %s", ext_factory)
    factory(self)
loadables.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def get_matching_classes(self, loadable_class_names):
    """Resolve a list of names into loadable classes.

    Each name is imported. An object accepted by ``_is_correct_class`` is
    collected as-is; a plain function is called and everything it yields is
    collected instead, which lets one name stand for a default list of
    classes.

    :param loadable_class_names: iterable of names to import
    :returns: list of collected classes, in input order
    :raises exception.ClassNotFound: when an imported object is neither an
        accepted class nor a function
    """
    matched = []
    for cls_name in loadable_class_names:
        candidate = importutils.import_class(cls_name)
        if self._is_correct_class(candidate):
            matched.append(candidate)
            continue
        if not inspect.isfunction(candidate):
            error_str = 'Not a class of the correct type'
            raise exception.ClassNotFound(class_name=cls_name,
                                          exception=error_str)
        # The function returns the classes to use.
        matched.extend(candidate())
    return matched
def __init__(self, session, virtapi):
    """Wire up the collaborators used by this object.

    Creates the compute API, the volume operations helper, the firewall
    driver, the VIF driver and the image upload handler; the last two are
    imported from the paths configured under ``CONF.xenserver``.

    :param session: session object shared with the helpers created here
    :param virtapi: stored on the instance as ``_virtapi``
    """
    self.compute_api = compute.API()
    self._session, self._virtapi = session, virtapi
    self.default_root_dev = '/dev/sda'

    self._volumeops = volumeops.VolumeOps(self._session)
    self.firewall_driver = firewall.load_driver(
        DEFAULT_FIREWALL_DRIVER,
        xenapi_session=self._session)

    vif_driver_cls = importutils.import_class(CONF.xenserver.vif_driver)
    self.vif_driver = vif_driver_cls(xenapi_session=self._session)

    LOG.debug("Importing image upload handler: %s",
              CONF.xenserver.image_upload_handler)
    self.image_upload_handler = importutils.import_object(
        CONF.xenserver.image_upload_handler)
def __init__(self, host, binary, topic, manager, report_interval=None,
             periodic_enable=None, periodic_fuzzy_delay=None,
             periodic_interval_max=None, db_allowed=True,
             *args, **kwargs):
    """Set up the service, its manager and its conductor API.

    ``manager`` is the path of the manager class; it is imported and
    constructed with ``host`` plus the extra positional and keyword
    arguments, which are also remembered on the instance. Construction ends
    by calling ``wait_until_ready`` on the conductor API with an admin
    context.
    """
    super(Service, self).__init__()
    self.host, self.binary, self.topic = host, binary, topic
    self.manager_class_name = manager
    self.servicegroup_api = servicegroup.API()

    manager_cls = importutils.import_class(self.manager_class_name)
    self.manager = manager_cls(host=self.host, *args, **kwargs)

    self.rpcserver = None
    self.backdoor_port = None

    # Timing and periodic task settings are stored as given.
    self.report_interval = report_interval
    self.periodic_enable = periodic_enable
    self.periodic_fuzzy_delay = periodic_fuzzy_delay
    self.periodic_interval_max = periodic_interval_max

    self.saved_args = args
    self.saved_kwargs = kwargs

    self.conductor_api = conductor.API(use_local=db_allowed)
    self.conductor_api.wait_until_ready(context.get_admin_context())
def _get_manager(self):
    """Build the Manager configured for this service, if there is one.

    The option named ``<service name>_manager`` is looked up in the
    configuration; its value is the path of the Manager class to import and
    instantiate without arguments.

    :returns: a Manager instance, or None when the option is missing or
        empty.
    """
    option = '%s_manager' % self.name
    class_path = CONF.get(option, None) if option in CONF else None
    if class_path:
        return importutils.import_class(class_path)()
    return None
def __init__(self, application, limits=None, limiter=None, **kwargs):
    """Initialize new `RateLimitingMiddleware`.

    Wraps the given WSGI application and builds the limiter that enforces
    the limits.

    @param application: WSGI application to wrap
    @param limits: String describing limits; parsed by the limiter class
        when given, otherwise DEFAULT_LIMITS is used
    @param limiter: String identifying class for representing limits;
        `Limiter` is used when omitted

    Other parameters are passed to the constructor for the limiter.
    """
    base_wsgi.Middleware.__init__(self, application)

    # Fall back to the default limiter class unless one was named.
    limiter_cls = (Limiter if limiter is None
                   else importutils.import_class(limiter))

    parsed = limits
    if parsed is not None:
        parsed = limiter_cls.parse_limits(parsed)

    self._limiter = limiter_cls(parsed or DEFAULT_LIMITS, **kwargs)
def __init__(self, *args, **kwargs):
    """Set up the cells manager, its state manager, messaging and driver.

    Logs a warning about the experimental status of cells first. An
    optional ``cell_state_manager`` keyword (a callable producing the state
    manager) is removed from ``kwargs`` before they are passed on to the
    parent constructor.
    """
    LOG.warning(_LW('The cells feature of Nova is considered experimental '
                    'by the OpenStack project because it receives much '
                    'less testing than the rest of Nova. This may change '
                    'in the future, but current deployers should be aware '
                    'that the use of it in production right now may be '
                    'risky. Also note that cells does not currently '
                    'support rolling upgrades, it is assumed that cells '
                    'deployments are upgraded lockstep so n-1 cells '
                    'compatibility does not work.'))

    # Mostly for tests.
    state_manager_factory = kwargs.pop('cell_state_manager', None)
    super(CellsManager, self).__init__(service_name='cells',
                                       *args, **kwargs)
    if state_manager_factory is None:
        state_manager_factory = cells_state.CellStateManager

    self.state_manager = state_manager_factory()
    self.msg_runner = messaging.MessageRunner(self.state_manager)
    self.driver = importutils.import_class(CONF.cells.driver)()
    self.instances_to_heal = iter([])
contrail_driver_base.py 文件源码
项目:networking-opencontrail
作者: openstack
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def _parse_class_args(self):
    """Load the extensions configured for the plugin.

    Opencontrail supports extensions such as ipam and policy; they are
    configured in the plugin configuration file in the form

        contrail_extensions=ipam:<classpath>,policy:<classpath>

    For every extension that names a class, the class is imported and
    instantiated, and each attribute of the instance whose name starts with
    ``get_``, ``update_``, ``delete_`` or ``create_`` is copied onto this
    object. Every extension name is recorded as a supported alias. Any
    failure is logged and re-raised as InvalidContrailExtensionError.
    """
    handler_prefixes = ('get_', 'update_', 'delete_', 'create_')
    contrail_extensions = cfg.CONF.APISERVER.contrail_extensions
    # If multiple class specified for same extension, last one will win
    # according to DictOpt behavior
    for ext_name, ext_class in contrail_extensions.items():
        try:
            if ext_class and ext_class != 'None':
                ext_class = importutils.import_class(ext_class)
                ext_instance = ext_class()
                ext_instance.set_core(self)
                for method in dir(ext_instance):
                    if method.startswith(handler_prefixes):
                        setattr(self, method,
                                ext_instance.__getattribute__(method))
            # Reached both for class-less extensions and after a
            # successful load.
            self.supported_extension_aliases.append(ext_name)
        except Exception:
            LOG.exception("Contrail Backend Error")
            # Converting contrail backend error to Neutron Exception
            raise exceptions.InvalidContrailExtensionError(
                ext_name=ext_name, ext_class=ext_class)
    self._build_auth_details()
def API():
    """Instantiate the class named by the ``cluster_api_class`` option.

    :returns: a new instance of the configured class, built without
        arguments
    """
    import_class = oslo_utils.importutils.import_class
    api_cls = import_class(oslo_config.cfg.CONF.cluster_api_class)
    return api_cls()
def __init__(self, host, binary, manager):
    """Set up the service and instantiate its manager.

    ``manager`` is the path of the manager class; it is imported and
    constructed with ``host`` only.
    """
    super(Service, self).__init__()
    self.host, self.binary = host, binary
    self.manager_class_name = manager
    manager_cls = importutils.import_class(self.manager_class_name)
    self.manager = manager_cls(host=self.host)
def __init__(self, virtapi):
    """Set up the driver with its VIF and firewall drivers.

    The VIF driver class is imported from ``CONF.docker.vif_driver`` and
    instantiated without arguments; the firewall driver is loaded with the
    no-op driver as its default.
    """
    super(DockerDriver, self).__init__(virtapi)
    self._docker = None
    self.vif_driver = importutils.import_class(CONF.docker.vif_driver)()
    self.firewall_driver = firewall.load_driver(
        default='nova.virt.firewall.NoopFirewallDriver')
    # NOTE(zhangguoqing): For passing the nova unit tests
    self.active_migrations = {}
def test_all_public_methods_are_traced(self):
    """Check that public methods of the API classes carry ``__traced__``.

    Profiling is enabled in the configuration, each module is reloaded and
    every public function or method found in the class ``__dict__`` must
    have a truthy ``__traced__`` attribute.
    """
    profiler_opts.set_defaults(conf.CONF)
    self.config(enabled=True,
                group='profiler')

    classes = [
        'zun.compute.api.API',
        'zun.compute.rpcapi.API',
    ]
    for clsname in classes:
        # give the metaclass and trace_cls() decorator a chance to patch
        # methods of the classes above
        module_name = clsname.rsplit('.', 1)[0]
        six.reload_module(importutils.import_module(module_name))

        cls = importutils.import_class(clsname)
        for attr, obj in cls.__dict__.items():
            skip = (
                # only public methods are traced
                attr.startswith('_')
                # only checks callables
                or not (inspect.ismethod(obj) or inspect.isfunction(obj))
                # osprofiler skips static methods
                or isinstance(obj, staticmethod)
            )
            if not skip:
                self.assertTrue(getattr(obj, '__traced__', False), obj)
def setUp(self):
    """Import the shell class and patch its ``run_subcommand``.

    The patch is started here and registered for cleanup, so it is stopped
    again when the test finishes.
    """
    super(TestShell, self).setUp()
    shell_path = self.shell_class_name
    self.shell_class = importutils.import_class(shell_path)

    self.cmd_patch = mock.patch(self.shell_class_name + ".run_subcommand")
    self.cmd_save = self.cmd_patch.start()
    self.addCleanup(self.cmd_patch.stop)

    self.app = mock.Mock("Test Shell")
def test_import_class(self):
    """import_class returns the very object found in the loaded module."""
    imported = importutils.import_class('datetime.datetime')
    # Looked up after the import, as the import is what loads the module.
    expected = sys.modules['datetime'].datetime
    self.assertEqual(expected, imported)
def test_import_bad_class(self):
    """import_class raises ImportError for a path that cannot be imported."""
    with self.assertRaises(ImportError):
        importutils.import_class('lol.u_mad.brah')
def _get_client_class_and_version(version):
    """Normalise ``version`` and import the matching client class.

    :param version: an ``api_versions.APIVersion`` or a value that
        ``api_versions.get_api_version`` can convert into one
    :returns: tuple of the APIVersion object and the imported
        ``harborclient.v<major>.client.Client`` class
    :raises exceptions.UnsupportedVersion: when the version is "latest"
    """
    if isinstance(version, api_versions.APIVersion):
        api_versions.check_major_version(version)
    else:
        version = api_versions.get_api_version(version)

    if version.is_latest():
        raise exceptions.UnsupportedVersion(("The version should be explicit, "
                                             "not latest."))

    client_path = "harborclient.v%s.client.Client" % version.ver_major
    return version, importutils.import_class(client_path)
def _get_connector(self):
    """Instantiate the configured volume connector.

    The connector name is read from ``cinder_conf.volume_connector`` and
    must be a key of ``volume_connector_conf``, whose value is the path of
    the class to import and instantiate without arguments.

    :raises exceptions.FuxiException: when no connector is configured or
        the configured name is unknown
    """
    connector = cinder_conf.volume_connector
    if connector and connector in volume_connector_conf:
        return importutils.import_class(volume_connector_conf[connector])()

    msg = _("Must provide an valid volume connector")
    LOG.error(msg)
    raise exceptions.FuxiException(msg)