python类generate_uuid()的实例源码

test_mysql_kafka_offsets.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_add_another_offset(self):
        """Store one randomly generated offset spec and read it back.

        The stored entry is compared with the values that were passed in
        (via ``self.assertions_on_offset``), and the number of entries
        returned for the application name is expected to be exactly one.
        """
        topic = uuidutils.generate_uuid()
        partition = random.randint(0, 1024)
        until_offset = random.randint(0, sys.maxsize)
        from_offset = random.randint(0, sys.maxsize)
        app_name = uuidutils.generate_uuid()
        batch_time = self.get_dummy_batch_time()

        # Key under which the entry is looked up in the returned mapping.
        offset_key = "%s_%s_%s" % (app_name, topic, partition)

        self.kafka_offset_specs.add(topic=topic, partition=partition,
                                    app_name=app_name,
                                    from_offset=from_offset,
                                    until_offset=until_offset,
                                    batch_time_info=batch_time)
        expected_offset = {
            "topic": topic, "partition": partition, "app_name": app_name,
            "from_offset": from_offset, "until_offset": until_offset
        }

        stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
        self.assertions_on_offset(used_value=expected_offset,
                                  offset_value=stored_offsets.get(offset_key))

        # Query again so the count reflects a fresh read.
        refreshed_offsets = self.kafka_offset_specs.get_kafka_offsets(
            app_name)
        self.assertEqual(1, len(refreshed_offsets))
test_power.py 文件源码 项目:ironic-staging-drivers 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_send_magic_packets(self, mock_socket):
        """Check the socket calls made by ``wol_power._send_magic_packets``.

        ``mock_socket`` is injected from outside this block (presumably by a
        ``mock.patch`` decorator on the socket factory -- TODO confirm).
        """
        fake_socket = mock.Mock(spec=socket, spec_set=True)
        # Calling fake_socket() here is what records the bare ``mock.call()``
        # that appears first in ``expected_calls`` below; the object it
        # returns is what the patched socket factory hands out.
        mock_socket.return_value = fake_socket()
        obj_utils.create_test_port(self.context,
                                   uuid=uuidutils.generate_uuid(),
                                   address='aa:bb:cc:dd:ee:ff',
                                   node_id=self.node.id)
        with task_manager.acquire(
                self.context, self.node.uuid, shared=True) as task:
            wol_power._send_magic_packets(task, '255.255.255.255', 9)

            # Expected sequence: enable broadcast, two sends to the
            # broadcast address/port, then close. The payload itself is not
            # checked (mock.ANY).
            expected_calls = [
                mock.call(),
                mock.call().setsockopt(socket.SOL_SOCKET,
                                       socket.SO_BROADCAST, 1),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().sendto(mock.ANY, ('255.255.255.255', 9)),
                mock.call().close()]

            fake_socket.assert_has_calls(expected_calls)
            # The socket factory must have been invoked exactly once.
            self.assertEqual(1, mock_socket.call_count)
test_audit_template.py 文件源码 项目:watcher-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_create_audit_template(self):
        """Create an audit template for the "dummy" goal and verify it.

        The creation response is compared with the expected fields, and a
        subsequent ``show_audit_template`` is compared with that response.
        """
        goal_name = "dummy"
        _, goal = self.client.show_goal(goal_name)

        # A UUID suffix keeps the template name unique per run.
        template_name = 'my at name %s' % uuidutils.generate_uuid()
        template_description = 'my at description'
        goal_uuid = goal['uuid']

        params = {
            'name': template_name,
            'description': template_description,
            'goal': goal_uuid}
        expected_data = {
            'name': template_name,
            'description': template_description,
            'goal_uuid': goal_uuid,
            'goal_name': goal_name,
            'strategy_uuid': None,
            'strategy_name': None}

        _, created = self.create_audit_template(**params)
        self.assert_expected(expected_data, created)

        _, fetched = self.client.show_audit_template(created['uuid'])
        self.assert_expected(fetched, created)
test_audit_template.py 文件源码 项目:watcher-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_create_audit_template_unicode_description(self):
        """Create an audit template whose description has non-ASCII text.

        Same flow as the plain creation test: compare the creation response
        with the expected fields, then compare a fresh lookup with it.
        """
        goal_name = "dummy"
        _, goal = self.client.show_goal(goal_name)

        template_name = 'my at name %s' % uuidutils.generate_uuid()
        # Accented characters are the point of this test.
        unicode_description = 'my àt déscrïptïôn'
        goal_uuid = goal['uuid']

        params = {
            'name': template_name,
            'description': unicode_description,
            'goal': goal_uuid}
        expected_data = {
            'name': template_name,
            'description': unicode_description,
            'goal_uuid': goal_uuid,
            'goal_name': goal_name,
            'strategy_uuid': None,
            'strategy_name': None}

        _, created = self.create_audit_template(**params)
        self.assert_expected(expected_data, created)

        _, fetched = self.client.show_audit_template(created['uuid'])
        self.assert_expected(fetched, created)
test_audit_template.py 文件源码 项目:watcher-tempest-plugin 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_update_audit_template_remove(self):
        """Remove the description of an audit template with a patch.

        After the ``remove`` operation the description must be ``None`` and
        the name and goal UUID must be unchanged.
        """
        template_name = 'my at name %s' % uuidutils.generate_uuid()
        create_kwargs = {'name': template_name,
                         'description': 'my at description',
                         'goal': self.goal['uuid']}

        _, created = self.create_audit_template(**create_kwargs)
        template_uuid = created['uuid']

        # Drop the description through a single patch operation.
        remove_description = [{'path': '/description', 'op': 'remove'}]
        self.client.update_audit_template(template_uuid, remove_description)

        _, body = self.client.show_audit_template(template_uuid)
        self.assertIsNone(body.get('description'))

        # Everything else must be untouched; the key itself must still be
        # present in the response (direct indexing).
        self.assertEqual(template_name, body['name'])
        self.assertIsNone(body['description'])
        self.assertEqual(self.goal['uuid'], body['goal_uuid'])
users.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def post(self, context, request_data):
        """Create a new user. Requires project admin privileges.

        A fresh API key is generated and stored in ``request_data`` before
        the user is created. Returns a ``(body, 201, headers)`` tuple whose
        ``Location`` header is the external URL of the new user.
        """
        # NOTE(sulo): Instead of using context project_id from
        # header, here we always ensure, user create gets project_id
        # from request param.
        project_id = request_data["project_id"]
        # The result is unused; presumably this lookup fails for an unknown
        # project -- verify against dbapi.
        dbapi.projects_get_by_id(context, project_id)

        request_data["api_key"] = uuidutils.generate_uuid()
        new_user = dbapi.users_create(context, request_data)

        headers = {
            'Location': v1.api.url_for(
                UserById, id=new_user.id, _external=True
            )
        }
        return jsonutils.to_primitive(new_user), 201, headers
worker.py 文件源码 项目:craton 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def start(conf):
    """Set up the taskflow backends and run a conductor in a thread.

    Optionally upgrades the persistence schema (when
    ``conf.taskflow.db_upgrade`` is set), builds the jobboard, fetches a
    'nonblocking' conductor named with a fresh UUID, connects the board and
    starts ``conductor.run`` in a new thread.

    Returns the ``(persistence, board, conductor)`` tuple.
    """
    persistence = _get_persistence_backend(conf)

    if conf.taskflow.db_upgrade:
        # closing() guarantees the connection is released after the upgrade.
        with contextlib.closing(persistence.get_connection()) as connection:
            LOG.info('Checking for database schema upgrade')
            connection.upgrade()

    worker_name = uuidutils.generate_uuid()
    LOG.info('I am %s', worker_name)

    board = _get_jobboard_backend(conf, persistence=persistence)
    conductor = conductors.fetch(
        'nonblocking', worker_name, board,
        engine='parallel',
        max_simultaneous_jobs=conf.max_simultaneous_jobs,
        persistence=persistence)

    board.connect()
    LOG.debug('Starting taskflow conductor loop')
    conductor_thread = threading.Thread(target=conductor.run)
    conductor_thread.start()

    return persistence, board, conductor
api.py 文件源码 项目:cyborg 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def accelerator_create(self, context, values):
        """Persist a new accelerator built from ``values``.

        Missing or empty ``uuid`` / ``description`` entries are filled in
        (a generated UUID and an empty string respectively); ``values`` is
        modified in place. ``context`` is not used in this block.

        Raises ``exception.AcceleratorAlreadyExists`` when the database
        reports a duplicate entry.
        """
        if not values.get('uuid'):
            values['uuid'] = uuidutils.generate_uuid()
        if not values.get('description'):
            values['description'] = ''

        new_accelerator = models.Accelerator()
        new_accelerator.update(values)

        with _session_for_write() as session:
            try:
                session.add(new_accelerator)
                # Flush so a duplicate is detected inside this try block.
                session.flush()
            except db_exc.DBDuplicateEntry:
                raise exception.AcceleratorAlreadyExists(uuid=values['uuid'])
            else:
                return new_accelerator
api.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_board(self, values):
        """Create and save a board from ``values``.

        ``uuid`` and ``status`` are defaulted (generated UUID and
        ``states.REGISTERED``) when the keys are absent; ``values`` is
        modified in place.

        Raises ``exception.DuplicateCode`` when the duplicate-entry error
        names the ``code`` column, otherwise
        ``exception.BoardAlreadyExists``.
        """
        # Fill in defaults only for keys the caller did not supply.
        if 'uuid' not in values:
            values['uuid'] = uuidutils.generate_uuid()
        if 'status' not in values:
            values['status'] = states.REGISTERED

        new_board = models.Board()
        new_board.update(values)
        try:
            new_board.save()
        except db_exc.DBDuplicateEntry as duplicate:
            if 'code' in duplicate.columns:
                raise exception.DuplicateCode(code=values['code'])
            raise exception.BoardAlreadyExists(uuid=values['uuid'])
        return new_board
api.py 文件源码 项目:trio2o 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def register_job(context, _type, resource_id):
    """Create a running job record inside its own transaction.

    Returns the created job, or ``None`` when the insert hits a duplicate
    entry or a deadlock (the transaction is rolled back in that case).
    The session is always closed before returning.
    """
    try:
        context.session.begin()
        new_job = {'id': uuidutils.generate_uuid(),
                   'type': _type,
                   'status': constants.JS_Running,
                   'resource_id': resource_id,
                   'extra_id': constants.SP_EXTRA_ID}
        job = core.create_resource(context, models.Job, new_job)
        context.session.commit()
    except (db_exc.DBDuplicateEntry, db_exc.DBDeadlock):
        # Both failures are handled the same way: undo and report nothing.
        context.session.rollback()
        return None
    else:
        return job
    finally:
        context.session.close()
test_quota.py 文件源码 项目:trio2o 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_update_subproject_not_in_hierarchy(self):
        """Updating quota for a project outside A's hierarchy is forbidden.

        A separate two-level hierarchy (E with child F) is registered, a
        quota update on A's own hierarchy is verified to succeed, and then
        an update aimed at F is expected to raise ``HTTPForbiddenError``.
        """

        # Create another project hierarchy
        E = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=None)
        F = self.FakeProject(id=uuidutils.generate_uuid(), parent_id=E.id)
        E.subtree = {F.id: F.subtree}
        self.project_by_id[E.id] = E
        self.project_by_id[F.id] = F

        qso = quota.QuotaSetOperation(self.A.id)
        # Route project lookups through the test's own _get_project.
        qso._get_project = mock.Mock()
        qso._get_project.side_effect = self._get_project
        self.ctx.project_id = self.A.id

        updated = _make_body(tenant_id=None, root=True,
                             **self.test_class_quota)
        expected = _make_body(tenant_id=None, root=True,
                              **self.test_class_expected_result)
        result = qso.update(self.ctx, **updated)
        self.assertDictMatch(expected, result)

        # Re-target the operation at F, which belongs to the separate E/F
        # hierarchy created above; the same update must now be rejected.
        # NOTE(review): the exact effect of update_hierarchy() is not visible
        # here -- confirm against QuotaSetOperation.
        qso.update_hierarchy(F.id)
        self.assertRaises(exceptions.HTTPForbiddenError, qso.update,
                          self.ctx, **updated)
db.py 文件源码 项目:panko 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setUp(self):
        """Open the storage connection and patch happybase table handling.

        NOTE(review): the patches started here are not stopped anywhere in
        this block; confirm that cleanup happens elsewhere.
        """
        super(HBaseManager, self).setUp()
        self.connection = storage.get_connection(
            self.url, self.conf)
        # Unique prefix for each test to keep its data distinguishable,
        # because all test data is stored in one table.
        data_prefix = uuidutils.generate_uuid(dashed=False)

        def table(conn, name):
            # Replacement for happybase.Connection.table that returns a mock
            # table bound to this test's data prefix.
            return mocks.MockHBaseTable(name, conn, data_prefix)

        # Mock only the real HBase connection; the MConnection "table"
        # method stays as it is.
        mock.patch('happybase.Connection.table', new=table).start()
        # We shouldn't delete data and tables after each test,
        # because it takes too long.
        # All test tables will be deleted in setup-test-env.sh
        mock.patch("happybase.Connection.disable_table",
                   new=mock.MagicMock()).start()
        mock.patch("happybase.Connection.delete_table",
                   new=mock.MagicMock()).start()
        mock.patch("happybase.Connection.create_table",
                   new=mock.MagicMock()).start()
test_event_scenarios.py 文件源码 项目:panko 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _generate_models(self):
        """Record 20 'foo.bar' events whose timestamps are one second apart.

        Each event carries four traits (text, int, float, datetime). Sets
        ``self.s_time`` and leaves ``self.trait_time`` advanced by 20
        seconds from its starting value.
        """
        self.s_time = datetime.datetime(2013, 12, 31, 5, 0)
        self.trait_time = datetime.datetime(2013, 12, 31, 5, 0)
        base = 0
        one_second = datetime.timedelta(seconds=1)

        events = []
        for _ in range(20):
            # (name, type, value) triples; only trait_D changes per event.
            trait_specs = [
                ('trait_A', models.Trait.TEXT_TYPE, "my_text"),
                ('trait_B', models.Trait.INT_TYPE, base + 1),
                ('trait_C', models.Trait.FLOAT_TYPE, float(base) + 0.123456),
                ('trait_D', models.Trait.DATETIME_TYPE, self.trait_time),
            ]
            traits = [models.Trait(*spec) for spec in trait_specs]

            event = models.Event(message_id=uuidutils.generate_uuid(),
                                 event_type='foo.bar',
                                 generated=self.trait_time,
                                 traits=traits,
                                 raw={'status': {'nested': 'started'}})
            events.append(event)
            self.trait_time += one_second
        self.conn.record_events(events)
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def create_container(self, context, values):
        """Create and save a container from ``values``.

        A UUID is generated when ``values`` has none (or an empty one), and
        a non-empty name is checked for uniqueness before saving.
        ``values`` is modified in place.

        Raises ``exception.ContainerAlreadyExists`` when saving reports a
        duplicate entry.
        """
        # Make sure a new container always has a UUID.
        if not values.get('uuid'):
            values['uuid'] = uuidutils.generate_uuid()

        requested_name = values.get('name')
        if requested_name:
            self._validate_unique_container_name(context, values['name'])

        new_container = models.Container()
        new_container.update(values)
        try:
            new_container.save()
        except db_exc.DBDuplicateEntry:
            raise exception.ContainerAlreadyExists(field='UUID',
                                                   value=values['uuid'])
        return new_container
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_container(self, context, container_data):
        """Create and save a container from ``container_data``.

        A UUID is generated when ``container_data`` has none (or an empty
        one), and a non-empty name is checked for uniqueness before saving.
        ``container_data`` is modified in place.

        Any exception raised by ``save()`` propagates unchanged to the
        caller.
        """
        # ensure defaults are present for new containers
        if not container_data.get('uuid'):
            container_data['uuid'] = uuidutils.generate_uuid()

        if container_data.get('name'):
            self._validate_unique_container_name(context,
                                                 container_data['name'])

        container = models.Container(container_data)
        # The former ``try: ... except Exception: raise`` wrapper was a
        # no-op (it only re-raised), so save() is called directly.
        container.save()

        return container
test_container_action.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_container_actions_get_by_container(self):
        """Ensure we can get actions by UUID."""
        target_uuid = uuidutils.generate_uuid()

        # Two actions for the container under test; the same values dict is
        # reused with only the action name changed for the second one.
        action_values = self._create_action_values(target_uuid)
        first_action = dbapi.action_start(self.context, action_values)

        action_values['action'] = 'test-action'
        second_action = dbapi.action_start(self.context, action_values)

        # An action on a different container, which must not be returned.
        other_uuid = uuidutils.generate_uuid()
        other_values = self._create_action_values(other_uuid, 'test-action')
        dbapi.action_start(self.context, other_values)

        expected = [first_action, second_action]
        actions = dbapi.actions_get(self.context, target_uuid)
        self._assertEqualListsOfObjects(expected, actions)
test_container_action.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_container_action_get_by_container_and_request(self):
        """Ensure we can get an action by container UUID and request_id"""
        container_uuid = uuidutils.generate_uuid()

        values = self._create_action_values(container_uuid)
        dbapi.action_start(self.context, values)
        # Remember the first request id before the dict is reused below.
        original_request_id = values['request_id']

        # A second action on the same container under a different request id
        values['action'] = 'test-action'
        values['request_id'] = 'req-00000000-7522-4d99-7ff-111111111111'
        dbapi.action_start(self.context, values)

        found = dbapi.action_get_by_request_id(self.context, container_uuid,
                                               original_request_id)
        self.assertEqual('create_container', found['action'])
        self.assertEqual(self.context.request_id, found['request_id'])
test_container_action.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_container_action_event_start(self):
        """Create a container action event."""
        uuid = uuidutils.generate_uuid()

        # An action has to exist first; the event is started for the same
        # container UUID.
        action_values = self._create_action_values(uuid)
        action = dbapi.action_start(self.context, action_values)

        event_values = self._create_event_values(uuid)
        event = dbapi.action_event_start(self.context, event_values)

        # Add the action id to the expected values before comparing, and
        # skip fields that are not part of event_values at start time.
        event_values['action_id'] = action['id']
        ignored_keys = self.IGNORED_FIELDS + ['finish_time', 'traceback',
                                              'result']
        self._assertEqualObjects(event_values, event, ignored_keys)

        self._assertActionEventSaved(event, action['id'])
test_container_action.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_container_action_event_finish_success(self):
        """Finish a container action event."""
        container_uuid = uuidutils.generate_uuid()

        start_values = self._create_action_values(container_uuid)
        action = dbapi.action_start(self.context, start_values)

        dbapi.action_event_start(self.context,
                                 self._create_event_values(container_uuid))

        # Finish five seconds after "now" with a successful result.
        finish_extra = {
            'finish_time': timeutils.utcnow() + datetime.timedelta(seconds=5),
            'result': 'Success'
        }
        finish_values = self._create_event_values(container_uuid,
                                                  extra=finish_extra)
        event = dbapi.action_event_finish(self.context, finish_values)

        self._assertActionEventSaved(event, action['id'])

        # A successful finish must not mark the action as an error.
        action = dbapi.action_get_by_request_id(self.context, container_uuid,
                                                self.context.request_id)
        self.assertNotEqual('Error', action['message'])
test_capsule.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_list_capsules_with_filters(self):
        """Filtering capsules by UUID returns only the matching capsule.

        An unknown UUID yields an empty result.
        """
        capsule1 = utils.create_test_capsule(
            name='capsule1',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        capsule2 = utils.create_test_capsule(
            name='capsule2',
            uuid=uuidutils.generate_uuid(),
            context=self.context)

        # Each capsule must be the sole match for its own UUID.
        for capsule in (capsule1, capsule2):
            matches = dbapi.list_capsules(
                self.context, filters={'uuid': capsule.uuid})
            self.assertEqual([capsule.id], [m.id for m in matches])

        matches = dbapi.list_capsules(
            self.context, filters={'uuid': 'unknow-uuid'})
        self.assertEqual([], [m.id for m in matches])
test_capsule.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_list_capsules(self, mock_write, mock_read):
        """Listing capsules returns every created capsule's UUID.

        ``mock_write`` and ``mock_read`` are injected from outside this
        block (presumably patches of the etcd client -- TODO confirm).
        """
        uuids = []
        capsules = []
        # While capsules are being created, every read reports "not found".
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(1, 6):
            capsule = utils.create_test_capsule(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='capsule' + str(i))
            capsules.append(capsule.as_dict())
            uuids.append(six.text_type(capsule['uuid']))
        # From here on, reads return the capsules created above.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            capsules)
        res = dbapi.list_capsules(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not significant for this check.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
test_inventory.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_list_inventories(self):
        """Listing inventories returns the totals of all five created ones.

        Order is not significant for this check.
        """
        expected_totals = []
        for index in range(1, 6):
            # One provider per inventory; ids and totals follow the index.
            provider = utils.create_test_resource_provider(
                id=index,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            inventory = utils.create_test_inventory(
                id=index,
                resource_provider_id=provider.id,
                total=index,
                context=self.context)
            expected_totals.append(inventory['total'])

        listed = dbapi.list_inventories(self.context)
        listed_totals = [item.total for item in listed]
        self.assertEqual(sorted(expected_totals), sorted(listed_totals))
test_inventory.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_list_inventories_sorted(self):
        """Listing with ``sort_key='total'`` returns ascending totals.

        An unsupported sort key must raise ``InvalidParameterValue``.
        """
        expected_totals = []
        for index in range(5):
            provider = utils.create_test_resource_provider(
                id=index,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            # Totals are created in descending order (10, 9, ...), so the
            # sorted listing has to differ from insertion order.
            inventory = utils.create_test_inventory(
                id=index,
                resource_provider_id=provider.id,
                total=10 - index,
                context=self.context)
            expected_totals.append(inventory['total'])

        listed = dbapi.list_inventories(self.context,
                                        sort_key='total')
        listed_totals = [item.total for item in listed]
        self.assertEqual(sorted(expected_totals), listed_totals)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_inventories,
                          self.context,
                          sort_key='foo')
test_volume_mapping.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_list_volume_mappings(self, mock_write, mock_read):
        """Listing volume mappings returns every created mapping's UUID.

        ``mock_write`` and ``mock_read`` are injected from outside this
        block (presumably patches of the etcd client -- TODO confirm).
        """
        uuids = []
        volume_mappings = []
        # While mappings are being created, every read reports "not found".
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(0, 6):
            volume_mapping = utils.create_test_volume_mapping(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='volume_mapping' + str(i))
            volume_mappings.append(volume_mapping.as_dict())
            uuids.append(six.text_type(volume_mapping['uuid']))
        # From here on, reads return the mappings created above.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            volume_mappings)
        res = dbapi.list_volume_mappings(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not significant for this check.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
test_volume_mapping.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_list_volume_mappings_sorted(self, mock_write, mock_read):
        """Listing with ``sort_key='uuid'`` returns UUIDs in sorted order.

        An unsupported sort key must raise ``InvalidParameterValue``.
        ``mock_write`` and ``mock_read`` are injected from outside this
        block (presumably patches of the etcd client -- TODO confirm).
        """
        uuids = []
        volume_mappings = []
        # While mappings are being created, every read reports "not found".
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(0, 6):
            volume_mapping = utils.create_test_volume_mapping(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='volume_mapping' + str(i))
            volume_mappings.append(volume_mapping.as_dict())
            uuids.append(six.text_type(volume_mapping['uuid']))
        # From here on, reads return the mappings created above.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            volume_mappings)
        res = dbapi.list_volume_mappings(self.context, sort_key='uuid')
        res_uuids = [r.uuid for r in res]
        # The result itself must already be ordered, so it is not re-sorted.
        self.assertEqual(sorted(uuids), res_uuids)
        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_volume_mappings,
                          self.context,
                          sort_key='wrong_key')
test_container.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_list_containers_sorted(self):
        """Listing with ``sort_key='uuid'`` returns UUIDs in sorted order.

        An unsupported sort key must raise ``InvalidParameterValue``.
        """
        created_uuids = []
        for index in range(5):
            container = utils.create_test_container(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='container' + str(index))
            created_uuids.append(six.text_type(container.uuid))

        listed = dbapi.list_containers(self.context, sort_key='uuid')
        listed_uuids = [item.uuid for item in listed]
        # The listing must already be ordered, so it is compared as-is.
        self.assertEqual(sorted(created_uuids), listed_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_containers,
                          self.context,
                          sort_key='foo')
test_container.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_update_container_with_the_same_name(self):
        """Renaming a container to a name already in use must fail.

        With the unique-name scope set to "project", the first rename
        succeeds and the second, to the same name, is expected to raise
        ``ContainerAlreadyExists``.
        """
        CONF.set_override("unique_container_name_scope", "project",
                          group="compute")
        first = utils.create_test_container(
            name='container-one',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        second = utils.create_test_container(
            name='container-two',
            uuid=uuidutils.generate_uuid(),
            context=self.context)

        shared_name = 'new_name'
        rename = {'name': shared_name}
        dbapi.update_container(self.context, first.id, rename)

        # A separate dict with the same content is used for the clashing
        # rename.
        self.assertRaises(exception.ContainerAlreadyExists,
                          dbapi.update_container, self.context,
                          second.id, {'name': shared_name})
test_container.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_list_containers(self, mock_write, mock_read):
        """Listing containers returns every created container's UUID.

        ``mock_write`` and ``mock_read`` are injected from outside this
        block (presumably patches of the etcd client -- TODO confirm).
        """
        uuids = []
        containers = []
        # While containers are being created, every read reports "not found".
        mock_read.side_effect = etcd.EtcdKeyNotFound
        for i in range(1, 6):
            container = utils.create_test_container(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='cont' + str(i))
            containers.append(container.as_dict())
            uuids.append(six.text_type(container['uuid']))
        # From here on, reads return the containers created above.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            containers)
        res = dbapi.list_containers(self.context)
        res_uuids = [r.uuid for r in res]
        # Order is not significant for this check.
        self.assertEqual(sorted(uuids), sorted(res_uuids))
test_container.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_update_container_with_the_same_name(self, mock_update,
                                                 mock_write, mock_read):
        """Renaming a container to an existing container's name must fail.

        The three mocks are injected from outside this block (presumably
        patches of the etcd client -- TODO confirm); ``mock_update`` is not
        referenced in the body.
        """
        CONF.set_override("unique_container_name_scope", "project",
                          group="compute")
        # While containers are being created, every read reports "not found".
        mock_read.side_effect = etcd.EtcdKeyNotFound
        container1 = utils.create_test_container(
            name='container-one',
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        container2 = utils.create_test_container(
            name='container-two',
            uuid=uuidutils.generate_uuid(),
            context=self.context)

        # From here on, reads return both containers, so the name
        # 'container-one' is visible as already taken.
        mock_read.side_effect = lambda *args: FakeEtcdMultipleResult(
            [container1.as_dict(), container2.as_dict()])
        self.assertRaises(exception.ContainerAlreadyExists,
                          dbapi.update_container, self.context,
                          container2.uuid, {'name': 'container-one'})
test_resource_class.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_list_resource_classes_sorted(self):
        """Listing with ``sort_key='name'`` returns names in sorted order.

        An unsupported sort key must raise ``InvalidParameterValue``.
        """
        created_names = []
        for index in range(5):
            resource_class = utils.create_test_resource_class(
                context=self.context,
                uuid=uuidutils.generate_uuid(),
                name='class' + str(index))
            created_names.append(six.text_type(resource_class.name))

        listed = dbapi.list_resource_classes(self.context, sort_key='name')
        listed_names = [item.name for item in listed]
        # The listing must already be ordered, so it is compared as-is.
        self.assertEqual(sorted(created_names), listed_names)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_resource_classes,
                          self.context,
                          sort_key='foo')


问题


面经


文章

微信
公众号

扫码关注公众号