def get_driver_instance():
    """Build the configured port driver and validate it before returning.

    The module, name and class name are obtained from
    ``_parse_port_driver_config``; the class is then instantiated through
    ``importutils.import_object``.

    :returns: the driver instance, once both verification helpers have run
    :raises: exceptions.KuryrException when the import fails with ImportError
    """
    module, name, classname = _parse_port_driver_config()
    # TODO(apuimedo): switch to the openstack/stevedore plugin system
    class_path = "{0}.{1}".format(module, classname)
    try:
        driver = importutils.import_object(class_path)
    except ImportError as ie:
        message = "Cannot load port driver '{0}': {1}".format(module, ie)
        raise exceptions.KuryrException(message)

    _verify_port_driver_compliancy(driver, name)
    _verify_binding_driver_compatibility(driver, name)
    return driver
python类import_object()的实例源码
def fence(self, nodes=None):
    """Load the configured fencing driver and ask it to fence the nodes.

    :param nodes: optional replacement for ``self.nodes``; a falsy value
        (``None`` or an empty list) leaves the current list untouched
    :return: the result of the driver's ``fence()`` call -- a list of nodes,
        each either shut down or failed
    """
    # update the list of nodes if required!
    if nodes:
        self.nodes = nodes
    driver_name = self.fencer_conf['driver']
    driver = importutils.import_object(
        driver_name,
        self.nodes,
        self.fencer_conf
    )
    # Pass the values as logger arguments instead of pre-formatting, so the
    # message is only rendered when DEBUG logging is actually enabled.
    LOG.debug('Loaded fencing driver %s with config: %s',
              driver.get_info(), self.fencer_conf)
    return driver.fence()
def load_engine_driver(engine_driver):
    """Load an engine driver from the ``mogan.baremetal`` package.

    The process exits with status 1 when no driver name is given or when
    loading raises ImportError.

    :param engine_driver: driver path, relative to ``mogan.baremetal``
    :returns: the result of ``utils.check_isinstance`` applied to the loaded
        driver and ``BaseEngineDriver``
    """
    if not engine_driver:
        LOG.error("Engine driver option required, but not specified")
        sys.exit(1)

    LOG.info("Loading engine driver '%s'", engine_driver)
    driver_path = 'mogan.baremetal.%s' % engine_driver
    try:
        loaded = importutils.import_object(driver_path)
        return utils.check_isinstance(loaded, BaseEngineDriver)
    except ImportError:
        LOG.exception("Unable to load the baremetal driver")
        sys.exit(1)
def __init__(self):
    """Initialise the H3C L3 router plugin from the VCFCONTROLLER options.

    The router scheduler is imported before the parent initialiser runs,
    and ``setup_rpc()`` is called once ``vendor_rpc_topic`` has been read.
    """
    scheduler_path = cfg.CONF.router_scheduler_driver
    self.router_scheduler = importutils.import_object(scheduler_path)
    super(H3CL3RouterPlugin, self).__init__()
    self.vds_id = None
    self.client = rest_client.RestClient()

    vcf_conf = cfg.CONF.VCFCONTROLLER
    self.enable_metadata = vcf_conf.enable_metadata
    self.router_binding_public_vrf = vcf_conf.router_binding_public_vrf
    self.disable_internal_l3flow_offload = (
        vcf_conf.disable_internal_l3flow_offload)
    self.enable_l3_router_rpc_notify = (
        vcf_conf.enable_l3_router_rpc_notify)
    self.vendor_rpc_topic = vcf_conf.vendor_rpc_topic
    self.setup_rpc()

    self.enable_l3_vxlan = vcf_conf.enable_l3_vxlan
    self.h3c_l3_vxlan = h3c_l3_vxlan_db.H3CL3VxlanDriver()
    # Identity check on purpose: only the literal True triggers initialize().
    if self.enable_l3_vxlan is True:
        self.h3c_l3_vxlan.initialize()
def delete(self, fixed_range=None, uuid=None):
    """Delete a network identified by fixed_range and/or uuid.

    At least one of the two identifiers must be given. When the configured
    network manager path contains "NeutronManager", a uuid is mandatory and
    fixed_range must not be supplied.
    """
    nothing_given = fixed_range is None and uuid is None
    if nothing_given:
        raise Exception(_("Please specify either fixed_range or uuid"))

    net_manager = importutils.import_object(CONF.network_manager)
    uses_neutron = "NeutronManager" in CONF.network_manager
    if uses_neutron:
        if uuid is None:
            raise Exception(_("UUID is required to delete "
                              "Neutron Networks"))
        if fixed_range:
            raise Exception(_("Deleting by fixed_range is not supported "
                              "with the NeutronManager"))

    # delete the network
    admin_context = context.get_admin_context()
    net_manager.delete_network(admin_context, fixed_range, uuid)
def __init__(self):
    """Set up the host manager and the servicegroup API.

    The host manager is first looked up through ``driver.DriverManager`` in
    the "nova.scheduler.host_manager" namespace using
    ``CONF.scheduler_host_manager``. If that raises RuntimeError, the option
    is treated as a full class path and imported directly (deprecated).

    :raises RuntimeError: if the legacy import fails with ImportError or
        ValueError
    """
    try:
        self.host_manager = driver.DriverManager(
            "nova.scheduler.host_manager",
            CONF.scheduler_host_manager,
            invoke_on_load=True).driver
    # TODO(Yingxin): Change to catch stevedore.exceptions.NoMatches
    # after stevedore v1.9.0
    except RuntimeError:
        # NOTE(Yingxin): Loading full class path is deprecated and
        # should be removed in the N release.
        try:
            self.host_manager = importutils.import_object(
                CONF.scheduler_host_manager)
            LOG.warning(_LW("DEPRECATED: scheduler_host_manager uses "
                            "classloader to load %(path)s. This legacy "
                            "loading style will be removed in the "
                            "N release."),
                        {'path': CONF.scheduler_host_manager})
        except (ImportError, ValueError):
            # Interpolate explicitly: handing the mapping to RuntimeError as
            # a second argument would leave "%(conf)s" unformatted.
            raise RuntimeError(
                _("Cannot load host manager from configuration "
                  "scheduler_host_manager = %(conf)s.") %
                {'conf': CONF.scheduler_host_manager})
    self.servicegroup_api = servicegroup.API()
def __init__(self, network_driver=None, *args, **kwargs):
    """Wire up the drivers, DNS managers and API clients of the manager.

    :param network_driver: passed to ``driver.load_network_driver``
    :param kwargs: may carry ``l3_lib`` to override ``CONF.l3_lib``; all
        kwargs are forwarded unchanged to the parent initialiser
    """
    self.driver = driver.load_network_driver(network_driver)

    instance_dns_path = CONF.instance_dns_manager
    self.instance_dns_manager = importutils.import_object(instance_dns_path)
    self.instance_dns_domain = CONF.instance_dns_domain

    floating_dns_path = CONF.floating_ip_dns_manager
    self.floating_dns_manager = importutils.import_object(floating_dns_path)

    self.network_api = network_api.API()
    self.network_rpcapi = network_rpcapi.NetworkAPI()
    self.security_group_api = (
        openstack_driver.get_openstack_security_group_driver())
    self.servicegroup_api = servicegroup.API()

    # "l3_lib" is only read here, not removed, so it also reaches super().
    l3_lib = kwargs.get("l3_lib", CONF.l3_lib)
    self.l3driver = importutils.import_object(l3_lib)
    self.quotas_cls = objects.Quotas
    super(NetworkManager, self).__init__(*args, service_name='network',
                                         **kwargs)
def __init__(self, session, virtapi):
    """Store the session/virtapi pair and build the helper objects.

    :param session: kept as ``self._session`` and handed to the volume ops,
        the firewall driver and the VIF driver
    :param virtapi: kept as ``self._virtapi``
    """
    self.compute_api = compute.API()
    self._session = session
    self._virtapi = virtapi
    self._volumeops = volumeops.VolumeOps(self._session)
    self.firewall_driver = firewall.load_driver(
        DEFAULT_FIREWALL_DRIVER,
        xenapi_session=self._session)

    # The VIF driver is imported as a class and instantiated explicitly so
    # that the session can be passed as a keyword argument.
    vif_driver_cls = importutils.import_class(CONF.xenserver.vif_driver)
    self.vif_driver = vif_driver_cls(xenapi_session=self._session)
    self.default_root_dev = '/dev/sda'

    upload_handler_path = CONF.xenserver.image_upload_handler
    LOG.debug("Importing image upload handler: %s", upload_handler_path)
    self.image_upload_handler = importutils.import_object(
        CONF.xenserver.image_upload_handler)
test_manager.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def setUp(self):
    """Patch in the fake LDAP module and seed the driver with two domains."""
    super(LdapDNSTestCase, self).setUp()
    ldap_patch = fixtures.MonkeyPatch('nova.network.ldapdns.ldap',
                                      fake_ldap)
    self.useFixture(ldap_patch)

    dns_class = 'nova.network.ldapdns.LdapDNS'
    self.driver = importutils.import_object(dns_class)

    # Entry added under the hosts OU before the test domains are created.
    attrs = {
        'objectClass': ['domainrelatedobject', 'dnsdomain',
                        'domain', 'dcobject', 'top'],
        'associateddomain': ['root'],
        'dc': ['root'],
    }
    self.driver.lobj.add_s("ou=hosts,dc=example,dc=org", attrs.items())
    self.driver.create_domain(domain1)
    self.driver.create_domain(domain2)
test_xenapi.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def setUp(self):
    """Configure the xenserver flags and build the driver under test.

    After the flags and the stubbed session are in place, the firewall
    driver is taken from the XenAPI driver's ``_vmops``.
    """
    super(XenAPIDom0IptablesFirewallTestCase, self).setUp()
    self.flags(connection_url='test_url',
               connection_password='test_pass',
               group='xenserver')
    firewall_path = ('nova.virt.xenapi.firewall.'
                     'Dom0IptablesFirewallDriver')
    self.flags(instance_name_template='%d',
               firewall_driver=firewall_path)

    self.user_id = 'mappin'
    self.project_id = 'fake'
    stubs.stubout_session(self.stubs, stubs.FakeSessionForFirewallTests,
                          test_case=self)
    self.context = context.RequestContext(self.user_id, self.project_id)
    self.network = importutils.import_object(CONF.network_manager)
    self.conn = xenapi_conn.XenAPIDriver(fake.FakeVirtAPI(), False)
    self.fw = self.conn._vmops.firewall_driver
def __init__(self, *args, **kwargs):
    """Create an instance of the servicegroup API.

    args and kwargs are passed down to the servicegroup driver when it gets
    created.
    """
    # Make sure report interval is less than service down time
    report_interval = CONF.report_interval
    if CONF.service_down_time <= report_interval:
        new_service_down_time = int(report_interval * 2.5)
        log_values = {
            'service_down_time': CONF.service_down_time,
            'report_interval': report_interval,
            'new_service_down_time': new_service_down_time,
        }
        LOG.warning(_LW("Report interval must be less than service down "
                        "time. Current config: <service_down_time: "
                        "%(service_down_time)s, report_interval: "
                        "%(report_interval)s>. Setting service_down_time "
                        "to: %(new_service_down_time)s"),
                    log_values)
        CONF.set_override('service_down_time', new_service_down_time)

    # Map the configured driver name to its class path, then instantiate.
    driver_class = _driver_name_class_mapping[CONF.servicegroup_driver]
    self._driver = importutils.import_object(driver_class, *args, **kwargs)
resource_tracker.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self, host, driver, nodename):
    """Record the host/driver/node identity and create empty tracking state.

    :param host: stored as ``self.host``
    :param driver: stored as ``self.driver``
    :param nodename: stored as ``self.nodename``
    """
    self.host = host
    self.driver = driver
    self.pci_tracker = None
    self.nodename = nodename
    self.compute_node = None

    stats_class_path = CONF.compute_stats_class
    self.stats = importutils.import_object(stats_class_path)
    self.tracked_instances = {}
    self.tracked_migrations = {}

    self.monitors = monitors.MonitorHandler(self).monitors
    self.ext_resources_handler = ext_resources.ResourceHandler(
        CONF.compute_resources)
    self.old_resources = objects.ComputeNode()
    self.scheduler_client = scheduler_client.SchedulerClient()

    # Allocation ratios are read from configuration at construction time.
    self.ram_allocation_ratio = CONF.ram_allocation_ratio
    self.cpu_allocation_ratio = CONF.cpu_allocation_ratio
    self.disk_allocation_ratio = CONF.disk_allocation_ratio
def __init__(self, *args, **kwargs):
    """Initialise the deprecated iSER driver with the ISERTgtAdm target.

    A deprecation warning is logged, then the target driver is imported with
    this driver's configuration, db and executor.
    """
    super(IOArbLVMISERDriver, self).__init__(*args, **kwargs)
    LOG.warning(_LW('IOArbLVMISERDriver is deprecated, you should '
                    'now just use IOArbLVMVolumeDriver and specify '
                    'target_helper for the target driver you '
                    'wish to use. In order to enable iser, please '
                    'set iscsi_protocol with the value iser.'))
    LOG.debug('Attempting to initialize LVM driver with the '
              'following target_driver: '
              'cinder.volume.targets.iser.ISERTgtAdm')

    target_path = 'cinder.volume.targets.iser.ISERTgtAdm'
    target_kwargs = dict(configuration=self.configuration,
                         db=self.db,
                         executor=self._execute)
    self.target_driver = importutils.import_object(target_path,
                                                   **target_kwargs)
def __init__(self, *args, **kwargs):
    """Initialise the deprecated LVM iSER driver with the ISERTgtAdm target.

    A deprecation warning is logged, then the target driver is imported with
    this driver's configuration, db and executor.
    """
    super(LVMISERDriver, self).__init__(*args, **kwargs)
    LOG.warning(_LW('LVMISERDriver is deprecated, you should '
                    'now just use LVMVolumeDriver and specify '
                    'target_helper for the target driver you '
                    'wish to use. In order to enable iser, please '
                    'set iscsi_protocol with the value iser.'))
    LOG.debug('Attempting to initialize LVM driver with the '
              'following target_driver: '
              'cinder.volume.targets.iser.ISERTgtAdm')

    target_path = 'cinder.volume.targets.iser.ISERTgtAdm'
    target_kwargs = dict(configuration=self.configuration,
                         db=self.db,
                         executor=self._execute)
    self.target_driver = importutils.import_object(target_path,
                                                   **target_kwargs)
def __init__(self):
    """Bring up the Huawei L3 router plugin.

    RPC setup, scheduler import and the periodic agent status check all
    happen before the parent initialiser is invoked.
    """
    LOG.info(_LI("Init huawei l3 driver."))
    self.setup_rpc()

    scheduler_path = cfg.CONF.router_scheduler_driver
    self.router_scheduler = importutils.import_object(scheduler_path)
    self.start_periodic_l3_agent_status_check()
    super(HuaweiACL3RouterPlugin, self).__init__()

    # The DVR scheduler subscription applies only when the alias is listed.
    dvr_supported = 'dvr' in self.supported_extension_aliases
    if dvr_supported:
        l3_dvrscheduler_db.subscribe()
    l3_db.subscribe()
    LOG.info(_LI("Initialization finished successfully"
                 " for huawei l3 driver."))
def __init__(self, quota_driver_class=None):
    """Initialize a Quota object.

    :param quota_driver_class: a driver object, or a string path that is
        instantiated via ``importutils.import_object``; when falsy,
        ``CONF.quota.quota_driver`` is used instead
    """
    chosen_driver = quota_driver_class or CONF.quota.quota_driver
    if isinstance(chosen_driver, six.string_types):
        chosen_driver = importutils.import_object(chosen_driver)

    self._resources = {}
    self._driver = chosen_driver
def __init__(self, learning_driver=None, service_name=None,
             *args, **kwargs):
    """Load the driver from args, or from flags.

    :param learning_driver: driver path; when falsy, the
        ``learning_driver`` value of the configuration object is used
    :param service_name: used as the configuration group
    """
    self.configuration = meteos.engine.configuration.Configuration(
        engine_manager_opts,
        config_group=service_name)
    super(LearningManager, self).__init__(*args, **kwargs)

    driver_path = learning_driver or self.configuration.learning_driver
    self.driver = importutils.import_object(
        driver_path, configuration=self.configuration)
def load_container_driver(container_driver=None):
    """Load a container driver module.

    The driver name comes from the argument or, when that is falsy, from
    ``CONF.container_driver``. Names that do not start with ``zun.`` are
    resolved relative to ``zun.container``. The process exits with status 1
    when no name is available or the import raises ImportError.

    :param container_driver: a container driver name to override the config opt
    :returns: a ContainerDriver instance
    """
    container_driver = container_driver or CONF.container_driver
    if not container_driver:
        LOG.error("Container driver option required, "
                  "but not specified")
        sys.exit(1)

    LOG.info("Loading container driver '%s'", container_driver)
    try:
        if not container_driver.startswith('zun.'):
            container_driver = 'zun.container.%s' % container_driver
        driver = importutils.import_object(container_driver)
    except ImportError:
        LOG.exception("Unable to load the container driver")
        sys.exit(1)
    else:
        # A type mismatch is raised to the caller, not turned into an exit.
        if isinstance(driver, ContainerDriver):
            return driver
        raise Exception(_('Expected driver of type: %s') %
                        str(ContainerDriver))
def test_import_object_optional_arg_not_present(self):
    """FakeDriver can be instantiated without passing any argument."""
    class_path = 'oslo_utils.tests.fake.FakeDriver'
    obj = importutils.import_object(class_path)
    self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_optional_arg_present(self):
    """FakeDriver can be instantiated with ``first_arg`` given by keyword."""
    class_path = 'oslo_utils.tests.fake.FakeDriver'
    obj = importutils.import_object(class_path, first_arg=False)
    self.assertEqual(obj.__class__.__name__, 'FakeDriver')
def test_import_object_required_arg_not_present(self):
    """Instantiating FakeDriver2 with no argument raises TypeError."""
    # arg 1 isn't optional here
    class_path = 'oslo_utils.tests.fake.FakeDriver2'
    self.assertRaises(TypeError, importutils.import_object, class_path)
def test_import_object_required_arg_present(self):
    """FakeDriver2 can be instantiated once ``first_arg`` is supplied."""
    class_path = 'oslo_utils.tests.fake.FakeDriver2'
    obj = importutils.import_object(class_path, first_arg=False)
    self.assertEqual(obj.__class__.__name__, 'FakeDriver2')
# namespace tests
def test_import_object(self):
    """import_object('datetime.time') returns a ``datetime.time`` instance."""
    dt = importutils.import_object('datetime.time')
    # assertIsInstance reports the actual type on failure, unlike
    # assertTrue(isinstance(...)), which only says "False is not true".
    self.assertIsInstance(dt, sys.modules['datetime'].time)
def __init__(self):
    """Create the Manila client and the configured volume connector.

    :raises exceptions.InvalidInput: when ``manila_conf.volume_connector``
        is falsy or is not a key of ``volume_connector_conf``
    """
    super(Manila, self).__init__()
    self.manilaclient = utils.get_manilaclient()

    conn_conf = manila_conf.volume_connector
    if not (conn_conf and conn_conf in volume_connector_conf):
        msg = _("Must provide a valid volume connector")
        LOG.error(msg)
        raise exceptions.InvalidInput(msg)

    connector_path = volume_connector_conf[conn_conf]
    self.connector = importutils.import_object(
        connector_path, manilaclient=self.manilaclient)
def __init__(self, notifier):
    """Instantiate the monitoring driver named in the 'monitoring' config.

    :param notifier: forwarded to the driver as the ``notifier`` keyword
    """
    monitor = CONF.get('monitoring')
    backend_name = monitor['backend_name']
    self.driver = importutils.import_object(
        monitor.driver,
        backend_name=backend_name,
        notifier=notifier
    )
    driver_info = self.driver.get_info()
    # Pass the values as logger arguments instead of %-formatting up front,
    # so the message is only rendered if the record is actually emitted.
    LOG.info('Initializing driver %s with version %s found in %s',
             driver_info['name'], driver_info['version'],
             monitor.get('driver'))
def __init__(self):
    """Instantiate the notifier driver described by the 'notifiers' config.

    The endpoint, credentials, template directory and sender/recipient
    settings are passed positionally; 'options' is expanded into keywords.
    """
    notifier_conf = CONF.get('notifiers')
    # Order matters: these become the driver's positional arguments.
    positional_keys = ('endpoint', 'username', 'password',
                       'templates-dir', 'notify-from', 'notify-list')
    driver_path = notifier_conf.get('driver')
    positional = [notifier_conf.get(key) for key in positional_keys]
    self.driver = importutils.import_object(
        driver_path,
        *positional,
        **notifier_conf.get('options')
    )
def evacuate(self, nodes):
    """Load the configured evacuation driver and run it on the given nodes.

    :param nodes: handed to both the FencerManager and the driver
    :return: the result of the driver's ``evacuate(self.enable_fencing)``
    """
    fencer = FencerManager(nodes)
    evacuation_conf = CONF.get('evacuation')
    driver_path = evacuation_conf['driver']
    driver = importutils.import_object(driver_path, nodes,
                                       evacuation_conf, fencer)
    return driver.evacuate(self.enable_fencing)
def __init__(self):
    """Create the DB handle, the agent API and the RPC consumer.

    The consumer listens on ``topics.TC_PLUGIN`` (non-fanout) with the
    plugin callback and the agent extension callback as endpoints.
    """
    self.db = wan_qos_db.WanTcDb()
    rpc_callback = importutils.import_object(
        'wan_qos.services.plugin.PluginRpcCallback', self)
    endpoints = [rpc_callback, agents_db.AgentExtRpcCallback()]

    self.agent_rpc = api.TcAgentApi(cfg.CONF.host)
    self.conn = n_rpc.create_connection()
    self.conn.create_consumer(topics.TC_PLUGIN, endpoints, fanout=False)
    self.conn.consume_in_threads()
test_bgp_dragent_scheduler.py 文件源码
项目:neutron-dynamic-routing
作者: openstack
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def setUp(self):
    """Instantiate the ChanceScheduler and use the test case as the plugin."""
    super(TestBgpAgentFilter, self).setUp()
    scheduler_path = ('neutron_dynamic_routing.services.bgp.scheduler.'
                      'bgp_dragent_scheduler.ChanceScheduler')
    self.bgp_drscheduler = importutils.import_object(scheduler_path)
    self.plugin = self
def __init__(self):
    """Initialise the BGP plugin: scheduler, RPC, callbacks, status check."""
    super(BgpPlugin, self).__init__()
    scheduler_path = cfg.CONF.bgp_drscheduler_driver
    self.bgp_drscheduler = importutils.import_object(scheduler_path)
    self._setup_rpc()
    self._register_callbacks()
    self.add_periodic_dragent_status_check()