def search_volume(self, volume):
    """Look up a cinder volume by UUID or by name.

    :param volume: a UUID-like string or a volume name.
    :returns: the object returned by the cinder client for the lookup.
    :raises exception.VolumeNotFound: no volume carries the given name.
    :raises exception.Conflict: more than one volume carries the name.
    """
    if uuidutils.is_uuid_like(volume):
        # UUID lookups are passed straight through; client errors propagate.
        return self.cinder.volumes.get(volume)

    try:
        found = self.cinder.volumes.find(name=volume)
    except cinder_exception.NotFound:
        raise exception.VolumeNotFound(volume=volume)
    except cinder_exception.NoUniqueMatch:
        raise exception.Conflict(_(
            'Multiple cinder volumes exist with same name. '
            'Please use the uuid instead.'))
    return found
python类is_uuid_like()的实例源码
def validate(cls, value):
    """Return ``value`` unchanged when it is UUID-like.

    :raises exception.InvalidUUID: ``value`` is not UUID-like.
    """
    if uuidutils.is_uuid_like(value):
        return value
    raise exception.InvalidUUID(uuid=value)
def _check_security_group(self, context, security_group, container):
    """Resolve a security group reference to an ID and reject duplicates.

    :param context: request context handed to the ``utils`` helpers.
    :param security_group: dict holding either a ``uuid`` or a ``name`` key.
    :param container: container the security group is being checked against.
    :returns: the resolved security group ID.
    :raises exception.InvalidUUID: the ``uuid`` value is not UUID-like.
    :raises exception.Conflict: the name matches more than one group.
    :raises exception.InvalidValue: the group is already listed on the
        container or on one of its ports.
    """
    security_group_id = security_group.get("uuid")
    if security_group_id:
        if not uuidutils.is_uuid_like(security_group_id):
            raise exception.InvalidUUID(uuid=security_group_id)
        if security_group_id in container.security_groups:
            msg = _("security_group %s already present in container") % \
                security_group_id
            raise exception.InvalidValue(msg)
    else:
        security_group_ids = utils.get_security_group_ids(
            context, [security_group['name']])
        # A name is ambiguous when it resolves to more than one ID. The
        # count must not be compared against the number of keys in the
        # request dict, which is unrelated to the lookup result.
        if len(security_group_ids) > 1:
            msg = _("Multiple security group matches "
                    "found for name %(name)s, use an ID "
                    "to be more specific. ") % security_group
            raise exception.Conflict(msg)
        security_group_id = security_group_ids[0]

    # Whichever way the ID was obtained, it must not already be attached
    # to any of the container's ports.
    for port_detail in utils.list_ports(context, container):
        if security_group_id in port_detail['security_groups']:
            msg = _("security_group %s already present in container") % \
                list(security_group.values())[0]
            raise exception.InvalidValue(msg)
    return security_group_id
def get_resource(resource, resource_ident):
    """Fetch a resource object by UUID or by logical name.

    :param resource: name of the resource type, looked up on ``objects``.
    :param resource_ident: the UUID or logical name of the resource.
    :returns: The resource.
    """
    resource_cls = getattr(objects, resource)
    if uuidutils.is_uuid_like(resource_ident):
        lookup = resource_cls.get_by_uuid
    else:
        lookup = resource_cls.get_by_name
    return lookup(pecan.request.context, resource_ident)
def check_active(self, server):
    """Report whether a server has reached the ACTIVE state.

    Accepts both server IDs and server objects.

    Returns True when the server is ACTIVE. Returns False when the server
    cannot be fetched or its status is one of the deferred statuses.
    Raises when the server is in ERROR or in a status unknown to Zun.
    """
    # not checking with is_uuid_like as most tests use strings e.g. '1234'
    if isinstance(server, six.string_types):
        server = self.fetch_server(server)
        if server is None:
            return False
        status = self.get_status(server)
    else:
        status = self.get_status(server)
        if status != 'ACTIVE':
            # The object passed in may be stale; refresh once and re-read.
            self.refresh_server(server)
            status = self.get_status(server)

    if status in self.deferred_server_statuses:
        return False
    if status == 'ACTIVE':
        return True
    if status != 'ERROR':
        raise exception.ServerUnknownStatus(
            resource_status=server.status,
            status_reason=_('Unknown'),
            result=_('Server is not active'))

    fault = getattr(server, 'fault', {})
    reason = _("Message: %(message)s, Code: %(code)s") % {
        'message': fault.get('message', _('Unknown')),
        'code': fault.get('code', _('Unknown'))
    }
    raise exception.ServerInError(
        resource_status=status,
        status_reason=reason)
def get_server_id(self, server, raise_on_error=True):
    """Translate a server UUID or name into a server ID.

    :param server: a UUID-like value (returned as is) or a server name.
    :param raise_on_error: when false, a name that does not match exactly
        one server yields ``None`` instead of an exception.
    :returns: the server ID, or ``None`` as described above.
    :raises exception.ZunException: the name lookup failed and
        ``raise_on_error`` is set, or ``server`` is not a string.
    """
    if uuidutils.is_uuid_like(server):
        return server
    if not isinstance(server, six.string_types):
        raise exception.ZunException(_("Unexpected server type"))

    matches = self.client().servers.list(search_opts={'name': server})
    if len(matches) == 1:
        return matches[0].id
    if raise_on_error:
        raise exception.ZunException(_(
            "Unable to get server id with name %s") % server)
    return None
baremetal_standalone_manager.py 文件源码
项目:ironic-tempest-plugin
作者: openstack
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def resource_setup(cls):
    """Boot the baremetal node for the scenario and attach a floating IP.

    Every attribute named in ``cls.mandatory_attr`` must be set, otherwise
    ``lib_exc.InvalidConfiguration`` is raised. ``cls.image_checksum`` is
    passed to ``boot_node`` only when ``cls.image_ref`` is not UUID-like.
    """
    super(BaremetalStandaloneScenarioTest, cls).resource_setup()
    base.set_baremetal_api_microversion(cls.api_microversion)

    for attr_name in cls.mandatory_attr:
        if getattr(cls, attr_name) is None:
            raise lib_exc.InvalidConfiguration(
                "Mandatory attribute %s not set." % attr_name)

    ref_is_uuid = uuidutils.is_uuid_like(cls.image_ref)
    image_checksum = None if ref_is_uuid else cls.image_checksum

    cls.node = cls.boot_node(cls.driver, cls.image_ref,
                             image_checksum=image_checksum)
    cls.node_ip = cls.add_floatingip_to_node(cls.node['uuid'])
def test_is_uuid_like(self):
    """Each listed spelling must be reported as UUID-like."""
    accepted = (
        str(uuid.uuid4()),
        '{12345678-1234-5678-1234-567812345678}',
        '12345678123456781234567812345678',
        'urn:uuid:12345678-1234-5678-1234-567812345678',
        'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb',
        'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb',
        '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}',
    )
    for candidate in accepted:
        self.assertTrue(uuidutils.is_uuid_like(candidate))
def test_is_uuid_like_insensitive(self):
    """An upper-cased UUID string must still be reported as UUID-like."""
    upper_cased = str(uuid.uuid4()).upper()
    self.assertTrue(uuidutils.is_uuid_like(upper_cased))
def test_id_is_uuid_like(self):
    """A plain integer ID must not be reported as UUID-like."""
    numeric_id = 1234567
    self.assertFalse(uuidutils.is_uuid_like(numeric_id))
def test_name_is_uuid_like(self):
    """An ordinary name string must not be reported as UUID-like."""
    plain_name = 'zhongyueluo'
    self.assertFalse(uuidutils.is_uuid_like(plain_name))
def _is_uuid_uri(self, uri):
    """Report whether the last path segment of ``uri`` is UUID-like."""
    path = urlparse.urlparse(uri).path
    last_segment = path.split('/')[-1]
    return uuidutils.is_uuid_like(last_segment)
def validate(value):
    """Return ``value`` unchanged when it is UUID-like.

    :raises exc.InputException: ``value`` is not UUID-like.
    """
    if uuidutils.is_uuid_like(value):
        return value
    raise exc.InputException(
        "Expected a uuid but received %s." % value
    )
def _get_access_param(self, context, protocol, creds):
    """Load the credential record matching ``creds`` for ``protocol``.

    ``creds`` is treated as an ID when it is UUID-like and as a name
    otherwise. SNMP lookups are used when ``const.PROTOCOL_SNMP`` occurs
    in ``protocol``; NETCONF lookups are used in every other case.

    :returns: a single credential record.
    :raises webob.exc.HTTPBadRequest: nothing matched, a name matched
        several records, or the record's ``protocol_type`` differs from
        ``protocol``.
    """
    is_snmp = const.PROTOCOL_SNMP in protocol
    by_id = uuidutils.is_uuid_like(creds)

    if is_snmp and by_id:
        access_parameters = db.get_snmp_cred_by_id(context, creds)
    elif is_snmp:
        access_parameters = db.get_snmp_cred_by_name_and_protocol(
            context, creds, protocol)
    elif by_id:
        access_parameters = db.get_netconf_cred_by_id(context, creds)
    else:
        access_parameters = db.get_netconf_cred_by_name_and_protocol(
            context, creds, protocol)

    if not access_parameters:
        raise webob.exc.HTTPBadRequest(
            _("Credentials not found for Id or name: %s") % creds)

    if isinstance(access_parameters, list):
        # A list result must contain exactly one record to be usable.
        if len(access_parameters) > 1:
            raise webob.exc.HTTPBadRequest(
                _("Multiple credentials matches found "
                  "for name: %s, use an ID to be more specific.") % creds)
        access_parameters = access_parameters[0]

    if access_parameters['protocol_type'] != protocol:
        raise webob.exc.HTTPBadRequest(
            _("Credentials not found for Id or name: %s") % creds)
    return access_parameters
def _get_credentials_dict(self, bnp_switch, func_name):
    """Build the credentials dict used to reach a switch.

    :param bnp_switch: switch record providing ``ip_address``,
        ``credentials`` and ``management_protocol``.
    :param func_name: caller name, passed to ``_raise_ml2_error`` when
        ``bnp_switch`` is falsy.
    :returns: dict with ``ip_address`` plus either the SNMP fields or the
        NETCONF fields, depending on the management protocol.

    ``credentials`` is treated as an ID when UUID-like and as a name
    otherwise. A missing switch or missing credentials are reported
    through ``_raise_ml2_error`` with ``wexc.HTTPNotFound``.
    """
    if not bnp_switch:
        self._raise_ml2_error(wexc.HTTPNotFound, func_name)
    db_context = neutron_context.get_admin_context()
    creds_dict = {}
    creds_dict['ip_address'] = bnp_switch.ip_address
    prov_creds = bnp_switch.credentials
    prov_protocol = bnp_switch.management_protocol
    if hp_const.PROTOCOL_SNMP in prov_protocol:
        if not uuidutils.is_uuid_like(prov_creds):
            snmp_cred = db.get_snmp_cred_by_name(db_context, prov_creds)
            # Index only when something matched, so an empty result reaches
            # the not-found handling below instead of failing on ``[0]``.
            if snmp_cred:
                snmp_cred = snmp_cred[0]
        else:
            snmp_cred = db.get_snmp_cred_by_id(db_context, prov_creds)
        if not snmp_cred:
            LOG.error(_LE("Credentials does not match"))
            self._raise_ml2_error(wexc.HTTPNotFound, '')
        creds_dict['write_community'] = snmp_cred.write_community
        creds_dict['security_name'] = snmp_cred.security_name
        creds_dict['security_level'] = snmp_cred.security_level
        creds_dict['auth_protocol'] = snmp_cred.auth_protocol
        creds_dict['management_protocol'] = prov_protocol
        creds_dict['auth_key'] = snmp_cred.auth_key
        creds_dict['priv_protocol'] = snmp_cred.priv_protocol
        creds_dict['priv_key'] = snmp_cred.priv_key
    else:
        if not uuidutils.is_uuid_like(prov_creds):
            netconf_cred = db.get_netconf_cred_by_name(db_context,
                                                       prov_creds)
        else:
            netconf_cred = db.get_netconf_cred_by_id(db_context,
                                                     prov_creds)
        if not netconf_cred:
            LOG.error(_LE("Credentials does not match"))
            self._raise_ml2_error(wexc.HTTPNotFound, '')
        # NOTE(review): the NETCONF values are read from SNMP-named
        # attributes of the record; left as is -- confirm against the
        # credential model that this mapping is intended.
        creds_dict['user_name'] = netconf_cred.write_community
        creds_dict['password'] = netconf_cred.security_name
        creds_dict['key_path'] = netconf_cred.security_level
    return creds_dict
def validate_access_parameters(body):
    """Validate if the request body is in proper format.

    The body must carry a name that is not UUID-like and exactly one
    supported protocol key. The work is done on a deep copy, so ``body``
    itself is not modified.

    :param body: request body mapping.
    :returns: the result of the protocol-specific validator.
    :raises webob.exc.HTTPBadRequest: the name is missing or UUID-like,
        no protocol or several protocols are given, or the protocol is
        not supported.
    """
    protocol_dict = deepcopy(body)
    if const.NAME not in protocol_dict:
        raise webob.exc.HTTPBadRequest(
            _("Name not found in request body"))
    if uuidutils.is_uuid_like(protocol_dict['name']):
        raise webob.exc.HTTPBadRequest(
            _("Name=%s should not be in uuid format") %
            protocol_dict['name'])
    protocol_dict.pop('name')

    keys = list(protocol_dict.keys())
    if not keys:
        raise webob.exc.HTTPBadRequest(
            _("Request body should have at least one protocol specified"))
    if len(keys) > 1:
        raise webob.exc.HTTPBadRequest(
            _("multiple protocols in a single request is not supported"))

    key = keys[0]
    protocol = key.lower()
    if protocol not in const.SUPPORTED_PROTOCOLS:
        # Report the offending key itself, not the repr of the key list.
        raise webob.exc.HTTPBadRequest(
            _("'protocol %s' is not supported") % key)
    if protocol == const.SNMP_V3:
        return validate_snmpv3_parameters(protocol_dict, key)
    if protocol in [const.NETCONF_SSH, const.NETCONF_SOAP]:
        return validate_netconf_parameters(protocol_dict, key)
    return validate_snmp_parameters(protocol_dict, key)
def validate_access_parameters_for_update(body):
    """Validate if the request body is in proper format.

    An update must carry a name, one protocol key, or both. The work is
    done on a deep copy, so ``body`` itself is not modified.

    :param body: request body mapping.
    :returns: the result of the protocol-specific update validator, or
        ``None`` when only a name was supplied.
    :raises webob.exc.HTTPBadRequest: the body is empty, the name is
        UUID-like, several protocols are given, or the protocol is not
        supported.
    """
    protocol_dict = deepcopy(body)
    # An empty body has neither a name nor a protocol.
    if not protocol_dict:
        raise webob.exc.HTTPBadRequest(
            _("Request must have name or one protocol type"))
    if const.NAME in protocol_dict:
        if uuidutils.is_uuid_like(protocol_dict['name']):
            raise webob.exc.HTTPBadRequest(
                _("Name=%s should not be in uuid format") %
                protocol_dict['name'])
        protocol_dict.pop('name')

    if not protocol_dict:
        # Only the name was supplied; there is no protocol to validate.
        return None

    keys = list(protocol_dict.keys())
    if len(keys) > 1:
        raise webob.exc.HTTPBadRequest(
            _("Multiple protocols in a single request is not supported"))

    key = keys[0]
    protocol = key.lower()
    if protocol not in const.SUPPORTED_PROTOCOLS:
        # Report the offending key itself, not the repr of the key list.
        raise webob.exc.HTTPBadRequest(
            _("Protocol %s is not supported") % key)
    if protocol == const.SNMP_V3:
        return validate_snmpv3_parameters_for_update(protocol_dict, key)
    if protocol in [const.NETCONF_SSH, const.NETCONF_SOAP]:
        return validate_netconf_parameters_for_update(protocol_dict, key)
    return validate_snmp_parameters_for_update(protocol_dict, key)
def _validate_uuid_format(uid):
    """Return whether ``uid`` is UUID-like according to ``uuidutils``."""
    is_valid = uuidutils.is_uuid_like(uid)
    return is_valid
def take_action(self, parsed_args):
    """Create a scheduled operation and return it as column/value tuples.

    :param parsed_args: parsed command-line arguments; ``trigger_id`` must
        be UUID-like.
    :returns: the operation's info items, sorted and transposed by ``zip``.
    :raises exceptions.CommandError: ``trigger_id`` is not UUID-like.
    """
    client = self.app.client_manager.data_protection
    trigger_id = parsed_args.trigger_id
    if not uuidutils.is_uuid_like(trigger_id):
        raise exceptions.CommandError(
            "Invalid trigger id provided.")

    so = client.scheduled_operations.create(
        parsed_args.name,
        parsed_args.operation_type,
        trigger_id,
        parsed_args.operation_definition)
    format_scheduledoperation(so._info)
    ordered_items = sorted(six.iteritems(so._info))
    return zip(*ordered_items)
def take_action(self, parsed_args):
    """Create a restore and return it as column/value tuples.

    :param parsed_args: parsed command-line arguments. ``provider_id`` and
        ``checkpoint_id`` must be UUID-like; when ``restore_target`` is
        given, both ``restore_username`` and ``restore_password`` are
        required.
    :returns: the restore's info items, sorted and transposed by ``zip``.
    :raises exceptions.CommandError: an argument check failed.
    """
    client = self.app.client_manager.data_protection

    id_checks = (
        ('provider_id', "Invalid provider id provided."),
        ('checkpoint_id', "Invalid checkpoint id provided."),
    )
    for attr_name, message in id_checks:
        if not uuidutils.is_uuid_like(getattr(parsed_args, attr_name)):
            raise exceptions.CommandError(message)

    restore_parameters = utils.extract_parameters(parsed_args)

    restore_auth = None
    if parsed_args.restore_target is not None:
        # A restore target needs password credentials to go with it.
        if parsed_args.restore_username is None:
            raise exceptions.CommandError(
                "Must specify username for restore_target.")
        if parsed_args.restore_password is None:
            raise exceptions.CommandError(
                "Must specify password for restore_target.")
        restore_auth = {
            'type': 'password',
            'username': parsed_args.restore_username,
            'password': parsed_args.restore_password,
        }

    restore = client.restores.create(parsed_args.provider_id,
                                     parsed_args.checkpoint_id,
                                     parsed_args.restore_target,
                                     restore_parameters, restore_auth)
    format_restore(restore._info)
    ordered_items = sorted(restore._info.items())
    return zip(*ordered_items)