def add(self):
params = None
try:
params = utils.CNIParameters(flask.request.get_json())
LOG.debug('Received addNetwork request. CNI Params: %s', params)
vif = self.plugin.add(params)
data = jsonutils.dumps(vif.obj_to_primitive())
except exceptions.ResourceNotReady as e:
LOG.error("Timed out waiting for requested pod to appear in "
"registry: %s.", e)
return '', httplib.GATEWAY_TIMEOUT, self.headers
except Exception:
LOG.exception('Error when processing addNetwork request. CNI '
'Params: %s', params)
return '', httplib.INTERNAL_SERVER_ERROR, self.headers
return data, httplib.ACCEPTED, self.headers
python类dumps()的实例源码
def test_do_POST_populate(self):
method = 'create'
path = "http://localhost/populatePool"
trunk_ips = [u"10.0.0.6"]
num_ports = 3
body = jsonutils.dumps({"trunks": trunk_ips,
"num_ports": num_ports})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = False
expected_resp = ('Ports pool at {} was populated with 3 ports.'
.format(trunk_ips)).encode()
self._do_POST_helper(method, path, headers, body, expected_resp,
trigger_exception, trunk_ips, num_ports)
def test_do_POST_populate_exception(self):
method = 'create'
path = "http://localhost/populatePool"
trunk_ips = [u"10.0.0.6"]
num_ports = 3
body = jsonutils.dumps({"trunks": trunk_ips,
"num_ports": num_ports})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = True
expected_resp = ('Error while populating pool {0} with {1} ports.'
.format(trunk_ips, num_ports)).encode()
self._do_POST_helper(method, path, headers, body, expected_resp,
trigger_exception, trunk_ips, num_ports)
def test_do_POST_populate_no_trunks(self):
method = 'create'
path = "http://localhost/populatePool"
trunk_ips = []
num_ports = 3
body = jsonutils.dumps({"trunks": trunk_ips,
"num_ports": num_ports})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = False
expected_resp = ('Trunk port IP(s) missing.'
.format(trunk_ips, num_ports)).encode()
self._do_POST_helper(method, path, headers, body, expected_resp,
trigger_exception, trunk_ips, num_ports)
def test_do_GET_list(self):
method = 'list'
method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
'["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
'"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
'has 5 ports')
path = "http://localhost/listPools"
body = jsonutils.dumps({})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = False
expected_resp = ('Pools:\n{0}'.format(method_resp)).encode()
self._do_GET_helper(method, method_resp, path, headers, body,
expected_resp, trigger_exception)
def test_do_GET_list_exception(self):
method = 'list'
method_resp = ('["10.0.0.6", "9d2b45c4efaa478481c30340b49fd4d2", '
'["00efc78c-f11c-414a-bfcd-a82e16dc07d1", '
'"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]] '
'has 5 ports')
path = "http://localhost/listPools"
body = jsonutils.dumps({})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = True
expected_resp = ('Error listing the pools.').encode()
self._do_GET_helper(method, method_resp, path, headers, body,
expected_resp, trigger_exception)
def test_do_GET_show(self):
method = 'show'
method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
path = "http://localhost/showPool"
pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
[u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
body = jsonutils.dumps({"pool_key": pool_key})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = False
formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
expected_resp = ('Pool {0} ports are:\n{1}'
.format(formated_key, method_resp)).encode()
self._do_GET_helper(method, method_resp, path, headers, body,
expected_resp, trigger_exception, pool_key)
def test_do_GET_show_exception(self):
method = 'show'
method_resp = "251f748d-2a0d-4143-bce8-2e616f7a6a4a"
path = "http://localhost/showPool"
pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
[u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
body = jsonutils.dumps({"pool_key": pool_key})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = True
formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
expected_resp = ('Error showing pool: {0}.'
.format(formated_key)).encode()
self._do_GET_helper(method, method_resp, path, headers, body,
expected_resp, trigger_exception, pool_key)
def test_do_GET_show_empty(self):
method = 'show'
method_resp = "Empty pool"
path = "http://localhost/showPool"
pool_key = [u"10.0.0.6", u"9d2b45c4efaa478481c30340b49fd4d2",
[u"00efc78c-f11c-414a-bfcd-a82e16dc07d1",
u"fd6b13dc-7230-4cbe-9237-36b4614bc6b5"]]
body = jsonutils.dumps({"pool_key": pool_key})
headers = {'Content-Type': 'application/json', 'Connection': 'close'}
headers['Content-Length'] = len(body)
trigger_exception = False
formated_key = (pool_key[0], pool_key[1], tuple(sorted(pool_key[2])))
expected_resp = ('Pool {0} ports are:\n{1}'
.format(formated_key, method_resp)).encode()
self._do_GET_helper(method, method_resp, path, headers, body,
expected_resp, trigger_exception, pool_key)
def test_annotate(self, m_patch, m_count):
m_count.return_value = list(range(1, 5))
path = '/test'
annotations = {'a1': 'v1', 'a2': 'v2'}
resource_version = "123"
ret = {'metadata': {'annotations': annotations,
"resourceVersion": resource_version}}
data = jsonutils.dumps(ret, sort_keys=True)
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.json.return_value = ret
m_patch.return_value = m_resp
self.assertEqual(annotations, self.client.annotate(
path, annotations, resource_version=resource_version))
m_patch.assert_called_once_with(self.base_url + path,
data=data, headers=mock.ANY,
cert=(None, None), verify=False)
def test_watch(self, m_get):
path = '/test'
data = [{'obj': 'obj%s' % i} for i in range(3)]
lines = [jsonutils.dumps(i) for i in data]
m_resp = mock.MagicMock()
m_resp.ok = True
m_resp.iter_lines.return_value = lines
m_get.return_value = m_resp
cycles = 3
self.assertEqual(
data * cycles,
list(itertools.islice(self.client.watch(path),
len(data) * cycles)))
self.assertEqual(cycles, m_get.call_count)
self.assertEqual(cycles, m_resp.close.call_count)
m_get.assert_called_with(self.base_url + path, headers={}, stream=True,
params={'watch': 'true'}, cert=(None, None),
verify=False)
def _set_lbaas_spec(self, service, lbaas_spec):
# TODO(ivc): extract annotation interactions
if lbaas_spec is None:
LOG.debug("Removing LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = None
else:
lbaas_spec.obj_reset_changes(recursive=True)
LOG.debug("Setting LBaaSServiceSpec annotation: %r", lbaas_spec)
annotation = jsonutils.dumps(lbaas_spec.obj_to_primitive(),
sort_keys=True)
svc_link = service['metadata']['selfLink']
ep_link = self._get_endpoints_link(service)
k8s = clients.get_kubernetes_client()
try:
k8s.annotate(ep_link,
{k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation})
except k_exc.K8sClientException:
# REVISIT(ivc): only raise ResourceNotReady for NotFound
raise k_exc.ResourceNotReady(ep_link)
k8s.annotate(svc_link,
{k_const.K8S_ANNOTATION_LBAAS_SPEC: annotation},
resource_version=service['metadata']['resourceVersion'])
def run(self):
""" Override the function run: run testcase and update database """
update_data = {'task_id': self.args.get('task_id'),
'status': 'IN PROGRESS'}
self.handler.insert(update_data)
LOGGER.info('Starting running test case')
try:
data = self.target(self.args)
except Exception as err: # pylint: disable=broad-except
LOGGER.exception('Task Failed')
update_data = {'status': 'FAIL', 'error': str(err)}
self.handler.update_attr(self.args.get('task_id'), update_data)
else:
LOGGER.info('Task Finished')
LOGGER.debug('Result: %s', data)
new_data = {'status': 'FINISHED',
'result': jsonutils.dumps(data.get('result', {}))}
self.handler.update_attr(self.args.get('task_id'), new_data)
def _get_file_contents(from_data, files):
if not isinstance(from_data, (dict, list)):
return
if isinstance(from_data, dict):
recurse_data = six.itervalues(from_data)
for key, value in six.iteritems(from_data):
if _ignore_if(key, value):
continue
if not value.startswith(('http://', 'https://')):
raise exceptions.GetFileError(value, 'get_file')
if value not in files:
file_content = heat_utils.read_url_content(value)
if template_utils.is_template(file_content):
template = get_template_files(template_url=value)[1]
file_content = jsonutils.dumps(template)
files[value] = file_content
else:
recurse_data = from_data
for value in recurse_data:
_get_file_contents(value, files)
def sendjson(self, method, urlpath, obj=None):
"""Send json to the OpenDaylight controller."""
headers = {'Content-Type': 'application/json'}
data = jsonutils.dumps(obj, indent=2) if obj else None
url = '/'.join([self.url, urlpath])
LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)" %
{'method': method, 'url': url, 'obj': obj})
r = requests.request(method, url=url,
headers=headers, data=data,
auth=self.auth, timeout=self.timeout)
try:
r.raise_for_status()
except Exception as ex:
LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s)"
"JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
"%(rtext)s" %
{'method': method, 'url': url, 'obj': obj, 'r': r,
'ex': ex, 'rtext': r.text})
return r
try:
return json.loads(r.content)
except Exception:
LOG.debug("%s" % r)
return
def format_nested_dict(d, fields, column_names):
if d is None:
return ''
pt = prettytable.PrettyTable(caching=False, print_empty=False,
header=True, field_names=column_names)
for n in column_names:
pt.align[n] = 'l'
keys = sorted(d.keys())
for field in keys:
value = d[field]
if not isinstance(value, six.string_types):
value = jsonutils.dumps(value, indent=2, ensure_ascii=False)
pt.add_row([field, value.strip('"')])
return pt.get_string()
def update_compute_node(self, context, node_uuid, values):
if 'uuid' in values:
msg = _('Cannot overwrite UUID for an existing node.')
raise exception.InvalidParameterValue(err=msg)
try:
target = self.client.read('/compute_nodes/' + node_uuid)
target_value = json.loads(target.value)
target_value.update(values)
target.value = json.dumps(target_value)
self.client.update(target)
except etcd.EtcdKeyNotFound:
raise exception.ComputeNodeNotFound(compute_node=node_uuid)
except Exception as e:
LOG.error(
'Error occurred while updating compute node: %s',
six.text_type(e))
raise
return translate_etcd_result(target, 'compute_node')
def test_detach_volume(self,
mock_cinder_api_cls,
mock_get_connector_prprts,
mock_get_volume_connector):
volume = mock.MagicMock()
volume.volume_id = self.fake_volume_id
volume.connection_info = jsonutils.dumps(self.fake_conn_info)
mock_cinder_api = mock.MagicMock()
mock_cinder_api_cls.return_value = mock_cinder_api
mock_connector = mock.MagicMock()
mock_get_connector_prprts.return_value = self.fake_conn_prprts
mock_get_volume_connector.return_value = mock_connector
cinder = cinder_workflow.CinderWorkflow(self.context)
cinder.detach_volume(volume)
mock_cinder_api.begin_detaching.assert_called_once_with(
self.fake_volume_id)
mock_connector.disconnect_volume.assert_called_once_with(
self.fake_conn_info['data'], None)
mock_cinder_api.terminate_connection.assert_called_once_with(
self.fake_volume_id, self.fake_conn_prprts)
mock_cinder_api.detach.assert_called_once_with(
self.fake_volume_id)
mock_cinder_api.roll_detaching.assert_not_called()
def get_pci_resources(self):
addresses = []
try:
output, status = processutils.execute('lspci', '-D', '-nnmm')
lines = output.split('\n')
for line in lines:
if not line:
continue
columns = line.split()
address = columns[0]
addresses.append(address)
except processutils.ProcessExecutionError:
raise exception.CommandError(cmd='lspci')
pci_info = []
for addr in addresses:
pci_info.append(self._get_pci_dev_info(addr))
return jsonutils.dumps(pci_info)
def serialize(self, entity):
if entity is None:
return None
key = self._get_serialization_key(type(entity))
# Primitive or not registered type.
if not key:
return jsonutils.dumps(
jsonutils.to_primitive(entity, convert_instances=True)
)
serializer = self.serializers.get(key)
if not serializer:
raise RuntimeError(
"Failed to find a serializer for the key: %s" % key
)
result = {
'__serial_key': key,
'__serial_data': serializer.serialize(entity)
}
return jsonutils.dumps(result)
def test_clear_port_bindings(self):
fake_port = copy.copy(test_constants.FAKE_PORT)
fake_port['address_bindings'] = ['a', 'b']
mocked_resource = self.get_mocked_resource()
def get_fake_port(*args):
return fake_port
mocked_resource.get = get_fake_port
mocked_resource.update(
fake_port['id'], fake_port['id'], address_bindings=[])
fake_port['address_bindings'] = []
test_client.assert_json_call(
'put', mocked_resource,
'https://1.2.3.4/api/v1/logical-ports/%s' % fake_port['id'],
data=jsonutils.dumps(fake_port, sort_keys=True),
headers=self.default_headers())
def test_create_logical_router(self):
"""Test creating a router returns the correct response and 201 status.
"""
fake_router = test_constants.FAKE_ROUTER.copy()
router = self.get_mocked_resource()
tier0_router = True
description = 'dummy'
router.create(fake_router['display_name'], None, None, tier0_router,
description=description)
data = {
'display_name': fake_router['display_name'],
'router_type': 'TIER0' if tier0_router else 'TIER1',
'tags': None,
'description': description
}
test_client.assert_json_call(
'post', router,
'https://1.2.3.4/api/v1/logical-routers',
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_update_advertisement(self):
router = self.get_mocked_resource()
router_id = test_constants.FAKE_ROUTER_UUID
data = {'advertise_nat_routes': 'a',
'advertise_nsx_connected_routes': 'b',
'advertise_static_routes': False,
'enabled': True,
'advertise_lb_vip': False,
'advertise_lb_snat_ip': False}
with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
return_value='2.1.0'), \
mock.patch.object(router.client, 'get',
return_value={}):
router.update_advertisement(
router_id, **data)
test_client.assert_json_call(
'put', router,
('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
'advertisement' % router_id),
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_update_advertisement_no_lb(self):
router = self.get_mocked_resource()
router_id = test_constants.FAKE_ROUTER_UUID
data = {'advertise_nat_routes': 'a',
'advertise_nsx_connected_routes': 'b',
'advertise_static_routes': False,
'enabled': True}
with mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
return_value='1.1.0'), \
mock.patch.object(router.client, 'get',
return_value={}):
# lb args will be ignored on this nsx version
router.update_advertisement(
router_id,
advertise_lb_vip=False,
advertise_lb_snat_ip=False,
**data)
test_client.assert_json_call(
'put', router,
('https://1.2.3.4/api/v1/logical-routers/%s/routing/'
'advertisement' % router_id),
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_update_logical_router_port(self):
fake_router_port = test_constants.FAKE_ROUTER_PORT.copy()
uuid = fake_router_port['id']
fake_relay_uuid = uuidutils.generate_uuid()
lrport = self.get_mocked_resource()
with mock.patch.object(lrport, 'get', return_value=fake_router_port),\
mock.patch("vmware_nsxlib.v3.NsxLib.get_version",
return_value='2.0.0'):
lrport.update(uuid, relay_service_uuid=fake_relay_uuid)
data = {
'id': uuid,
'display_name': fake_router_port['display_name'],
'logical_router_id': fake_router_port['logical_router_id'],
'resource_type': fake_router_port['resource_type'],
"revision": 0,
'service_bindings': [{'service_id': {
'target_type': 'LogicalService',
'target_id': fake_relay_uuid}}]
}
test_client.assert_json_call(
'put', lrport,
'https://1.2.3.4/api/v1/logical-router-ports/%s' % uuid,
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_create_ip_pool_no_ranges_with_gateway(self):
pool = self.get_mocked_resource()
cidr = '2.2.2.0/30'
gateway_ip = '2.2.2.1'
pool.create(cidr, allocation_ranges=None, gateway_ip=gateway_ip)
exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.0'},
{'start': '2.2.2.2', 'end': '2.2.2.3'}]
data = {
'subnets': [{
'gateway_ip': gateway_ip,
'allocation_ranges': exp_ranges,
'cidr': cidr,
}]
}
test_client.assert_json_call(
'post', pool,
'https://1.2.3.4/api/v1/pools/ip-pools',
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_create_ip_pool_no_ranges_no_gateway(self):
pool = self.get_mocked_resource()
cidr = '2.2.2.0/30'
pool.create(cidr, allocation_ranges=None)
exp_ranges = [{'start': '2.2.2.0', 'end': '2.2.2.3'}]
data = {
'subnets': [{
'allocation_ranges': exp_ranges,
'cidr': cidr,
}]
}
test_client.assert_json_call(
'post', pool,
'https://1.2.3.4/api/v1/pools/ip-pools',
data=jsonutils.dumps(data, sort_keys=True),
headers=self.default_headers())
def test_update_metadata_proxy(self):
fake_md = test_constants.FAKE_MD.copy()
md = self.get_mocked_resource()
new_url = "http://2.2.2.20:3500/xyz"
new_secret = 'abc'
new_edge = uuidutils.generate_uuid()
with mock.patch.object(md.client, 'url_get', return_value=fake_md):
md.update(fake_md['id'], server_url=new_url, secret=new_secret,
edge_cluster_id=new_edge)
fake_md.update({'metadata_server_url': new_url,
'secret': new_secret,
'edge_cluster_id': new_edge})
test_client.assert_json_call(
'put', md,
'https://1.2.3.4/api/v1/md-proxies/%s' % fake_md['id'],
data=jsonutils.dumps(fake_md, sort_keys=True),
headers=self.default_headers())
def _verify_backend_create(self, mocked_trust, cert_pem):
"""Verify API calls to create cert and identity on backend"""
# verify API call to import cert on backend
base_uri = 'https://1.2.3.4/api/v1/trust-management'
uri = base_uri + '/certificates?action=import'
expected_body = {'pem_encoded': cert_pem}
test_client.assert_json_call('post', mocked_trust.client, uri,
single_call=False,
data=jsonutils.dumps(expected_body))
# verify API call to bind cert to identity on backend
uri = base_uri + '/principal-identities'
expected_body = {'name': self.identity,
'node_id': self.node_id,
'permission_group': 'read_write_api_users',
'certificate_id': self.cert_id,
'is_protected': True}
test_client.assert_json_call('post', mocked_trust.client, uri,
single_call=False,
data=jsonutils.dumps(expected_body,
sort_keys=True))
def test_json_request(self):
resp = mocks.MockRequestsResponse(
200, jsonutils.dumps({'result': {'ok': 200}}))
api = self.new_mocked_client(client.JSONRESTClient,
session_response=resp,
url_prefix='api/v2/nat')
resp = api.create(body={'name': 'mgmt-egress'})
assert_json_call(
'post', api,
'https://1.2.3.4/api/v2/nat',
data=jsonutils.dumps({'name': 'mgmt-egress'}))
self.assertEqual(resp, {'result': {'ok': 200}})