def add_identity_filter(query, value):
    """Restrict a query to the record identified by ``value``.

    An integer-like value is matched against ``id``; otherwise a
    UUID-like value is matched against ``uuid``.

    :param query: Query to narrow down.
    :param value: Integer-like ID or UUID-like string.
    :return: The query with the identity filter applied.
    :raises: InvalidIdentity if value is neither integer-like nor
             UUID-like.
    """
    if strutils.is_int_like(value):
        criteria = {'id': value}
    elif uuidutils.is_uuid_like(value):
        criteria = {'uuid': value}
    else:
        raise exception.InvalidIdentity(identity=value)
    return query.filter_by(**criteria)
python类is_uuid_like()的实例源码
def add_identity_filter(query, value):
    """Narrow ``query`` down by numeric ID or by UUID.

    The integer check is tried first, so an integer-like value always
    filters on ``id``; only then is the value tested as a UUID.

    :param query: Query the filter is added to.
    :param value: Identity to filter by (integer-like or UUID-like).
    :return: The filtered query.
    :raises: InvalidIdentity if value matches neither form.
    """
    # Guard-clause form: each recognised identity type returns directly.
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    if uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    raise exception.InvalidIdentity(identity=value)
def destroy_board(self, board_id):
    """Delete a board together with its Location rows.

    :param board_id: Integer-like ID or UUID-like string of the board.
    :raises: BoardNotFound if no board matches ``board_id``.
    """
    session = get_session()
    with session.begin():
        board_query = add_identity_filter(
            model_query(models.Board, session=session), board_id)
        try:
            board = board_query.one()
        except NoResultFound:
            raise exception.BoardNotFound(board=board_id)

        # The location filter below is handed the board's ``id``, so a
        # UUID supplied by the caller is swapped for the ID just loaded.
        if uuidutils.is_uuid_like(board_id):
            board_id = board['id']

        # Remove the board's locations first, then the board itself.
        locations = self._add_location_filter_by_board(
            model_query(models.Location, session=session), board_id)
        locations.delete()
        board_query.delete()
def destroy_plugin(self, plugin_id):
    """Delete the plugin identified by ``plugin_id``.

    :param plugin_id: Integer-like ID or UUID-like string of the plugin.
    :raises: PluginNotFound if no plugin matches ``plugin_id``.
    """
    session = get_session()
    with session.begin():
        query = model_query(models.Plugin, session=session)
        query = add_identity_filter(query, plugin_id)
        try:
            # Executed only for its existence check; the loaded row is
            # not needed because nothing else is deleted alongside it.
            query.one()
        except NoResultFound:
            raise exception.PluginNotFound(plugin=plugin_id)
        query.delete()
def get_rpc_plugin(plugin_ident):
    """Get the RPC plugin from the plugin uuid or logical name.

    :param plugin_ident: the UUID or logical name of a plugin.
    :returns: The RPC Plugin.
    :raises: Whatever the underlying ``objects.Plugin`` lookup raises
             (presumably PluginNotFound when nothing matches -- confirm
             against ``objects.Plugin``). This function performs no
             validation of its own.
    """
    # A UUID-like identifier is always treated as a UUID.
    if uuidutils.is_uuid_like(plugin_ident):
        return objects.Plugin.get_by_uuid(pecan.request.context, plugin_ident)

    # Anything else is looked up as a logical name, without checking
    # that the name is well formed.
    return objects.Plugin.get_by_name(pecan.request.context, plugin_ident)
def add_identity_filter(query, value):
    """Filter a query by ``id`` or ``uuid`` depending on ``value``.

    :param query: Query to which the filter is added.
    :param value: Integer-like ID (checked first) or UUID-like string.
    :return: The query restricted to the matching identity.
    :raises: InvalidIdentity if value is neither integer-like nor
             UUID-like.
    """
    if strutils.is_int_like(value):
        field = 'id'
    elif uuidutils.is_uuid_like(value):
        field = 'uuid'
    else:
        raise exception.InvalidIdentity(identity=value)
    return query.filter_by(**{field: value})
def do_verification_create(cs, args):
    """Creates a verification."""
    # Both IDs must be UUID-like; they are checked in this order and the
    # first failure aborts the command.
    required_uuids = (
        ('provider_id', "Invalid provider id provided."),
        ('checkpoint_id', "Invalid checkpoint id provided."),
    )
    for attr_name, error_message in required_uuids:
        if not uuidutils.is_uuid_like(getattr(args, attr_name)):
            raise exceptions.CommandError(error_message)

    params = arg_utils.extract_parameters(args)
    created = cs.verifications.create(
        args.provider_id, args.checkpoint_id, params)
    utils.print_dict(created.to_dict(), dict_format_list={"parameters"})
def add_identity_filter(query, value):
    """Restrict a query to the record identified by ``value``.

    :param query: Query to narrow down.
    :param value: Integer-like ID (checked first) or UUID-like string.
    :return: The query with the identity filter applied.
    :raises: InvalidParameterValue if value is neither integer-like
             nor UUID-like.
    """
    if strutils.is_int_like(value):
        return query.filter_by(id=value)
    if uuidutils.is_uuid_like(value):
        return query.filter_by(uuid=value)
    raise exception.InvalidParameterValue(identity=value)
def get_uuid_by_name(manager, name, segment=None):
    """Resolve a segment or host name to its uuid.

    Hosts of ``segment`` are searched when ``segment`` is truthy;
    otherwise the segments themselves are searched.

    :param manager: A client manager class
    :param name: Name (or uuid) of the resource to resolve
    :param segment: segment id, default None
    :return: ``name`` itself if it is already UUID-like or if nothing
             matches; otherwise the uuid of the first item whose name
             equals ``name``
    """
    if uuidutils.is_uuid_like(name):
        return name

    candidates = manager.hosts(segment) if segment else manager.segments()
    for candidate in candidates:
        if candidate.name == name:
            return candidate.uuid

    # Nothing matched: hand the name back unchanged.
    return name
def _get_floating_ip_pool_id_by_name_or_id(self, client, name_or_id):
    """Return the id of the single external network matching the input.

    :param client: Client exposing ``list_networks(**search_opts)``.
    :param name_or_id: Network name, or UUID-like network id.
    :raises: FloatingIpPoolNotFound if no network matches.
    :raises: NovaException if more than one network matches.
    """
    # A UUID-like value is searched as an id, anything else as a name.
    match_key = 'id' if uuidutils.is_uuid_like(name_or_id) else 'name'
    search_opts = {
        constants.NET_EXTERNAL: True,
        'fields': 'id',
        match_key: name_or_id,
    }
    nets = client.list_networks(**search_opts)['networks']

    if len(nets) == 0:
        raise exception.FloatingIpPoolNotFound()
    if len(nets) == 1:
        return nets[0]['id']

    msg = (_("Multiple floating IP pools matches found for name '%s'")
           % name_or_id)
    raise exception.NovaException(message=msg)
def fixed_ip_get_by_instance(context, instance_uuid):
    """Return the fixed IPs of an instance, ordered by their VIF.

    :param context: Request context passed through to ``model_query``.
    :param instance_uuid: UUID-like identifier of the instance.
    :returns: Non-empty list of FixedIp rows.
    :raises: InvalidUUID if ``instance_uuid`` is not UUID-like.
    :raises: FixedIpNotFoundForInstance if the query returns no rows.
    """
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    # Join condition: the fixed IP's VIF, restricted to VIFs whose
    # ``deleted`` column is 0.
    vif_join = and_(
        models.VirtualInterface.id == models.FixedIp.virtual_interface_id,
        models.VirtualInterface.deleted == 0)

    query = model_query(context, models.FixedIp, read_deleted="no")
    query = query.filter_by(instance_uuid=instance_uuid)
    query = query.outerjoin(models.VirtualInterface, vif_join)
    query = query.options(contains_eager("virtual_interface"))
    query = query.options(joinedload('network'))
    query = query.options(joinedload('floating_ips'))
    query = query.order_by(asc(models.VirtualInterface.created_at),
                           asc(models.VirtualInterface.id))
    fixed_ips = query.all()

    if not fixed_ips:
        raise exception.FixedIpNotFoundForInstance(
            instance_uuid=instance_uuid)
    return fixed_ips
def _get_requested_instance_group(context, filter_properties,
                                  check_quota):
    """Load the server group named by the ``group`` scheduler hint.

    :param context: Request context used for the lookup.
    :param filter_properties: Mapping that may carry ``scheduler_hints``.
    :param check_quota: Not used by this function.
    :returns: The InstanceGroup, or None when no group hint is present.
    :raises: InvalidInput if the group hint is not UUID-like.
    """
    hints = (filter_properties.get('scheduler_hints')
             if filter_properties else None)
    if not hints:
        return

    group_hint = hints.get('group')
    if not group_hint:
        return

    # TODO(gibi): We need to remove the following validation code when
    # removing legacy v2 code.
    if uuidutils.is_uuid_like(group_hint):
        return objects.InstanceGroup.get_by_uuid(context, group_hint)

    msg = _('Server group scheduler hint must be a UUID.')
    raise exception.InvalidInput(reason=msg)
def create_project(self, name, variables=None):
    """POST a new project and assert that the response is well formed.

    :param name: Name of the project to create.
    :param variables: Optional variables; sent only when truthy.
    :returns: The decoded JSON body of the created project.
    """
    projects_url = self.url + '/v1/projects'
    body = {'name': name}
    if variables:
        body['variables'] = variables

    resp = self.post(projects_url, headers=self.root_headers, data=body)
    self.assertEqual(201, resp.status_code)
    self.assertIn('Location', resp.headers)

    created = resp.json()
    self.assertTrue(uuidutils.is_uuid_like(created['id']))
    # The Location header must point at the new project's own URL.
    expected_location = "{}/{}".format(projects_url, created['id'])
    self.assertEqual(resp.headers['Location'], expected_location)
    return created
def process_request(self, request):
    """Authenticate a request from its ``X-Auth-*`` headers.

    Builds a context from the headers, then checks the supplied token
    against the stored API key of the user and sets the admin flags on
    the context accordingly.

    :param request: Incoming request exposing ``headers``.
    :raises: AuthenticationError if the project id is not UUID-like,
             the user lookup raises NotFound, or the token does not
             match the user's API key.
    """
    headers = request.headers
    project_id = headers.get('X-Auth-Project')
    if not uuidutils.is_uuid_like(project_id):
        raise exceptions.AuthenticationError(
            message="Project ID ('{}') is not a valid UUID".format(
                project_id
            )
        )

    token = headers.get('X-Auth-Token')
    username = headers.get('X-Auth-User')
    ctx = self.make_context(
        request,
        auth_token=token,
        user=username,
        tenant=project_id,
    )

    # NOTE(sulo): this means every api call hits the db
    # at least once for auth. Better way to handle this?
    try:
        user_info = dbapi.get_user_info(ctx, username)
        if user_info.api_key != token:
            raise exceptions.AuthenticationError
        # Root users are admins of the admin project; plain admins are
        # admins only; everyone else gets neither flag.
        is_root = bool(user_info.is_root)
        ctx.is_admin = is_root or bool(user_info.is_admin)
        ctx.is_admin_project = is_root
    except exceptions.NotFound:
        raise exceptions.AuthenticationError
def validate(value):
    """Return ``value`` unchanged if it is UUID-like.

    :raises: InvalidUUID if ``value`` is not UUID-like.
    """
    if uuidutils.is_uuid_like(value):
        return value
    raise exception.InvalidUUID(uuid=value)
def validate(value):
    """Return ``value`` unchanged if it is a UUID or a valid logical name.

    :raises: InvalidUuidOrName if ``value`` is neither.
    """
    # The UUID check runs first; the name check only runs if it fails.
    if uuidutils.is_uuid_like(value):
        return value
    if v1_utils.is_valid_logical_name(value):
        return value
    raise exception.InvalidUuidOrName(name=value)
def validate(value):
    """Check that ``value`` is UUID-like and hand it back.

    :raises: InvalidUUID if ``value`` is not UUID-like.
    """
    looks_like_uuid = uuidutils.is_uuid_like(value)
    if looks_like_uuid:
        return value
    raise exception.InvalidUUID(uuid=value)
def is_valid_board_name(name):
    """Tell whether ``name`` may be used as a board name.

    A name qualifies when it is hostname-safe and is not UUID-like.

    :param: name: the board name to check.
    :returns: a truthy value if the name is valid, a falsy one otherwise.
    """
    hostname_safe = utils.is_hostname_safe(name)
    # The UUID check is skipped when the hostname check already failed.
    return hostname_safe and not uuidutils.is_uuid_like(name)
def is_valid_name(name):
    """Tell whether ``name`` may be used as a name.

    The only requirement checked here is that the name is not UUID-like.

    :param: name: the name to check.
    :returns: True if the name is valid, False otherwise.
    """
    looks_like_uuid = uuidutils.is_uuid_like(name)
    return not looks_like_uuid
def get(cls, context, plugin_id):
    """Find a plugin by its id or uuid.

    :param context: Context passed through to the lookup.
    :param plugin_id: the id *or* uuid of a plugin.
    :returns: whatever ``get_by_id`` / ``get_by_uuid`` returns.
    :raises: InvalidIdentity if ``plugin_id`` is neither integer-like
             nor UUID-like.
    """
    # Integer-like identifiers take precedence over UUID-like ones.
    if strutils.is_int_like(plugin_id):
        return cls.get_by_id(context, plugin_id)
    if uuidutils.is_uuid_like(plugin_id):
        return cls.get_by_uuid(context, plugin_id)
    raise exception.InvalidIdentity(identity=plugin_id)
def get(cls, context, session_or_board_uuid):
    """Find a session by an integer id or by a uuid.

    :param context: Context passed through to the lookup.
    :param session_or_board_uuid: integer-like id, or UUID-like string.
    :returns: whatever ``get_by_id`` / ``get_by_uuid`` returns.
    :raises: InvalidIdentity if the identifier is neither integer-like
             nor UUID-like.
    """
    ident = session_or_board_uuid
    # Integer-like identifiers take precedence over UUID-like ones.
    if strutils.is_int_like(ident):
        return cls.get_by_id(context, ident)
    if uuidutils.is_uuid_like(ident):
        return cls.get_by_uuid(context, ident)
    raise exception.InvalidIdentity(identity=ident)
def test_request_id(self):
    """The response carries a request id of the form ``req-<uuid>``."""
    header_name = 'x-openstack-request-id'
    response = self.app.get('/v2.1/')
    self.assertIn(header_name, response.headers)

    request_id = response.headers[header_name]
    self.assertTrue(request_id.startswith('req-'))
    # Whatever follows the 'req-' prefix must be UUID-like.
    uuid_part = request_id.split('req-')[1]
    self.assertTrue(uuidutils.is_uuid_like(uuid_part))
def test_request_id(self):
    """The root URL response carries a ``req-<uuid>`` request id."""
    header_name = 'x-openstack-request-id'
    response = self.app.get('/')
    self.assertIn(header_name, response.headers)

    request_id = response.headers[header_name]
    self.assertTrue(request_id.startswith('req-'))
    # Whatever follows the 'req-' prefix must be UUID-like.
    uuid_part = request_id.split('req-')[1]
    self.assertTrue(uuidutils.is_uuid_like(uuid_part))
def test_request_id(self):
    """Responses expose an ``x-openstack-request-id`` of ``req-<uuid>``."""
    resp = self.app.get('/')
    headers = resp.headers
    self.assertIn('x-openstack-request-id', headers)

    req_id = headers['x-openstack-request-id']
    self.assertTrue(req_id.startswith('req-'))
    # The part after the 'req-' prefix must be UUID-like.
    self.assertTrue(uuidutils.is_uuid_like(req_id.split('req-')[1]))
def param2id(object_id):
    """Helper function to convert various id types to internal id.

    A UUID-like value is returned as is, a value without a dash is
    converted with ``int()``, and any other dashed value yields None.

    args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10'
    """
    if uuidutils.is_uuid_like(object_id):
        return object_id
    if '-' not in object_id:
        return int(object_id)
    # FIXME(ja): mapping occurs in nova?
    return None
def get_neutron_network(self, network):
    """Look up exactly one neutron network by id or by name.

    :param network: UUID-like network id, or a network name.
    :returns: The single matching network entry.
    :raises: NetworkNotFound if nothing matches.
    :raises: Conflict if more than one network matches.
    """
    # UUID-like input is searched by id, anything else by name.
    if uuidutils.is_uuid_like(network):
        search = {'id': network}
    else:
        search = {'name': network}
    matches = self.neutron.list_networks(**search)['networks']

    if not matches:
        raise exception.NetworkNotFound(network=network)
    if len(matches) > 1:
        raise exception.Conflict(_(
            'Multiple neutron networks exist with same name. '
            'Please use the uuid instead.'))
    return matches[0]
def get_neutron_port(self, port):
    """Look up exactly one neutron port by id or by name.

    :param port: UUID-like port id, or a port name.
    :returns: The single matching port entry.
    :raises: PortNotFound if nothing matches.
    :raises: Conflict if more than one port matches.
    """
    # UUID-like input is searched by id, anything else by name.
    if uuidutils.is_uuid_like(port):
        search = {'id': port}
    else:
        search = {'name': port}
    matches = self.list_ports(**search)['ports']

    if not matches:
        raise exception.PortNotFound(port=port)
    if len(matches) > 1:
        raise exception.Conflict(_(
            'Multiple neutron ports exist with same name. '
            'Please use the uuid instead.'))
    return matches[0]
def get_resource_provider(self, context, provider_ident):
    """Fetch a resource provider by uuid or by name.

    :param context: Context passed through to the lookup.
    :param provider_ident: UUID-like identifier, otherwise a name.
    :returns: Result of the uuid-based or name-based lookup.
    """
    if uuidutils.is_uuid_like(provider_ident):
        lookup = self._get_resource_provider_by_uuid
    else:
        lookup = self._get_resource_provider_by_name
    return lookup(context, provider_ident)
def get_resource_class(self, context, resource_ident):
    """Fetch a resource class by uuid or by name.

    :param context: Context passed through to the lookup.
    :param resource_ident: UUID-like identifier, otherwise a name.
    :returns: Result of the uuid-based or name-based lookup.
    """
    if uuidutils.is_uuid_like(resource_ident):
        lookup = self._get_resource_class_by_uuid
    else:
        lookup = self._get_resource_class_by_name
    return lookup(context, resource_ident)
def get_resource_class(self, context, ident):
    """Fetch a resource class by uuid or by name.

    :param context: Context passed through to the lookup.
    :param ident: UUID-like identifier, otherwise a name.
    :returns: Result of the uuid-based or name-based lookup.
    """
    by_uuid = uuidutils.is_uuid_like(ident)
    if not by_uuid:
        return self._get_resource_class_by_name(context, ident)
    return self._get_resource_class_by_uuid(context, ident)