def test_volumedriver_create(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol',
u'Opts': {u'size': u'1'},
}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = False
provider.create = mock.MagicMock()
response = self.app.post('/VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_create_invalid_volume_provider(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol',
u'Opts': {u'size': u'1',
u'volume_provider': u'provider'}}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = False
provider.create = mock.MagicMock()
response = self.app.post('/VolumeDriver.Create',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_remove(self):
self.volume_providers_setup(['cinder'])
fake_request = {
u'Name': u'test-vol'
}
for provider in app.volume_providers.values():
provider.delete = mock.MagicMock()
provider.delete.return_value = True
response = self.app.post('/VolumeDriver.Remove',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_mount(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name
}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = True
provider.mount = mock.MagicMock()
provider.mount.return_value = fake_mountpoint(fake_name)
response = self.app.post('/VolumeDriver.Mount',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_mount_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name,
}
for provider in app.volume_providers.values():
provider.check_exist = mock.MagicMock()
provider.check_exist.return_value = False
response = self.app.post('/VolumeDriver.Mount',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(200, response.status_code)
self.assertNotEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_path(self):
self.volume_providers_setup(['cinder'])
fake_name = u'test-vol'
fake_request = {
u'Name': fake_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock()
provider.show.return_value = fake_volume(fake_name)
response = self.app.post('/VolumeDriver.Path',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Mountpoint': fake_mountpoint(fake_name),
u'Err': u''
}
self.assertEqual(fake_response, jsonutils.loads(response.data))
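# fake_mountpoint() and fake_volume() are helpers defined elsewhere in the
# test module. Hypothetical stand-ins (the path and keys below are
# illustrative assumptions, not the real fixtures) only need to be
# deterministic so the assertions above can compare against them:
def fake_mountpoint(name):
    return '/fake/mountpoint/' + name


def fake_volume(name):
    return {'name': name, 'mountpoint': fake_mountpoint(name)}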
def test_volumedriver_path_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_docker_volume_name = u'test-vol'
fake_request = {
u'Name': fake_docker_volume_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock(side_effect=exceptions.NotFound)
response = self.app.post('/VolumeDriver.Path',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u'Mountpoint Not Found'
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
def test_volumedriver_get_with_volume_not_exist(self):
self.volume_providers_setup(['cinder'])
fake_docker_volume_name = u'test-vol'
fake_request = {
u'Name': fake_docker_volume_name
}
for provider in app.volume_providers.values():
provider.show = mock.MagicMock(side_effect=exceptions.NotFound())
response = self.app.post('/VolumeDriver.Get',
content_type='application/json',
data=jsonutils.dumps(fake_request))
fake_response = {
u'Err': u'Volume Not Found'
}
self.assertEqual(200, response.status_code)
self.assertEqual(fake_response, jsonutils.loads(response.data))
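# mock_list_subnetpools in the test below is injected by a mock.patch
# decorator that is not shown in this excerpt, presumably patching the
# Neutron client's list_subnetpools call so the test controls the
# subnetpools returned to the IPAM driver.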
def test_ipam_driver_request_pool_with_default_v6pool(self,
mock_list_subnetpools):
fake_kuryr_subnetpool_id = uuidutils.generate_uuid()
fake_name = 'kuryr6'
kuryr_subnetpools = self._get_fake_v6_subnetpools(
fake_kuryr_subnetpool_id, prefixes=['fe80::/64'])
mock_list_subnetpools.return_value = {
'subnetpools': kuryr_subnetpools['subnetpools']}
fake_request = {
'AddressSpace': '',
'Pool': '',
'SubPool': '',  # in the case where --ip-range is not given
'Options': {},
'V6': True
}
response = self.app.post('/IpamDriver.RequestPool',
content_type='application/json',
data=jsonutils.dumps(fake_request))
self.assertEqual(200, response.status_code)
mock_list_subnetpools.assert_called_with(name=fake_name)
decoded_json = jsonutils.loads(response.data)
self.assertEqual(fake_kuryr_subnetpool_id, decoded_json['PoolID'])
def test_network_driver_endpoint_operational_info_with_no_port(self):
docker_network_id = lib_utils.get_hash()
docker_endpoint_id = lib_utils.get_hash()
fake_port_response = {"ports": []}
with mock.patch.object(app.neutron, 'list_ports') as mock_list_ports:
data = {
'NetworkID': docker_network_id,
'EndpointID': docker_endpoint_id,
}
mock_list_ports.return_value = fake_port_response
response = self.app.post('/NetworkDriver.EndpointOperInfo',
content_type='application/json',
data=jsonutils.dumps(data))
decoded_json = jsonutils.loads(response.data)
self.assertEqual(200, response.status_code)
port_name = utils.get_neutron_port_name(docker_endpoint_id)
mock_list_ports.assert_called_once_with(name=port_name)
self.assertEqual({}, decoded_json['Value'])
def test_network_driver_allocate_network(self):
docker_network_id = lib_utils.get_hash()
allocate_network_request = {
'NetworkID': docker_network_id,
'IPv4Data': [{
'AddressSpace': 'foo',
'Pool': '192.168.42.0/24',
'Gateway': '192.168.42.1/24',
}],
'IPv6Data': [],
'Options': {}
}
response = self.app.post('/NetworkDriver.AllocateNetwork',
content_type='application/json',
data=jsonutils.dumps(
allocate_network_request))
self.assertEqual(200, response.status_code)
decoded_json = jsonutils.loads(response.data)
self.assertEqual({'Options': {}}, decoded_json)
# Source: test_handlers.py, project fuel-nailgun-extension-cluster-upgrade (openstack)
def test_node_reassign_handler_with_roles(self, mcast):
cluster = self.env.create(
cluster_kwargs={'api': False},
nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
'roles': ['controller']}])
node = cluster.nodes[0]
seed_cluster = self.env.create_cluster(api=False)
# NOTE(akscram): reprovision=True means that the node will be
# re-provisioned during reassignment. This is the default behavior.
data = {'nodes_ids': [node.id],
'reprovision': True,
'roles': ['compute']}
resp = self.app.post(
reverse('NodeReassignHandler',
kwargs={'cluster_id': seed_cluster.id}),
jsonutils.dumps(data),
headers=self.default_headers)
self.assertEqual(202, resp.status_code)
self.assertEqual(node.roles, [])
self.assertEqual(node.pending_roles, ['compute'])
self.assertTrue(mcast.called)
def test_node_reassign_handler_without_reprovisioning(self, mcast):
cluster = self.env.create(
cluster_kwargs={'api': False},
nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
'roles': ['controller']}])
node = cluster.nodes[0]
seed_cluster = self.env.create_cluster(api=False)
data = {'nodes_ids': [node.id],
'reprovision': False,
'roles': ['compute']}
resp = self.app.post(
reverse('NodeReassignHandler',
kwargs={'cluster_id': seed_cluster.id}),
jsonutils.dumps(data),
headers=self.default_headers)
self.assertEqual(200, resp.status_code)
self.assertFalse(mcast.called)
self.assertEqual(node.roles, ['compute'])
def playbook_treeview(playbook):
"""
Creates a fake filesystem with playbook files and uses generate_tree() to
recurse and return a JSON structure suitable for bootstrap-treeview.
"""
fs = fake_filesystem.FakeFilesystem()
mock_os = fake_filesystem.FakeOsModule(fs)
files = models.File.query.filter(models.File.playbook_id.in_([playbook]))
paths = {}
for file in files:
fs.CreateFile(file.path)
paths[file.path] = file.id
return jsonutils.dumps(generate_tree('/', paths, mock_os),
sort_keys=True,
indent=2)
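# generate_tree() is not part of this excerpt. As a rough, hypothetical
# sketch (names and node attributes are illustrative guesses, not the
# project's actual implementation), it could walk the fake filesystem with
# the injected os module and build bootstrap-treeview nodes:
def generate_tree(path, paths, mock_os):
    """Recursively build treeview nodes for everything under ``path``."""
    nodes = []
    for entry in sorted(mock_os.listdir(path)):
        full_path = mock_os.path.join(path, entry)
        if mock_os.path.isdir(full_path):
            # Directories become nested nodes.
            nodes.append({
                'text': entry,
                'nodes': generate_tree(full_path, paths, mock_os),
            })
        else:
            # Files carry the database id recorded in ``paths``.
            nodes.append({
                'text': entry,
                'dataAttr': {'load': paths[full_path]},
            })
    return nodes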
def _init_all_fc_dvs(self):
"""Send message to fc and get dvswitch info
:return:
"""
LOG.debug("loading dvs mapping ")
dvs_map_temp = {}
physnet_map_temp = {}
data = self.get(self.site.dvswitchs_uri)
if not data.get(constant.DVSWITCHS):
raise fc_exc.DVSwitchNotFound()
dvs = data.get(constant.DVSWITCHS)
if dvs and len(dvs) > 0:
for dvswitch in dvs:
dvs_id = utils.get_id_from_urn(dvswitch.get('urn'))
dvs_map_temp[dvswitch["name"]] = dvs_id
self.update_physnet_map(dvs_id, physnet_map_temp)
LOG.debug(
"init all fc dvs dvs map is %s, physnet map is %s",
jsonutils.dumps(dvs_map_temp),
jsonutils.dumps(physnet_map_temp))
self.dvs_mapping = dvs_map_temp
self.physnet_mapping = physnet_map_temp
def create_vsp(self, dvs_id, pg_urn, vif):
"""send message to fusion compute to create a vsp
:param dvs_id:
:param pg_urn:
:param vif:
:return:
"""
vsp_path = self.get_path_by_site(constant.VSP_URI,
dvs_id=dvs_id)
port_id = vif['id']
body = {
'name': port_id,
'portGroupUrn': pg_urn,
'tags': [{'tagKey': constant.VSP_TAG_KEY, 'tagValue': port_id}]
}
ret = self.post(vsp_path, data=jsonutils.dumps(body))
return ret
def create_subports(num_ports, trunk_ips, timeout=180):
method = 'POST'
body = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
path = 'http://localhost{0}'.format(constants.VIF_POOL_POPULATE)
socket_path = constants.MANAGER_SOCKET_FILE
conn = UnixDomainHttpConnection(socket_path, timeout)
conn.request(method, path, body=body, headers=headers)
resp = conn.getresponse()
print(resp.read())
def delete_subports(trunk_ips, timeout=180):
method = 'POST'
body = jsonutils.dumps({"trunks": trunk_ips})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
path = 'http://localhost{0}'.format(constants.VIF_POOL_FREE)
socket_path = constants.MANAGER_SOCKET_FILE
conn = UnixDomainHttpConnection(socket_path, timeout)
conn.request(method, path, body=body, headers=headers)
resp = conn.getresponse()
print(resp.read())
def list_pools(timeout=180):
method = 'GET'
body = jsonutils.dumps({})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
path = 'http://localhost{0}'.format(constants.VIF_POOL_LIST)
socket_path = constants.MANAGER_SOCKET_FILE
conn = UnixDomainHttpConnection(socket_path, timeout)
conn.request(method, path, body=body, headers=headers)
resp = conn.getresponse()
print(resp.read())
def show_pool(trunk_ip, project_id, sg, timeout=180):
method = 'GET'
body = jsonutils.dumps({"pool_key": [trunk_ip, project_id, sg]})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
path = 'http://localhost{0}'.format(constants.VIF_POOL_SHOW)
socket_path = constants.MANAGER_SOCKET_FILE
conn = UnixDomainHttpConnection(socket_path, timeout)
conn.request(method, path, body=body, headers=headers)
resp = conn.getresponse()
print(resp.read())
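# The UnixDomainHttpConnection used by the four helpers above is defined
# elsewhere in the project. A minimal sketch of such a class, assuming only
# Python 3's http.client and an AF_UNIX stream socket (the real
# implementation may differ), could look like this:
import http.client
import socket


class UnixDomainHttpConnection(http.client.HTTPConnection):
    """HTTPConnection variant that connects to a Unix domain socket."""

    def __init__(self, path, timeout=None):
        # 'localhost' is only used for the Host header; the actual
        # transport is the Unix socket opened in connect().
        super(UnixDomainHttpConnection, self).__init__('localhost',
                                                       timeout=timeout)
        self._socket_path = path

    def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        if self.timeout is not None:
            sock.settimeout(self.timeout)
        sock.connect(self._socket_path)
        self.sock = sock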