def test_list_allocations(self):
    """Check that list_allocations returns every allocation created here.

    Five resource providers are created, each with one allocation, and the
    consumer ids reported by ``dbapi.list_allocations`` are compared with
    the ones recorded at creation time.
    """
    expected_consumer_ids = []
    for index in range(1, 6):
        # Each allocation references its own freshly created provider.
        rp = utils.create_test_resource_provider(
            id=index,
            uuid=uuidutils.generate_uuid(),
            context=self.context)
        alloc = utils.create_test_allocation(
            id=index,
            resource_provider_id=rp.id,
            consumer_id=uuidutils.generate_uuid(),
            context=self.context)
        expected_consumer_ids.append(alloc['consumer_id'])

    listed = dbapi.list_allocations(self.context)
    actual_consumer_ids = [entry.consumer_id for entry in listed]

    # No ordering was requested, so both sides are sorted before comparing.
    self.assertEqual(sorted(expected_consumer_ids),
                     sorted(actual_consumer_ids))
# Python example source code for generate_uuid()
def test_list_compute_nodes_sorted(self):
    """Check sorting of list_compute_nodes and rejection of a bad sort key.

    With ``sort_key='uuid'`` the returned uuids must equal the sorted list
    of created uuids; ``sort_key='foo'`` must raise
    ``InvalidParameterValue``.
    """
    created_uuids = []
    for index in range(5):
        compute_node = utils.create_test_compute_node(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            hostname='node' + str(index))
        created_uuids.append(six.text_type(compute_node.uuid))

    listed = dbapi.list_compute_nodes(self.context, sort_key='uuid')
    listed_uuids = [entry.uuid for entry in listed]
    # The result is compared unsorted on purpose: the API must do the sort.
    self.assertEqual(sorted(created_uuids), listed_uuids)

    self.assertRaises(
        exception.InvalidParameterValue,
        dbapi.list_compute_nodes,
        self.context,
        sort_key='foo')
def test_list_images_with_filters(self):
    """Check that list_images filters on the ``repo`` field.

    Two images with different repos are created; each repo filter must
    return only the matching image, and an unknown repo must return
    nothing.
    """
    first = utils.create_test_image(
        context=self.context, repo='image-one',
        uuid=uuidutils.generate_uuid())
    second = utils.create_test_image(
        context=self.context, repo='image-two',
        uuid=uuidutils.generate_uuid())

    def ids_for_repo(repo):
        # Run the filtered query and reduce the result to a list of ids.
        matches = self.dbapi.list_images(self.context,
                                         filters={'repo': repo})
        return [image.id for image in matches]

    found = ids_for_repo('image-one')
    self.assertEqual([first.id], found)

    found = ids_for_repo('image-two')
    self.assertEqual([second.id], found)

    found = ids_for_repo('bad-image')
    self.assertEqual([], found)

    # Same as the first query, but the filter value is read from the object.
    found = ids_for_repo(first.repo)
    self.assertEqual([first.id], found)
def test_list_resource_providers_sorted(self):
    """Check sorting of list_resource_providers and a bad sort key.

    With ``sort_key='uuid'`` the returned uuids must equal the sorted list
    of created uuids; ``sort_key='foo'`` must raise
    ``InvalidParameterValue``.
    """
    created_uuids = []
    for index in range(5):
        rp = utils.create_test_resource_provider(
            uuid=uuidutils.generate_uuid(),
            context=self.context,
            name='provider' + str(index))
        created_uuids.append(six.text_type(rp.uuid))

    listed = dbapi.list_resource_providers(self.context, sort_key='uuid')
    listed_uuids = [entry.uuid for entry in listed]
    # The result is compared unsorted on purpose: the API must do the sort.
    self.assertEqual(sorted(created_uuids), listed_uuids)

    self.assertRaises(
        exception.InvalidParameterValue,
        dbapi.list_resource_providers,
        self.context,
        sort_key='foo')
def test_get_all_images_with_pagination_marker(self, mock_image_list):
    """Check GET /v1/images/ with ``limit`` and ``marker`` parameters.

    ``mock_image_list`` is supplied by the caller (presumably a patch
    decorator outside this view) and is set to return only the last of the
    four images built here; the response must contain exactly that image.
    """
    images = []
    for index in range(4):
        image_values = utils.create_test_image(
            context=self.context,
            id=index,
            repo='testrepo' + str(index),
            uuid=uuidutils.generate_uuid())
        images.append(objects.Image(self.context, **image_values))

    # Only the entry after the marker (index 2) is handed back by the mock.
    mock_image_list.return_value = images[-1:]
    marker_uuid = images[2].uuid
    response = self.get('/v1/images/?limit=3&marker=%s' % marker_uuid)

    self.assertEqual(200, response.status_int)
    returned_images = response.json['images']
    self.assertEqual(1, len(returned_images))
    self.assertEqual(images[-1].uuid, returned_images[0].get('uuid'))
def test_get_all_hosts_with_pagination_marker(self, mock_host_list,
                                              mock_policy):
    """Check GET /v1/hosts with ``limit`` and ``marker`` parameters.

    Both mocks are supplied by the caller (presumably patch decorators
    outside this view). ``mock_policy`` is made to return True and
    ``mock_host_list`` returns only the last of the four hosts built here;
    the response must contain exactly that host.
    """
    mock_policy.return_value = True

    hosts = []
    for _ in range(4):
        host_values = utils.create_test_compute_node(
            context=self.context,
            uuid=uuidutils.generate_uuid())
        # Replace the raw topology value with a NUMATopology object before
        # building the ComputeNode.
        topology = numa.NUMATopology._from_dict(
            host_values['numa_topology'])
        host_values['numa_topology'] = topology
        hosts.append(objects.ComputeNode(self.context, **host_values))

    # Only the entry after the marker (index 2) is handed back by the mock.
    mock_host_list.return_value = hosts[-1:]
    marker_uuid = hosts[2].uuid
    response = self.get('/v1/hosts?limit=3&marker=%s' % marker_uuid)

    self.assertEqual(200, response.status_int)
    returned_hosts = response.json['hosts']
    self.assertEqual(1, len(returned_hosts))
    self.assertEqual(hosts[-1].uuid, returned_hosts[0].get('uuid'))
def test_get_all_containers_with_pagination_marker(self,
                                                   mock_container_list,
                                                   mock_container_show):
    """Check GET /v1/containers/ with ``limit`` and ``marker`` parameters.

    Both mocks are supplied by the caller (presumably patch decorators
    outside this view). The list mock returns only the last of the four
    containers built here and the show mock returns that same container;
    the response must contain exactly that container.
    """
    containers = []
    for index in range(4):
        container_values = utils.create_test_container(
            id=index, uuid=uuidutils.generate_uuid(),
            name='container' + str(index), context=self.context)
        containers.append(
            objects.Container(self.context, **container_values))

    # Only the entry after the marker (index 2) is handed back by the mocks.
    mock_container_list.return_value = containers[-1:]
    mock_container_show.return_value = containers[-1]
    marker_uuid = containers[2].uuid
    response = self.get('/v1/containers/?limit=3&marker=%s' % marker_uuid)

    self.assertEqual(200, response.status_int)
    returned_containers = response.json['containers']
    self.assertEqual(1, len(returned_containers))
    self.assertEqual(containers[-1].uuid,
                     returned_containers[0].get('uuid'))
def test_refresh(self):
    """Check that VolumeMapping.refresh() reloads fields from the DB API.

    ``get_volume_mapping_by_uuid`` is patched to return the fake mapping
    first with its original uuid and then with a new one. After
    ``refresh()`` the object must carry the new uuid, and both lookups
    must have been made with the original uuid.
    """
    original_uuid = self.fake_volume_mapping['uuid']
    refreshed_uuid = uuidutils.generate_uuid()
    # First lookup yields the original record, second yields the changed one.
    db_results = [
        dict(self.fake_volume_mapping, uuid=value)
        for value in (original_uuid, refreshed_uuid)
    ]
    expected_calls = [mock.call(self.context, original_uuid),
                      mock.call(self.context, original_uuid)]

    with mock.patch.object(self.dbapi, 'get_volume_mapping_by_uuid',
                           side_effect=db_results,
                           autospec=True) as mock_get_volume_mapping:
        mapping = objects.VolumeMapping.get_by_uuid(self.context,
                                                    original_uuid)
        self.assertEqual(original_uuid, mapping.uuid)

        mapping.refresh()
        self.assertEqual(refreshed_uuid, mapping.uuid)

        self.assertEqual(expected_calls,
                         mock_get_volume_mapping.call_args_list)
        self.assertEqual(self.context, mapping._context)
def test_refresh(self):
    """Check that Container.refresh() reloads fields from the DB API.

    ``get_container_by_uuid`` is patched to return the fake container
    first with its original uuid and then with a new one. After
    ``refresh()`` the object must carry the new uuid, and both lookups
    must have been made with the original uuid.
    """
    original_uuid = self.fake_container['uuid']
    refreshed_uuid = uuidutils.generate_uuid()
    # First lookup yields the original record, second yields the changed one.
    db_results = [
        dict(self.fake_container, uuid=value)
        for value in (original_uuid, refreshed_uuid)
    ]
    expected_calls = [mock.call(self.context, original_uuid),
                      mock.call(self.context, original_uuid)]

    with mock.patch.object(self.dbapi, 'get_container_by_uuid',
                           side_effect=db_results,
                           autospec=True) as mock_get_container:
        fetched = objects.Container.get_by_uuid(self.context,
                                                original_uuid)
        self.assertEqual(original_uuid, fetched.uuid)

        fetched.refresh()
        self.assertEqual(refreshed_uuid, fetched.uuid)

        self.assertEqual(expected_calls, mock_get_container.call_args_list)
        self.assertEqual(self.context, fetched._context)
def container_attach(self, context, container):
    """Build an access URL for attaching to a container via websocket.

    The driver's websocket URL and a freshly generated token are stored on
    the container and saved; the returned URL combines the configured
    proxy base URL with the token and the container uuid.

    :param context: request context, passed to the driver and to ``save``.
    :param container: container object to attach to; it is modified and
        saved by this call.
    :returns: the access URL string.
    :raises Exception: any error from the steps above is logged and
        re-raised unchanged.
    """
    LOG.debug('Get websocket url from the container: %s', container.uuid)
    try:
        websocket_url = self.driver.get_websocket_url(context, container)
        access_token = uuidutils.generate_uuid()
        proxy_base_url = CONF.websocket_proxy.base_url
        # The access URL is built before the container is touched, so a
        # failure here leaves the container unmodified.
        access_url = '%s?token=%s&uuid=%s' % (
            proxy_base_url, access_token, container.uuid)

        container.websocket_url = websocket_url
        container.websocket_token = access_token
        container.save(context)
        return access_url
    except Exception as exc:
        LOG.error("Error occurred while calling "
                  "get websocket url function: %s",
                  six.text_type(exc))
        raise
def test_update_logical_router_port(self):
    """Check the PUT sent when a router port gets a relay service.

    The resource's ``get`` is patched to return the fake router port and
    the NSX version is patched to '2.0.0'. Updating with
    ``relay_service_uuid`` must issue a PUT whose body carries the port
    fields plus a ``service_bindings`` entry pointing at the relay uuid.
    """
    router_port = test_constants.FAKE_ROUTER_PORT.copy()
    port_id = router_port['id']
    relay_uuid = uuidutils.generate_uuid()
    resource = self.get_mocked_resource()

    patched_get = mock.patch.object(resource, 'get',
                                    return_value=router_port)
    patched_version = mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
                                 return_value='2.0.0')
    with patched_get, patched_version:
        resource.update(port_id, relay_service_uuid=relay_uuid)

        service_binding = {
            'service_id': {
                'target_type': 'LogicalService',
                'target_id': relay_uuid,
            },
        }
        expected_body = {
            'id': port_id,
            'display_name': router_port['display_name'],
            'logical_router_id': router_port['logical_router_id'],
            'resource_type': router_port['resource_type'],
            "revision": 0,
            'service_bindings': [service_binding],
        }
        expected_url = (
            'https://1.2.3.4/api/v1/logical-router-ports/%s' % port_id)
        test_client.assert_json_call(
            'put', resource,
            expected_url,
            data=jsonutils.dumps(expected_body, sort_keys=True),
            headers=self.default_headers())
def test_get_default_network_id(self, mock_get_port_from_host, mock_conf):
    """Check that NestedDriver reports the network of the host's port.

    Both mocks are supplied by the caller (presumably patch decorators
    outside this view). The configured link interface is 'eth0'; the
    driver must look up the port for that interface and return its
    ``network_id``.
    """
    mock_conf.binding.link_iface = 'eth0'

    endpoint_id = lib_utils.get_hash()
    port_id = uuidutils.generate_uuid()
    net_id = uuidutils.generate_uuid()
    v4_subnet_id = uuidutils.generate_uuid()
    v6_subnet_id = uuidutils.generate_uuid()
    port_response = self._get_fake_port(
        endpoint_id, net_id,
        port_id, lib_const.PORT_STATUS_ACTIVE,
        v4_subnet_id, v6_subnet_id)
    vm_port = port_response['port']
    mock_get_port_from_host.return_value = vm_port

    driver = nested.NestedDriver()
    network_id = driver.get_default_network_id()

    mock_get_port_from_host.assert_called_with('eth0')
    self.assertEqual(network_id, vm_port['network_id'])
def test_get_segmentation_id(self, mock_alloc_seg_id, mock_trunk_port,
                             mock_vlan_check):
    """Check that VlanDriver allocates one segmentation id per port.

    The mocks are supplied by the caller (presumably patch decorators
    outside this view). The first request for a port must allocate an id,
    a repeated request for the same port must reuse it without allocating,
    and a request for a different port must allocate again.
    """
    mock_trunk_port.return_value = None
    mock_vlan_check.return_value = None
    first_port_id = uuidutils.generate_uuid()
    second_port_id = uuidutils.generate_uuid()
    # Successive allocations hand out 1, then 2.
    mock_alloc_seg_id.side_effect = [1, 2]
    driver = vlan.VlanDriver()

    # First request for port 1: a new id is allocated.
    seg_id = driver._get_segmentation_id(first_port_id)
    mock_alloc_seg_id.assert_called_once()
    self.assertEqual(seg_id, 1)

    # Second request for port 1: same id, no new allocation.
    mock_alloc_seg_id.reset_mock()
    seg_id = driver._get_segmentation_id(first_port_id)
    mock_alloc_seg_id.assert_not_called()
    self.assertEqual(seg_id, 1)

    # Request for port 2: the next id is allocated.
    seg_id = driver._get_segmentation_id(second_port_id)
    mock_alloc_seg_id.assert_called_once()
    self.assertEqual(seg_id, 2)
def test_create_host_iface(self, mock_port_bind):
    """Check that VethDriver.create_host_iface delegates to port binding.

    ``mock_port_bind`` is supplied by the caller (presumably a patch
    decorator outside this view). The driver must call it with the same
    four arguments it received and return the third element of the
    binding result.
    """
    driver = veth.VethDriver()
    endpoint_id = lib_utils.get_hash()
    neutron_port = uuidutils.generate_uuid()
    net_id = uuidutils.generate_uuid()
    v4_subnet_id = uuidutils.generate_uuid()
    v6_subnet_id = uuidutils.generate_uuid()
    subnets = self._get_fake_subnets(
        endpoint_id, net_id,
        v4_subnet_id, v6_subnet_id)
    network = mock.sentinel.binding_network

    exec_response = ('fake_stdout', '')
    mock_port_bind.return_value = (
        'fake_host_ifname', 'fake_container_ifname', exec_response)

    result = driver.create_host_iface(
        endpoint_id, neutron_port, subnets, network)

    mock_port_bind.assert_called_with(
        endpoint_id, neutron_port, subnets, network)
    self.assertEqual(result, exec_response)
def test_add_offset(self):
    """Check that an offset added to the specs can be read back.

    A random offset record is added for a random app/topic/partition, then
    fetched through ``get_kafka_offsets`` and compared with the values
    that were stored using ``assertions_on_offset``.
    """
    topic = uuidutils.generate_uuid()
    partition = random.randint(0, 1024)
    until_offset = random.randint(0, sys.maxsize)
    from_offset = random.randint(0, sys.maxsize)
    app_name = uuidutils.generate_uuid()
    # Key under which get_kafka_offsets is expected to expose the record.
    offset_key = "%s_%s_%s" % (app_name, topic, partition)
    batch_time = self.get_dummy_batch_time()

    self.kafka_offset_specs.add(topic=topic, partition=partition,
                                app_name=app_name,
                                from_offset=from_offset,
                                until_offset=until_offset,
                                batch_time_info=batch_time)
    expected_values = {
        "topic": topic,
        "partition": partition,
        "app_name": app_name,
        "from_offset": from_offset,
        "until_offset": until_offset,
    }

    stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
    stored_value = stored_offsets.get(offset_key)
    self.assertions_on_offset(used_value=expected_values,
                              offset_value=stored_value)
def test_update_offset_values(self):
    """Check that re-adding an offset for the same key overwrites it.

    An offset is added twice for the same app/topic/partition with
    different from/until values; the record read back must carry the
    second pair of values.
    """
    topic = uuidutils.generate_uuid()
    partition = random.randint(0, 1024)
    first_until = random.randint(0, sys.maxsize)
    first_from = random.randint(0, sys.maxsize)
    app_name = uuidutils.generate_uuid()
    # Key under which get_kafka_offsets is expected to expose the record.
    offset_key = "%s_%s_%s" % (app_name, topic, partition)
    batch_time = self.get_dummy_batch_time()

    self.kafka_offset_specs.add(topic=topic, partition=partition,
                                app_name=app_name,
                                from_offset=first_from,
                                until_offset=first_until,
                                batch_time_info=batch_time)

    # Draw replacement values, retrying until each differs from the first
    # one so the update is observable.
    while True:
        second_until = random.randint(0, sys.maxsize)
        if second_until != first_until:
            break
    while True:
        second_from = random.randint(0, sys.maxsize)
        if second_from != first_from:
            break

    self.kafka_offset_specs.add(topic=topic, partition=partition,
                                app_name=app_name,
                                from_offset=second_from,
                                until_offset=second_until,
                                batch_time_info=batch_time)

    stored_offsets = self.kafka_offset_specs.get_kafka_offsets(app_name)
    updated_value = stored_offsets.get(offset_key)
    self.assertEqual(second_from, updated_value.get_from_offset())
    self.assertEqual(second_until, updated_value.get_until_offset())
def test_neutron_to_osvif_network(self):
    """Check that id, name and mtu are carried over to the os-vif network.

    The Neutron ``name`` must end up in the ``label`` attribute of the
    converted network.
    """
    expected_id = uuidutils.generate_uuid()
    expected_label = 'test-net'
    expected_mtu = 1500
    neutron_network = dict(
        id=expected_id,
        name=expected_label,
        mtu=expected_mtu,
    )

    converted = ovu.neutron_to_osvif_network(neutron_network)

    self.assertEqual(expected_id, converted.id)
    self.assertEqual(expected_label, converted.label)
    self.assertEqual(expected_mtu, converted.mtu)
def test_neutron_to_osvif_network_no_name(self):
    """Check that a network without ``name`` leaves ``label`` unset."""
    neutron_network = dict(
        id=uuidutils.generate_uuid(),
        mtu=1500,
    )

    converted = ovu.neutron_to_osvif_network(neutron_network)

    self.assertFalse(converted.obj_attr_is_set('label'))
def test_neutron_to_osvif_network_no_mtu(self):
    """Check that a network without ``mtu`` converts with ``mtu`` None."""
    neutron_network = dict(
        id=uuidutils.generate_uuid(),
        name='test-net',
    )

    converted = ovu.neutron_to_osvif_network(neutron_network)

    self.assertIsNone(converted.mtu)
def test_neutron_to_osvif_vif_ovs_no_bridge(self):
    """Check that the OVS VIF conversion raises RequiredOptError here.

    The port carries only an ``id`` and no subnets are given; judging by
    the test name, the error comes from a missing bridge setting (to be
    confirmed against the converter).
    """
    plugin_name = 'ovs'
    neutron_port = {'id': uuidutils.generate_uuid()}
    no_subnets = {}

    self.assertRaises(
        o_cfg.RequiredOptError,
        ovu.neutron_to_osvif_vif_ovs,
        plugin_name, neutron_port, no_subnets)