def __init__(self):
    """Set up fake boot/deploy interfaces alongside the iBoot power interface.

    :raises: DriverLoadError if ``importutils.try_import('iboot')`` returns
        a falsy value.
    """
    iboot_lib = importutils.try_import('iboot')
    if not iboot_lib:
        driver_name = self.__class__.__name__
        failure_reason = _("Unable to import iboot library")
        raise ironic_exception.DriverLoadError(driver=driver_name,
                                               reason=failure_reason)

    # Boot and deploy come from the ``fake`` module; power comes from
    # ``iboot_power``.
    self.boot = fake.FakeBoot()
    self.power = iboot_power.IBootPower()
    self.deploy = fake.FakeDeploy()
# python类try_import()的实例源码 (Example source code using Python's try_import())
def __init__(self):
    """Set up the deprecated iBoot driver with PXE boot and iSCSI deploy.

    A deprecation warning is logged first, before the library check.

    :raises: DriverLoadError if ``importutils.try_import('iboot')`` returns
        a falsy value.
    """
    deprecation_notice = ("This driver is deprecated and will be removed "
                          "in the Rocky release. "
                          "Use 'staging-iboot' hardware type instead.")
    LOG.warning(deprecation_notice)

    iboot_lib = importutils.try_import('iboot')
    if not iboot_lib:
        driver_name = self.__class__.__name__
        failure_reason = _("Unable to import iboot library")
        raise ironic_exception.DriverLoadError(driver=driver_name,
                                               reason=failure_reason)

    self.power = iboot_power.IBootPower()
    self.boot = pxe.PXEBoot()
    self.deploy = iscsi_deploy.ISCSIDeploy()
def __init__(self):
    """Set up the deprecated iBoot driver with PXE boot and agent deploy.

    A deprecation warning is logged first, before the library check.

    :raises: DriverLoadError if ``importutils.try_import('iboot')`` returns
        a falsy value.
    """
    deprecation_notice = ("This driver is deprecated and will be removed "
                          "in the Rocky release. "
                          "Use 'staging-iboot' hardware type instead.")
    LOG.warning(deprecation_notice)

    iboot_lib = importutils.try_import('iboot')
    if not iboot_lib:
        driver_name = self.__class__.__name__
        failure_reason = _("Unable to import iboot library")
        raise ironic_exception.DriverLoadError(driver=driver_name,
                                               reason=failure_reason)

    self.power = iboot_power.IBootPower()
    self.boot = pxe.PXEBoot()
    self.deploy = agent.AgentDeploy()
def __init__(self):
    """Set up the deprecated AMT driver with PXE boot and AMT iSCSI deploy.

    A deprecation warning is logged first, before the library check.

    :raises: DriverLoadError if ``importutils.try_import('pywsman')``
        returns a falsy value.
    """
    deprecation_notice = ("This driver is deprecated and will be removed "
                          "in the Rocky release. "
                          "Use 'staging-amt' hardware type instead.")
    LOG.warning(deprecation_notice)

    pywsman_lib = importutils.try_import('pywsman')
    if not pywsman_lib:
        driver_name = self.__class__.__name__
        failure_reason = _("Unable to import pywsman library")
        raise ironic_exception.DriverLoadError(driver=driver_name,
                                               reason=failure_reason)

    self.power = amt_power.AMTPower()
    self.boot = pxe.PXEBoot()
    self.deploy = amt_deploy.AMTISCSIDeploy()
    self.management = amt_management.AMTManagement()
def __init__(self):
    """Set up the deprecated AMT driver with PXE boot and agent deploy.

    A deprecation warning is logged first, before the library check.

    :raises: DriverLoadError if ``importutils.try_import('pywsman')``
        returns a falsy value.
    """
    deprecation_notice = ("This driver is deprecated and will be removed "
                          "in the Rocky release. "
                          "Use 'staging-amt' hardware type instead.")
    LOG.warning(deprecation_notice)

    pywsman_lib = importutils.try_import('pywsman')
    if not pywsman_lib:
        driver_name = self.__class__.__name__
        failure_reason = _("Unable to import pywsman library")
        raise ironic_exception.DriverLoadError(driver=driver_name,
                                               reason=failure_reason)

    self.power = amt_power.AMTPower()
    self.boot = pxe.PXEBoot()
    self.deploy = agent.AgentDeploy()
    self.management = amt_management.AMTManagement()
def __init__(self, manager_module, manager_class, topic, host=None):
    """Create the service and instantiate its manager.

    :param manager_module: dotted name of the module holding the manager.
    :param manager_class: name of the manager class inside that module.
    :param topic: topic stored on the service and passed to the manager.
    :param host: host name; ``CONF.host`` is used when this is falsy.
    """
    super(RPCService, self).__init__()
    self.topic = topic
    self.host = host if host else CONF.host

    # Resolve the manager class by name from the (optionally importable)
    # module, then build it with this service's topic and host.
    loaded_module = importutils.try_import(manager_module)
    manager_factory = getattr(loaded_module, manager_class)
    self.manager = manager_factory(self.topic, self.host)

    self.rpcserver = None
def __init__(self, host, manager_module, manager_class):
    """Create the service and instantiate its manager.

    :param host: host name stored on the service and passed to the manager.
    :param manager_module: dotted name of the module holding the manager;
        the module's ``MANAGER_TOPIC`` is also passed to the manager.
    :param manager_class: name of the manager class inside that module.
    """
    super(RPCService, self).__init__()
    self.host = host

    loaded_module = importutils.try_import(manager_module)
    manager_factory = getattr(loaded_module, manager_class)
    manager_topic = loaded_module.MANAGER_TOPIC
    self.manager = manager_factory(host, manager_topic)

    # The service's topic is whatever the manager reports as its topic.
    self.topic = self.manager.topic
    self.rpcserver = None
    self.deregister = True
def test_try_import(self):
    """try_import('datetime') returns the module found in sys.modules."""
    imported = importutils.try_import('datetime')
    registered = sys.modules['datetime']
    self.assertEqual(registered, imported)
def test_try_import_returns_default(self):
    """try_import('foo.bar') yields None when no default is supplied."""
    self.assertIsNone(importutils.try_import('foo.bar'))
def _get_connection(self):
    """Return the cached libvirt connection, opening it on first use.

    :raises: error.OSFError if ``importutils.try_import('libvirt')``
        returns a falsy value.
    """
    # Reuse the connection once it has been opened.
    if self._cached_conn is not None:
        return self._cached_conn

    libvirt_lib = importutils.try_import('libvirt')
    if not libvirt_lib:
        raise error.OSFError('libvirt-python is required '
                             'to use LibvirtDriver')

    self._cached_conn = libvirt_lib.open(self.connection_uri)
    return self._cached_conn
def __init__(self, manager_module, manager_class, topic, host=None):
    """Create the service and instantiate its manager.

    :param manager_module: dotted name of the module holding the manager.
    :param manager_class: name of the manager class inside that module.
    :param topic: topic stored on the service and passed to the manager.
    :param host: host name; ``CONF.host`` is used when this is falsy.
    """
    super(RPCService, self).__init__()
    self.host = host or CONF.host
    manager_module = importutils.try_import(manager_module)
    manager_class = getattr(manager_module, manager_class)
    # Pass the resolved host, not the raw argument: ``host`` defaults to
    # None, and the manager should see the same host as this service.
    self.manager = manager_class(self.host, topic)
    self.topic = topic
    self.rpcserver = None