def test_join_bad_request(self):
fake_docker_network_id = lib_utils.get_hash()
invalid_docker_endpoint_id = 'id-should-be-hexdigits'
fake_container_id = lib_utils.get_hash()
response = self._invoke_join_request(
fake_docker_network_id, invalid_docker_endpoint_id,
fake_container_id)
self.assertEqual(
w_exceptions.BadRequest.code, response.status_code)
decoded_json = jsonutils.loads(response.data)
self.assertIn('Err', decoded_json)
# TODO(tfukushima): Add the better error message validation.
self.assertIn(invalid_docker_endpoint_id, decoded_json['Err'])
self.assertIn('EndpointID', decoded_json['Err'])
python类loads()的实例源码
def handle_request(self, request):
if self._is_targeted(request.headers):
return
body = jsonutils.loads(request.body)
if request.service == 'image':
if request.version == 'v1':
name = request.headers.get('X-IMAGE-META-NAME', '')
else:
name = body.get('name', '')
elif request.service == 'volume':
name = body['volume'].get('name', '')
name = name.split('@')
if len(name) == 2:
request.headers['MM-SERVICE-PROVIDER'] = name[1]
def take_action(self, args):
try:
if args.playbook:
host = (models.Host.query
.filter_by(playbook_id=args.playbook)
.filter((models.Host.id == args.host) |
(models.Host.name == args.host)).one())
else:
host = models.Host.query.filter_by(id=args.host).one()
except (models.NoResultFound, models.MultipleResultsFound):
raise RuntimeError('Host %s could not be found' % args.host)
if not host.facts:
raise RuntimeError('No facts available for host %s' % args.host)
facts = ((k, v) for k, v in
six.iteritems(jsonutils.loads(host.facts.values))
if not args.fact or k in args.fact
)
return six.moves.zip(*sorted(facts))
def test_playbook_treeview(self):
ctx = ansible_run()
treeview = jsonutils.loads(u.playbook_treeview(ctx['playbook'].id))
# ansible_run provides two fake files:
# /some/path/main.yml and /playbook.yml
for f in treeview:
if f['text'] == 'some':
self.assertEqual(f['text'], 'some')
child = f['nodes'][0]
self.assertEqual(child['text'], 'path')
child = child['nodes'][0]
self.assertEqual(child['text'], 'main.yml')
self.assertEqual(child['dataAttr']['load'],
ctx['task_file'].id)
else:
self.assertEqual(f['text'], 'playbook.yml')
self.assertEqual(f['dataAttr']['load'], ctx['pb_file'].id)
def get_local_disk_property(self, instance):
if instance.system_metadata.get('local_disk_property'):
return jsonutils.loads(
instance.system_metadata.get('local_disk_property'))
result = {}
extra_specs = self.get_instance_extra_specs(instance)
if extra_specs:
local_disk_property = extra_specs.get('quota:local_disk')
if local_disk_property:
local_disk_property = local_disk_property.split(':')
result['type'] = local_disk_property[0]
result['count'] = int(local_disk_property[1])
result['size'] = int(local_disk_property[2])
result['safe_format'] = local_disk_property[3]
if len(result) == 4:
return result
def simple_parse(in_str):
try:
out_dict = jsonutils.loads(in_str)
except ValueError:
try:
out_dict = yaml.load(in_str, Loader=YamlLoader)
except yaml.YAMLError as yea:
yea = six.text_type(yea)
msg = _('Error parsing input: %s') % yea
raise ValueError(msg)
else:
if out_dict is None:
out_dict = {}
if not isinstance(out_dict, dict):
msg = _('The input is not a JSON object or YAML mapping.')
raise ValueError(msg)
return out_dict
def __init__(self, message=None):
super(HTTPException, self).__init__(message)
try:
self.error = jsonutils.loads(message)
# Adjutant client: mangle the 'errors' return list into
# standard 'error' format
if 'errors' in self.error:
self.error['error'] = {
"message": ', '.join(self.error['errors']),
}
if 'error' not in self.error:
raise KeyError(_('Key "error" does not exist.'))
except KeyError:
# NOTE(jianingy): If key 'error' happens not exist,
# self.message becomes no sense. In this case, we
# return doc of current exception class instead.
self.error = {'error':
{'message': self.__class__.__doc__}}
except Exception:
self.error = {'error':
{'message': self.message or self.__class__.__doc__}}
def test_detail_servers(self, mock_get_all):
# NOTE(danms): Orphan these fakes (no context) so that we
# are sure that the API is requesting what it needs without
# having to lazy-load.
meta = {"preemptible": True}
mock_get_all.return_value = objects.InstanceList(
objects=[os_api_fakes.stub_instance_obj(ctxt=None, id=1,
system_metadata=meta),
os_api_fakes.stub_instance_obj(ctxt=None, id=2,
system_metadata=meta)])
req = os_api_fakes.HTTPRequest.blank(self.base_url + 'servers/detail')
res = req.get_response(self.app)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertNotEqual(len(server_dicts), 0)
for server_dict in server_dicts:
self.assertIn('preemptible', server_dict)
def json_request(self, method, conn_url, **kwargs):
kwargs.setdefault('headers', {})
kwargs['headers'].setdefault('Content-Type', 'application/json')
kwargs['headers'].setdefault('Accept', 'application/json')
if 'body' in kwargs:
kwargs['body'] = jsonutils.dump_as_bytes(kwargs['body'])
resp, body_iter = self._http_request(conn_url, method, **kwargs)
content_type = resp.headers.get('Content-Type')
if(resp.status_code in (http_client.NO_CONTENT,
http_client.RESET_CONTENT)
or content_type is None):
return resp, list()
if 'application/json' in content_type:
body = ''.join([chunk for chunk in body_iter])
try:
body = jsonutils.loads(body)
except ValueError:
LOG.error(_LE('Could not decode response body as JSON'))
else:
body = None
return resp, body
def create(self, code, runtime=None, package=None, **kwargs):
data = {
'runtime_id': runtime,
'code': jsonutils.dumps(code)
}
for k, v in kwargs.items():
if v is not None:
data.update({k: v})
params = {"data": data}
if package:
params.update({"files": {'package': package}})
response = self.http_client.request(
'/v1/functions',
'POST',
**params
)
body = jsonutils.loads(response.text)
return self.resource_class(self, body)
def update(self, id, code=None, package=None, **kwargs):
if code:
kwargs.update(code)
params = {"data": kwargs}
if package:
params.update({"files": {'package': package}})
response = self.http_client.request(
'/v1/functions/%s' % id,
'PUT',
**params
)
body = jsonutils.loads(response.text)
return self.resource_class(self, body)
def json_request(self, url, method, **kwargs):
headers = kwargs.setdefault('headers', {})
headers['Content-Type'] = kwargs.pop('content_type',
'application/json')
if 'data' in kwargs:
kwargs['data'] = jsonutils.dumps(kwargs['data'])
# NOTE(starodubcevna): We need to prove that json field is empty,
# or it will be modified by keystone adapter.
kwargs['json'] = None
resp = self.request(url, method, **kwargs)
body = resp.text
if body:
try:
body = jsonutils.loads(body)
except ValueError:
pass
return resp, body
def extract_resources(args):
resources = []
for data in args.resources.split(','):
if '=' in data and len(data.split('=')) in [3, 4]:
resource = dict(zip(['id', 'type', 'name', 'extra_info'],
data.split('=')))
if resource.get('extra_info'):
resource['extra_info'] = jsonutils.loads(
resource.get('extra_info'))
else:
raise exceptions.CommandError(
"Unable to parse parameter resources. "
"The keys of resource are id , type, name and "
"extra_info. The extra_info field is optional.")
resources.append(resource)
return resources
def json_request(self, method, url, **kwargs):
headers = kwargs.setdefault('headers', {})
headers['Content-Type'] = kwargs.pop('content_type',
'application/json')
if 'body' in kwargs:
if 'data' in kwargs:
raise ValueError("Can't provide both 'data' and "
"'body' to a request")
LOG.warning("Use of 'body' is deprecated; use 'data' instead")
kwargs['data'] = kwargs.pop('body')
if 'data' in kwargs:
kwargs['data'] = jsonutils.dumps(kwargs['data'])
# NOTE(starodubcevna): We need to prove that json field is empty,
# or it will be modified by keystone adapter.
kwargs['json'] = None
resp, body = self.request(url, method, **kwargs)
if body:
try:
body = jsonutils.loads(body)
except ValueError:
pass
return resp, body
def take_action(self, args):
try:
if args.playbook:
host = (models.Host.query
.filter_by(playbook_id=args.playbook)
.filter((models.Host.id == args.host) |
(models.Host.name == args.host)).one())
else:
host = models.Host.query.filter_by(id=args.host).one()
except (models.NoResultFound, models.MultipleResultsFound):
raise RuntimeError('Host %s could not be found' % args.host)
if not host.facts:
raise RuntimeError('No facts available for host %s' % args.host)
facts = ((k, v) for k, v in
six.iteritems(jsonutils.loads(host.facts.values))
if not args.fact or k in args.fact
)
return six.moves.zip(*sorted(facts))
def test_playbook_treeview(self):
ctx = ansible_run()
treeview = jsonutils.loads(u.playbook_treeview(ctx['playbook'].id))
# ansible_run provides two fake files:
# /some/path/main.yml and /playbook.yml
for f in treeview:
if f['text'] == 'some':
self.assertEqual(f['text'], 'some')
child = f['nodes'][0]
self.assertEqual(child['text'], 'path')
child = child['nodes'][0]
self.assertEqual(child['text'], 'main.yml')
self.assertEqual(child['dataAttr']['load'],
ctx['task_file'].id)
else:
self.assertEqual(f['text'], 'playbook.yml')
self.assertEqual(f['dataAttr']['load'], ctx['pb_file'].id)
def added(self, router_key, value):
token1, token2 = self.parse_key(router_key)
if token1 and token2:
if token1 != 'floatingip':
port_id = token2
router_data = jsonutils.loads(value)
self.data.vppf.ensure_router_interface_on_host(
port_id, router_data)
if router_data.get('net_type') == 'vxlan':
self.data.add_gpe_remote_mapping(
router_data['segmentation_id'],
router_data['loopback_mac'],
router_data['gateway_ip'])
else:
floating_ip = token2
floatingip_dict = jsonutils.loads(value)
self.data.vppf.associate_floatingip(floating_ip,
floatingip_dict)
def _get_vif(self, pod):
# TODO(ivc): same as VIFHandler._get_vif
try:
annotations = pod['metadata']['annotations']
vif_annotation = annotations[k_const.K8S_ANNOTATION_VIF]
except KeyError:
return None
vif_dict = jsonutils.loads(vif_annotation)
vif = obj_vif.vif.VIFBase.obj_from_primitive(vif_dict)
LOG.debug("Got VIF from annotation: %r", vif)
return vif
def test_run_write_version(self, *args):
m_fin = StringIO()
m_fout = StringIO()
code = self.runner.run(
{'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}, m_fin, m_fout)
result = jsonutils.loads(m_fout.getvalue())
self.assertEqual(0, code)
self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
result['supportedVersions'])
self.assertEqual(api.CNIRunner.VERSION, result['cniVersion'])
def _get_in_use_ports(self):
kubernetes = clients.get_kubernetes_client()
in_use_ports = []
running_pods = kubernetes.get(constants.K8S_API_BASE + '/pods')
for pod in running_pods['items']:
try:
annotations = jsonutils.loads(pod['metadata']['annotations'][
constants.K8S_ANNOTATION_VIF])
except KeyError:
LOG.debug("Skipping pod without kuryr VIF annotation: %s",
pod)
else:
in_use_ports.append(
annotations['versioned_object.data']['id'])
return in_use_ports