python类generate_uuid()的实例源码

test_allocation.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_list_allocations(self):
        cids = []
        for i in range(1, 6):
            provider = utils.create_test_resource_provider(
                id=i,
                uuid=uuidutils.generate_uuid(),
                context=self.context)
            allocation = utils.create_test_allocation(
                id=i,
                resource_provider_id=provider.id,
                consumer_id=uuidutils.generate_uuid(),
                context=self.context)
            cids.append(allocation['consumer_id'])
        res = dbapi.list_allocations(self.context)
        res_cids = [r.consumer_id for r in res]
        self.assertEqual(sorted(cids), sorted(res_cids))
test_compute_host.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_list_compute_nodes_sorted(self):
        uuids = []
        for i in range(5):
            node = utils.create_test_compute_node(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                hostname='node' + str(i))
            uuids.append(six.text_type(node.uuid))
        res = dbapi.list_compute_nodes(self.context, sort_key='uuid')
        res_uuids = [r.uuid for r in res]
        self.assertEqual(sorted(uuids), res_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_compute_nodes,
                          self.context,
                          sort_key='foo')
test_image.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_list_images_with_filters(self):
        image1 = utils.create_test_image(
            context=self.context, repo='image-one',
            uuid=uuidutils.generate_uuid())
        image2 = utils.create_test_image(
            context=self.context, repo='image-two',
            uuid=uuidutils.generate_uuid())

        res = self.dbapi.list_images(self.context,
                                     filters={'repo': 'image-one'})
        self.assertEqual([image1.id], [r.id for r in res])

        res = self.dbapi.list_images(self.context,
                                     filters={'repo': 'image-two'})
        self.assertEqual([image2.id], [r.id for r in res])

        res = self.dbapi.list_images(self.context,
                                     filters={'repo': 'bad-image'})
        self.assertEqual([], [r.id for r in res])

        res = self.dbapi.list_images(
            self.context,
            filters={'repo': image1.repo})
        self.assertEqual([image1.id], [r.id for r in res])
test_resource_provider.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_list_resource_providers_sorted(self):
        uuids = []
        for i in range(5):
            provider = utils.create_test_resource_provider(
                uuid=uuidutils.generate_uuid(),
                context=self.context,
                name='provider' + str(i))
            uuids.append(six.text_type(provider.uuid))
        res = dbapi.list_resource_providers(self.context, sort_key='uuid')
        res_uuids = [r.uuid for r in res]
        self.assertEqual(sorted(uuids), res_uuids)

        self.assertRaises(exception.InvalidParameterValue,
                          dbapi.list_resource_providers,
                          self.context,
                          sort_key='foo')
test_images.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_all_images_with_pagination_marker(self, mock_image_list
                                                   ):
        image_list = []
        for id_ in range(4):
            test_image = utils.create_test_image(
                context=self.context,
                id=id_,
                repo='testrepo' + str(id_),
                uuid=uuidutils.generate_uuid())
            image_list.append(objects.Image(self.context, **test_image))
        mock_image_list.return_value = image_list[-1:]
        response = self.get('/v1/images/?limit=3&marker=%s'
                            % image_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_images = response.json['images']
        self.assertEqual(1, len(actual_images))
        self.assertEqual(image_list[-1].uuid,
                         actual_images[0].get('uuid'))
test_hosts.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_all_hosts_with_pagination_marker(self, mock_host_list,
                                                  mock_policy):
        mock_policy.return_value = True
        host_list = []
        for id_ in range(4):
            test_host = utils.create_test_compute_node(
                context=self.context,
                uuid=uuidutils.generate_uuid())
            numat = numa.NUMATopology._from_dict(test_host['numa_topology'])
            test_host['numa_topology'] = numat
            host = objects.ComputeNode(self.context, **test_host)
            host_list.append(host)
        mock_host_list.return_value = host_list[-1:]
        response = self.get('/v1/hosts?limit=3&marker=%s'
                            % host_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_hosts = response.json['hosts']
        self.assertEqual(1, len(actual_hosts))
        self.assertEqual(host_list[-1].uuid,
                         actual_hosts[0].get('uuid'))
test_containers.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_get_all_containers_with_pagination_marker(self,
                                                       mock_container_list,
                                                       mock_container_show):
        container_list = []
        for id_ in range(4):
            test_container = utils.create_test_container(
                id=id_, uuid=uuidutils.generate_uuid(),
                name='container' + str(id_), context=self.context)
            container_list.append(objects.Container(self.context,
                                                    **test_container))
        mock_container_list.return_value = container_list[-1:]
        mock_container_show.return_value = container_list[-1]
        response = self.get('/v1/containers/?limit=3&marker=%s'
                            % container_list[2].uuid)

        self.assertEqual(200, response.status_int)
        actual_containers = response.json['containers']
        self.assertEqual(1, len(actual_containers))
        self.assertEqual(container_list[-1].uuid,
                         actual_containers[0].get('uuid'))
test_volume_mapping.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_refresh(self):
        uuid = self.fake_volume_mapping['uuid']
        new_uuid = uuidutils.generate_uuid()
        returns = [dict(self.fake_volume_mapping, uuid=uuid),
                   dict(self.fake_volume_mapping, uuid=new_uuid)]
        expected = [mock.call(self.context, uuid),
                    mock.call(self.context, uuid)]
        with mock.patch.object(self.dbapi, 'get_volume_mapping_by_uuid',
                               side_effect=returns,
                               autospec=True) as mock_get_volume_mapping:
            volume_mapping = objects.VolumeMapping.get_by_uuid(self.context,
                                                               uuid)
            self.assertEqual(uuid, volume_mapping.uuid)
            volume_mapping.refresh()
            self.assertEqual(new_uuid, volume_mapping.uuid)
            self.assertEqual(expected, mock_get_volume_mapping.call_args_list)
            self.assertEqual(self.context, volume_mapping._context)
test_container.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_refresh(self):
        uuid = self.fake_container['uuid']
        new_uuid = uuidutils.generate_uuid()
        returns = [dict(self.fake_container, uuid=uuid),
                   dict(self.fake_container, uuid=new_uuid)]
        expected = [mock.call(self.context, uuid),
                    mock.call(self.context, uuid)]
        with mock.patch.object(self.dbapi, 'get_container_by_uuid',
                               side_effect=returns,
                               autospec=True) as mock_get_container:
            container = objects.Container.get_by_uuid(self.context, uuid)
            self.assertEqual(uuid, container.uuid)
            container.refresh()
            self.assertEqual(new_uuid, container.uuid)
            self.assertEqual(expected, mock_get_container.call_args_list)
            self.assertEqual(self.context, container._context)
manager.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def container_attach(self, context, container):
        LOG.debug('Get websocket url from the container: %s', container.uuid)
        try:
            url = self.driver.get_websocket_url(context, container)
            token = uuidutils.generate_uuid()
            access_url = '%s?token=%s&uuid=%s' % (
                CONF.websocket_proxy.base_url, token, container.uuid)
            container.websocket_url = url
            container.websocket_token = token
            container.save(context)
            return access_url
        except Exception as e:
            LOG.error("Error occurred while calling "
                      "get websocket url function: %s",
                      six.text_type(e))
            raise
test_resources.py 文件源码 项目:vmware-nsxlib 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_update_logical_router_port(self):
        fake_router_port = test_constants.FAKE_ROUTER_PORT.copy()
        uuid = fake_router_port['id']
        fake_relay_uuid = uuidutils.generate_uuid()
        lrport = self.get_mocked_resource()
        with mock.patch.object(lrport, 'get', return_value=fake_router_port),\
            mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                       return_value='2.0.0'):
            lrport.update(uuid, relay_service_uuid=fake_relay_uuid)
            data = {
                'id': uuid,
                'display_name': fake_router_port['display_name'],
                'logical_router_id': fake_router_port['logical_router_id'],
                'resource_type': fake_router_port['resource_type'],
                "revision": 0,
                'service_bindings': [{'service_id': {
                    'target_type': 'LogicalService',
                    'target_id': fake_relay_uuid}}]
            }

            test_client.assert_json_call(
                'put', lrport,
                'https://1.2.3.4/api/v1/logical-router-ports/%s' % uuid,
                data=jsonutils.dumps(data, sort_keys=True),
                headers=self.default_headers())
test_nested.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_default_network_id(self, mock_get_port_from_host, mock_conf):
        mock_conf.binding.link_iface = 'eth0'

        fake_endpoint_id = lib_utils.get_hash()
        fake_neutron_port_id = uuidutils.generate_uuid()
        fake_neutron_net_id = uuidutils.generate_uuid()
        fake_neutron_v4_subnet_id = uuidutils.generate_uuid()
        fake_neutron_v6_subnet_id = uuidutils.generate_uuid()

        fake_vm_port = self._get_fake_port(
            fake_endpoint_id, fake_neutron_net_id,
            fake_neutron_port_id, lib_const.PORT_STATUS_ACTIVE,
            fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)['port']
        mock_get_port_from_host.return_value = fake_vm_port

        nested_driver = nested.NestedDriver()
        host_network_id = nested_driver.get_default_network_id()
        mock_get_port_from_host.assert_called_with('eth0')
        self.assertEqual(host_network_id, fake_vm_port['network_id'])
test_vlan.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_get_segmentation_id(self, mock_alloc_seg_id, mock_trunk_port,
                                 mock_vlan_check):
        mock_trunk_port.return_value = None
        mock_vlan_check.return_value = None
        fake_neutron_port1_id = uuidutils.generate_uuid()
        fake_neutron_port2_id = uuidutils.generate_uuid()
        mock_alloc_seg_id.side_effect = [1, 2]

        vlan_driver = vlan.VlanDriver()

        response = vlan_driver._get_segmentation_id(fake_neutron_port1_id)
        mock_alloc_seg_id.assert_called_once()
        self.assertEqual(response, 1)

        mock_alloc_seg_id.reset_mock()
        response = vlan_driver._get_segmentation_id(fake_neutron_port1_id)
        mock_alloc_seg_id.assert_not_called()
        self.assertEqual(response, 1)

        response = vlan_driver._get_segmentation_id(fake_neutron_port2_id)
        mock_alloc_seg_id.assert_called_once()
        self.assertEqual(response, 2)
test_veth.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_create_host_iface(self, mock_port_bind):
        veth_driver = veth.VethDriver()
        fake_endpoint_id = lib_utils.get_hash()
        fake_neutron_port = uuidutils.generate_uuid()
        fake_neutron_net_id = uuidutils.generate_uuid()
        fake_neutron_v4_subnet_id = uuidutils.generate_uuid()
        fake_neutron_v6_subnet_id = uuidutils.generate_uuid()

        fake_subnets = self._get_fake_subnets(
            fake_endpoint_id, fake_neutron_net_id,
            fake_neutron_v4_subnet_id, fake_neutron_v6_subnet_id)
        fake_network = mock.sentinel.binding_network
        fake_exec_response = ('fake_stdout', '')
        mock_port_bind.return_value = ('fake_host_ifname',
            'fake_container_ifname', fake_exec_response)

        response = veth_driver.create_host_iface(fake_endpoint_id,
            fake_neutron_port, fake_subnets, fake_network)
        mock_port_bind.assert_called_with(fake_endpoint_id,
            fake_neutron_port, fake_subnets, fake_network)
        self.assertEqual(response, fake_exec_response)
test_mysql_kafka_offsets.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_add_offset(self):

        topic_1 = uuidutils.generate_uuid()
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = uuidutils.generate_uuid()
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)

        my_batch_time = self.get_dummy_batch_time()

        used_values = {}
        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1,
                                    batch_time_info=my_batch_time)
        used_values[offset_key_1] = {
            "topic": topic_1, "partition": partition_1, "app_name": app_name_1,
            "from_offset": from_offset_1, "until_offset": until_offset_1
        }

        kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
            app_name_1)
        offset_value_1 = kafka_offset_specs.get(offset_key_1)
        self.assertions_on_offset(used_value=used_values.get(offset_key_1),
                                  offset_value=offset_value_1)
test_mysql_kafka_offsets.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_update_offset_values(self):
        topic_1 = uuidutils.generate_uuid()
        partition_1 = random.randint(0, 1024)
        until_offset_1 = random.randint(0, sys.maxsize)
        from_offset_1 = random.randint(0, sys.maxsize)
        app_name_1 = uuidutils.generate_uuid()
        offset_key_1 = "%s_%s_%s" % (app_name_1, topic_1, partition_1)

        my_batch_time = self.get_dummy_batch_time()

        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_1,
                                    until_offset=until_offset_1,
                                    batch_time_info=my_batch_time)

        until_offset_2 = random.randint(0, sys.maxsize)
        while until_offset_2 == until_offset_1:
            until_offset_2 = random.randint(0, sys.maxsize)

        from_offset_2 = random.randint(0, sys.maxsize)
        while from_offset_2 == from_offset_1:
            from_offset_2 = random.randint(0, sys.maxsize)

        self.kafka_offset_specs.add(topic=topic_1, partition=partition_1,
                                    app_name=app_name_1,
                                    from_offset=from_offset_2,
                                    until_offset=until_offset_2,
                                    batch_time_info=my_batch_time)

        kafka_offset_specs = self.kafka_offset_specs.get_kafka_offsets(
            app_name_1)
        updated_offset_value = kafka_offset_specs.get(offset_key_1)
        self.assertEqual(from_offset_2, updated_offset_value.get_from_offset())
        self.assertEqual(until_offset_2,
                         updated_offset_value.get_until_offset())
test_os_vif_util.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_neutron_to_osvif_network(self):
        network_id = uuidutils.generate_uuid()
        network_name = 'test-net'
        network_mtu = 1500
        neutron_network = {
            'id': network_id,
            'name': network_name,
            'mtu': network_mtu,
        }

        network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertEqual(network_id, network.id)
        self.assertEqual(network_name, network.label)
        self.assertEqual(network_mtu, network.mtu)
test_os_vif_util.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_neutron_to_osvif_network_no_name(self):
        network_id = uuidutils.generate_uuid()
        network_mtu = 1500
        neutron_network = {
            'id': network_id,
            'mtu': network_mtu,
        }

        network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertFalse(network.obj_attr_is_set('label'))
test_os_vif_util.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_neutron_to_osvif_network_no_mtu(self):
        network_id = uuidutils.generate_uuid()
        network_name = 'test-net'
        neutron_network = {
            'id': network_id,
            'name': network_name,
        }

        network = ovu.neutron_to_osvif_network(neutron_network)

        self.assertIsNone(network.mtu)
test_os_vif_util.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_neutron_to_osvif_vif_ovs_no_bridge(self):
        vif_plugin = 'ovs'
        port = {'id': uuidutils.generate_uuid()}
        subnets = {}

        self.assertRaises(o_cfg.RequiredOptError,
                          ovu.neutron_to_osvif_vif_ovs,
                          vif_plugin, port, subnets)


问题


面经


文章

微信
公众号

扫码关注公众号