Python 函数 is_uuid_like() 的实例源码

cinder_api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def search_volume(self, volume):
        """Fetch a cinder volume given either its UUID or its name.

        :param volume: UUID-like identifier or name of the volume.
        :returns: whatever ``self.cinder.volumes.get`` (UUID-like input) or
                  ``self.cinder.volumes.find`` (name input) returns.
        :raises exception.VolumeNotFound: if no volume has the given name.
        :raises exception.Conflict: if several volumes share the given name.
        """
        # UUID-like identifiers are fetched directly; no name search needed.
        if uuidutils.is_uuid_like(volume):
            return self.cinder.volumes.get(volume)

        try:
            found = self.cinder.volumes.find(name=volume)
        except cinder_exception.NotFound:
            raise exception.VolumeNotFound(volume=volume)
        except cinder_exception.NoUniqueMatch:
            message = _('Multiple cinder volumes exist with same name. '
                        'Please use the uuid instead.')
            raise exception.Conflict(message)
        return found
types.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate(cls, value):
        """Return ``value`` unchanged when it looks like a UUID.

        :param value: candidate identifier to check.
        :returns: the same ``value`` that was passed in.
        :raises exception.InvalidUUID: if ``uuidutils.is_uuid_like`` rejects
                                       ``value``.
        """
        if uuidutils.is_uuid_like(value):
            return value
        raise exception.InvalidUUID(uuid=value)
containers.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 49 收藏 0 点赞 0 评论 0
def _check_security_group(self, context, security_group, container):
        """Resolve a security group reference and reject duplicates.

        :param context: request context passed through to the ``utils``
                        helpers.
        :param security_group: dict holding either a ``uuid`` or a ``name``
                               key.
        :param container: container whose ``security_groups`` and ports are
                          inspected for an existing association.
        :returns: the ID of the requested security group.
        :raises exception.InvalidUUID: if the supplied ``uuid`` is not
                                       UUID-like.
        :raises exception.InvalidValue: if the group is already present on
                                        the container or one of its ports.
        :raises exception.Conflict: if the name matches several groups.
        """
        if security_group.get("uuid"):
            security_group_id = security_group.get("uuid")
            if not uuidutils.is_uuid_like(security_group_id):
                raise exception.InvalidUUID(uuid=security_group_id)
            if security_group_id in container.security_groups:
                msg = _("security_group %s already present in container") % \
                    security_group_id
                raise exception.InvalidValue(msg)
        else:
            security_group_ids = utils.get_security_group_ids(
                context, [security_group['name']])
            # A single name was looked up, so more than one ID means the
            # name is ambiguous. Compare against 1 rather than the number
            # of keys that happen to be in the request dict.
            if len(security_group_ids) > 1:
                msg = _("Multiple security group matches "
                        "found for name %(name)s, use an ID "
                        "to be more specific. ") % security_group
                raise exception.Conflict(msg)
            # NOTE(review): assumes get_security_group_ids raises when the
            # name matches nothing, so the list is non-empty here - confirm.
            security_group_id = security_group_ids[0]
        container_ports_detail = utils.list_ports(context, container)

        # The group may already be attached at port level even when it is
        # not listed on the container itself.
        for container_port_detail in container_ports_detail:
            if security_group_id in container_port_detail['security_groups']:
                msg = _("security_group %s already present in container") % \
                    list(security_group.values())[0]
                raise exception.InvalidValue(msg)
        return security_group_id
utils.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_resource(resource, resource_ident):
    """Load a resource object by UUID or by logical name.

    :param resource: name of the attribute on ``objects`` to look up.
    :param resource_ident: the UUID or logical name of the resource.

    :returns: The result of ``get_by_uuid`` for UUID-like identifiers,
              otherwise the result of ``get_by_name``.
    """
    resource_cls = getattr(objects, resource)

    # Choose the lookup method first, then issue a single call.
    if uuidutils.is_uuid_like(resource_ident):
        lookup = resource_cls.get_by_uuid
    else:
        lookup = resource_cls.get_by_name

    return lookup(pecan.request.context, resource_ident)
nova.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def check_active(self, server):
        """Report whether a server has reached the ACTIVE status.

        ``server`` may be a server ID (string) or a server object.

        :returns: True when the status is ACTIVE; False when the server
                  cannot be fetched or its status is one of
                  ``self.deferred_server_statuses``.
        :raises exception.ServerInError: when the status is ERROR.
        :raises exception.ServerUnknownStatus: for any other status.
        """
        # not checking with is_uuid_like as most tests use strings e.g. '1234'
        if isinstance(server, six.string_types):
            server = self.fetch_server(server)
            if server is None:
                return False
            status = self.get_status(server)
        else:
            status = self.get_status(server)
            # Re-read the status once after a refresh when the object does
            # not already report ACTIVE.
            if status != 'ACTIVE':
                self.refresh_server(server)
                status = self.get_status(server)

        if status in self.deferred_server_statuses:
            return False
        if status == 'ACTIVE':
            return True
        if status != 'ERROR':
            raise exception.ServerUnknownStatus(
                resource_status=server.status,
                status_reason=_('Unknown'),
                result=_('Server is not active'))

        # ERROR: surface the fault details, falling back to 'Unknown'.
        fault = getattr(server, 'fault', {})
        reason_template = _("Message: %(message)s, Code: %(code)s")
        reason_values = {
            'message': fault.get('message', _('Unknown')),
            'code': fault.get('code', _('Unknown'))
        }
        raise exception.ServerInError(
            resource_status=status,
            status_reason=reason_template % reason_values)
nova.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_server_id(self, server, raise_on_error=True):
        """Translate a server UUID or name into a server ID.

        :param server: UUID-like identifier or server name.
        :param raise_on_error: when True, raise if the name does not match
                               exactly one server; otherwise return None.
        :returns: ``server`` itself when it is UUID-like, the ``id`` of the
                  single server matching the name, or None.
        :raises exception.ZunException: for a non-string ``server``, or for
                                        a name without exactly one match
                                        while ``raise_on_error`` is set.
        """
        if uuidutils.is_uuid_like(server):
            return server
        if not isinstance(server, six.string_types):
            raise exception.ZunException(_("Unexpected server type"))

        matches = self.client().servers.list(search_opts={'name': server})
        if len(matches) == 1:
            return matches[0].id
        if raise_on_error:
            raise exception.ZunException(_(
                "Unable to get server id with name %s") % server)
        return None
baremetal_standalone_manager.py 文件源码 项目:ironic-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def resource_setup(cls):
        """Set up class-level resources: boot a node and give it an IP.

        :raises lib_exc.InvalidConfiguration: if any attribute named in
            ``cls.mandatory_attr`` is None.
        """
        super(BaremetalStandaloneScenarioTest, cls).resource_setup()
        base.set_baremetal_api_microversion(cls.api_microversion)

        for attr_name in cls.mandatory_attr:
            if getattr(cls, attr_name) is None:
                raise lib_exc.InvalidConfiguration(
                    "Mandatory attribute %s not set." % attr_name)

        # The checksum is only forwarded when the image reference is not
        # UUID-like.
        image_checksum = (None if uuidutils.is_uuid_like(cls.image_ref)
                          else cls.image_checksum)
        cls.node = cls.boot_node(cls.driver, cls.image_ref,
                                 image_checksum=image_checksum)
        cls.node_ip = cls.add_floatingip_to_node(cls.node['uuid'])
test_uuidutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_is_uuid_like(self):
        """Check is_uuid_like accepts a fresh UUID and decorated spellings."""
        self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4())))

        # Braces, a missing hyphenation, urn:/uuid: prefixes and stray
        # hyphens are all expected to be tolerated.
        accepted_forms = (
            '{12345678-1234-5678-1234-567812345678}',
            '12345678123456781234567812345678',
            'urn:uuid:12345678-1234-5678-1234-567812345678',
            'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb',
            'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb',
            '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}',
        )
        for candidate in accepted_forms:
            self.assertTrue(uuidutils.is_uuid_like(candidate))
test_uuidutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_is_uuid_like_insensitive(self):
        """Check that an upper-cased UUID string is still accepted."""
        upper_cased = str(uuid.uuid4()).upper()
        self.assertTrue(uuidutils.is_uuid_like(upper_cased))
test_uuidutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_id_is_uuid_like(self):
        """Check that a plain integer is rejected."""
        integer_id = 1234567
        self.assertFalse(uuidutils.is_uuid_like(integer_id))
test_uuidutils.py 文件源码 项目:deb-oslo.utils 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_name_is_uuid_like(self):
        """Check that an ordinary name string is rejected."""
        plain_name = 'zhongyueluo'
        self.assertFalse(uuidutils.is_uuid_like(plain_name))
mocks.py 文件源码 项目:vmware-nsxlib 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _is_uuid_uri(self, uri):
        """Tell whether the last path segment of ``uri`` is UUID-like."""
        path_segments = urlparse.urlparse(uri).path.split('/')
        last_segment = path_segments[-1]
        return uuidutils.is_uuid_like(last_segment)
types.py 文件源码 项目:qinling 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate(value):
        """Return ``value`` unchanged when it looks like a UUID.

        :param value: candidate identifier to check.
        :returns: the same ``value`` that was passed in.
        :raises exc.InputException: if ``uuidutils.is_uuid_like`` rejects
                                    ``value``.
        """
        if uuidutils.is_uuid_like(value):
            return value

        raise exc.InputException(
            "Expected a uuid but received %s." % value
        )
bnp_switch.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _get_access_param(self, context, protocol, creds):
        """Look up the credential record referenced by ``creds``.

        :param context: context passed to the ``db`` lookups.
        :param protocol: protocol string; selects the SNMP or NETCONF
                         lookups and must equal the record's
                         ``protocol_type``.
        :param creds: UUID-like credential ID, or a credential name.
        :returns: the single matching credential record.
        :raises webob.exc.HTTPBadRequest: if nothing matches, if a name
            matches several records, or if the record's protocol differs.
        """
        is_snmp = const.PROTOCOL_SNMP in protocol
        is_id = uuidutils.is_uuid_like(creds)

        # Four lookups: {SNMP, NETCONF} x {by ID, by name and protocol}.
        if is_snmp and is_id:
            found = db.get_snmp_cred_by_id(context, creds)
        elif is_snmp:
            found = db.get_snmp_cred_by_name_and_protocol(
                context, creds, protocol)
        elif is_id:
            found = db.get_netconf_cred_by_id(context, creds)
        else:
            found = db.get_netconf_cred_by_name_and_protocol(
                context, creds, protocol)

        if not found:
            raise webob.exc.HTTPBadRequest(
                _("Credentials not found for Id or name: %s") % creds)

        # A list result is accepted only when it holds exactly one record.
        if isinstance(found, list):
            if len(found) > 1:
                raise webob.exc.HTTPBadRequest(
                    _("Multiple credentials matches found "
                      "for name: %s, use an ID to be more specific.") % creds)
            found = found[0]

        if found['protocol_type'] != protocol:
            raise webob.exc.HTTPBadRequest(
                _("Credentials not found for Id or name: %s") % creds)
        return found
mechanism_hpe.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _get_credentials_dict(self, bnp_switch, func_name):
        """Build the credential dictionary for a switch.

        :param bnp_switch: switch record providing ``ip_address``,
                           ``credentials`` and ``management_protocol``.
        :param func_name: caller name forwarded to ``_raise_ml2_error`` when
                          no switch is given.
        :returns: dict with ``ip_address`` plus either the SNMP fields or
                  the NETCONF ``user_name``/``password``/``key_path`` fields.

        Missing switch or missing credentials are reported through
        ``self._raise_ml2_error`` with ``wexc.HTTPNotFound``.
        """
        if not bnp_switch:
            self._raise_ml2_error(wexc.HTTPNotFound, func_name)
        db_context = neutron_context.get_admin_context()
        creds_dict = {}
        creds_dict['ip_address'] = bnp_switch.ip_address
        prov_creds = bnp_switch.credentials
        prov_protocol = bnp_switch.management_protocol
        if hp_const.PROTOCOL_SNMP in prov_protocol:
            if not uuidutils.is_uuid_like(prov_creds):
                snmp_creds = db.get_snmp_cred_by_name(db_context, prov_creds)
                # Guard the index: an empty lookup must reach the not-found
                # handling below instead of failing on ``[0]``.
                snmp_cred = snmp_creds[0] if snmp_creds else None
            else:
                snmp_cred = db.get_snmp_cred_by_id(db_context, prov_creds)
            if not snmp_cred:
                LOG.error(_LE("Credentials does not match"))
                self._raise_ml2_error(wexc.HTTPNotFound, '')
            creds_dict['write_community'] = snmp_cred.write_community
            creds_dict['security_name'] = snmp_cred.security_name
            creds_dict['security_level'] = snmp_cred.security_level
            creds_dict['auth_protocol'] = snmp_cred.auth_protocol
            creds_dict['management_protocol'] = prov_protocol
            creds_dict['auth_key'] = snmp_cred.auth_key
            creds_dict['priv_protocol'] = snmp_cred.priv_protocol
            creds_dict['priv_key'] = snmp_cred.priv_key
        else:
            # NOTE(review): unlike the SNMP branch, the by-name result is
            # used directly without taking element [0] - confirm what
            # get_netconf_cred_by_name returns.
            if not uuidutils.is_uuid_like(prov_creds):
                netconf_cred = db.get_netconf_cred_by_name(db_context,
                                                           prov_creds)
            else:
                netconf_cred = db.get_netconf_cred_by_id(db_context,
                                                         prov_creds)
            if not netconf_cred:
                LOG.error(_LE("Credentials does not match"))
                self._raise_ml2_error(wexc.HTTPNotFound, '')
            # NOTE(review): the NETCONF fields are read from SNMP-named
            # attributes; presumably intentional - verify against the model.
            creds_dict['user_name'] = netconf_cred.write_community
            creds_dict['password'] = netconf_cred.security_name
            creds_dict['key_path'] = netconf_cred.security_level
        return creds_dict
validators.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate_access_parameters(body):
    """Validate a credential-create request body.

    The body must contain a non-UUID ``name`` and exactly one supported
    protocol key; validation of that protocol's parameters is delegated to
    the matching ``validate_*_parameters`` helper.

    :param body: request body dict; it is deep-copied, not modified.
    :returns: the result of the protocol-specific validator.
    :raises webob.exc.HTTPBadRequest: if the name is missing or UUID-like,
        if zero or several protocols are given, or if the protocol is not
        in ``const.SUPPORTED_PROTOCOLS``.
    """
    protocol_dict = deepcopy(body)
    if const.NAME not in protocol_dict:
        raise webob.exc.HTTPBadRequest(
            _("Name not found in request body"))
    # Names must not look like UUIDs, so a name can never be mistaken for
    # an ID elsewhere.
    if uuidutils.is_uuid_like(protocol_dict['name']):
        raise webob.exc.HTTPBadRequest(
            _("Name=%s should not be in uuid format") %
            protocol_dict['name'])
    protocol_dict.pop('name')
    keys = list(protocol_dict)
    if not keys:
        raise webob.exc.HTTPBadRequest(
            _("Request body should have at least one protocol specified"))
    elif len(keys) > 1:
        raise webob.exc.HTTPBadRequest(
            _("multiple protocols in a single request is not supported"))
    key = keys[0]
    protocol = key.lower()
    if protocol not in const.SUPPORTED_PROTOCOLS:
        # Report the offending key itself, not the repr of the key list.
        raise webob.exc.HTTPBadRequest(
            _("'protocol %s' is not supported") % key)
    if protocol == const.SNMP_V3:
        return validate_snmpv3_parameters(protocol_dict, key)
    elif protocol in [const.NETCONF_SSH, const.NETCONF_SOAP]:
        return validate_netconf_parameters(protocol_dict, key)
    else:
        return validate_snmp_parameters(protocol_dict, key)
validators.py 文件源码 项目:networking-hpe 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def validate_access_parameters_for_update(body):
    """Validate a credential-update request body.

    The body may carry a non-UUID ``name``, at most one supported protocol
    key, or both; it must not be empty.

    :param body: request body dict; it is deep-copied, not modified.
    :returns: the result of the protocol-specific ``*_for_update``
              validator, or None when only a name was supplied.
    :raises webob.exc.HTTPBadRequest: if the body is empty, the name is
        UUID-like, several protocols are given, or the protocol is not in
        ``const.SUPPORTED_PROTOCOLS``.
    """

    protocol_dict = deepcopy(body)
    # An empty body has neither a name nor a protocol.
    if not protocol_dict:
        raise webob.exc.HTTPBadRequest(
            _("Request must have name or one protocol type"))
    if const.NAME in protocol_dict:
        if uuidutils.is_uuid_like(protocol_dict['name']):
            raise webob.exc.HTTPBadRequest(
                _("Name=%s should not be in uuid format") %
                protocol_dict['name'])
        protocol_dict.pop('name')
    if not protocol_dict:
        # Name-only update: there is no protocol section to validate.
        return None

    keys = list(protocol_dict)
    if len(keys) > 1:
        raise webob.exc.HTTPBadRequest(
            _("Multiple protocols in a single request is not supported"))
    key = keys[0]
    protocol = key.lower()
    if protocol not in const.SUPPORTED_PROTOCOLS:
        # Report the offending key itself, not the repr of the key list.
        raise webob.exc.HTTPBadRequest(
            _("Protocol %s is not supported") % key)
    if protocol == const.SNMP_V3:
        return validate_snmpv3_parameters_for_update(protocol_dict, key)
    elif protocol in [const.NETCONF_SSH, const.NETCONF_SOAP]:
        return validate_netconf_parameters_for_update(protocol_dict, key)
    else:
        return validate_snmp_parameters_for_update(protocol_dict, key)
validator.py 文件源码 项目:bilean 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _validate_uuid_format(uid):
    """Return the verdict of ``uuidutils.is_uuid_like`` for ``uid``."""
    looks_like_uuid = uuidutils.is_uuid_like(uid)
    return looks_like_uuid
scheduled_operations.py 文件源码 项目:python-karborclient 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def take_action(self, parsed_args):
        """Create a scheduled operation and return it for display.

        :param parsed_args: parsed arguments providing ``name``,
                            ``operation_type``, ``trigger_id`` and
                            ``operation_definition``.
        :returns: the created operation's info as ``zip`` of its sorted
                  (key, value) pairs, i.e. keys and values in parallel.
        :raises exceptions.CommandError: if ``trigger_id`` is not UUID-like.
        """
        client = self.app.client_manager.data_protection
        trigger_id = parsed_args.trigger_id
        if not uuidutils.is_uuid_like(trigger_id):
            raise exceptions.CommandError(
                "Invalid trigger id provided.")

        operation = client.scheduled_operations.create(
            parsed_args.name, parsed_args.operation_type,
            parsed_args.trigger_id, parsed_args.operation_definition)
        info = operation._info

        format_scheduledoperation(info)
        sorted_items = sorted(six.iteritems(info))
        return zip(*sorted_items)
restores.py 文件源码 项目:python-karborclient 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def take_action(self, parsed_args):
        """Create a restore from a checkpoint and return it for display.

        :param parsed_args: parsed arguments providing ``provider_id``,
                            ``checkpoint_id``, ``restore_target`` and the
                            optional restore username/password.
        :returns: the restore's info as ``zip`` of its sorted (key, value)
                  pairs, i.e. keys and values in parallel.
        :raises exceptions.CommandError: if an ID is not UUID-like, or a
            restore target is given without a username or password.
        """
        client = self.app.client_manager.data_protection

        # Both identifiers must be UUID-like; checked in this order.
        id_checks = (
            ('provider_id', "Invalid provider id provided."),
            ('checkpoint_id', "Invalid checkpoint id provided."),
        )
        for attr_name, error_text in id_checks:
            if not uuidutils.is_uuid_like(getattr(parsed_args, attr_name)):
                raise exceptions.CommandError(error_text)

        restore_parameters = utils.extract_parameters(parsed_args)

        # Password auth is assembled only when a restore target is given.
        restore_auth = None
        if parsed_args.restore_target is not None:
            if parsed_args.restore_username is None:
                raise exceptions.CommandError(
                    "Must specify username for restore_target.")
            if parsed_args.restore_password is None:
                raise exceptions.CommandError(
                    "Must specify password for restore_target.")
            restore_auth = dict(
                type='password',
                username=parsed_args.restore_username,
                password=parsed_args.restore_password,
            )

        restore = client.restores.create(parsed_args.provider_id,
                                         parsed_args.checkpoint_id,
                                         parsed_args.restore_target,
                                         restore_parameters, restore_auth)
        info = restore._info
        format_restore(info)
        return zip(*sorted(info.items()))


问题


面经


文章

微信
公众号

扫码关注公众号