python类loads()的实例源码

k8s_client.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def watch(self, path):
        """Yield decoded JSON objects streamed from a watch request.

        Issues a streaming GET against ``self._base_url + path`` with the
        ``watch=true`` query parameter and yields every non-empty response
        line decoded by ``jsonutils.loads``. Once a response stream is
        exhausted the request is issued again, so this generator does not
        finish on its own.

        :param path: path appended to the client's base URL.
        :raises exc.K8sClientException: if a response is not OK.
        """
        endpoint = self._base_url + path
        query = {'watch': 'true'}
        # Only send an Authorization header when a token is configured.
        if self.token:
            auth_headers = {'Authorization': 'Bearer %s' % self.token}
        else:
            auth_headers = {}

        # TODO(ivc): handle connection errors and retry on failure
        while True:
            stream = requests.get(endpoint, params=query, stream=True,
                                  cert=self.cert, verify=self.verify_server,
                                  headers=auth_headers)
            # closing() guarantees the connection is released even when the
            # consumer abandons the generator or an error is raised.
            with contextlib.closing(stream) as response:
                if not response.ok:
                    raise exc.K8sClientException(response.text)
                for raw_line in response.iter_lines(delimiter='\n'):
                    event = raw_line.strip()
                    if event:
                        yield jsonutils.loads(event)
test_api.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_run_add(self, m_k8s_add):
        # Running with CNI_COMMAND=ADD must call the patched add handler
        # with the CNI arguments and write the expected JSON result to the
        # output stream.
        m_k8s_add.return_value = fake._fake_vif()
        stdin_stub = StringIO()
        stdout_stub = StringIO()
        cni_env = {'CNI_COMMAND': 'ADD', 'CNI_ARGS': 'foo=bar'}

        self.runner.run(cni_env, stdin_stub, stdout_stub)

        self.assertTrue(m_k8s_add.called)
        passed_params = m_k8s_add.call_args[0][0]
        self.assertEqual('foo=bar', passed_params.CNI_ARGS)

        expected = {
            "cniVersion": "0.3.0",
            "dns": {"nameservers": ["192.168.0.1"]},
            "ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"},
        }
        emitted = jsonutils.loads(stdout_stub.getvalue())
        self.assertDictEqual(expected, emitted)
wsgi.py 文件源码 项目:vdi-broker 作者: cloudbase 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def action_peek_json(body):
    """Determine action to invoke.

    Decodes the JSON request body and returns the name of its single
    top-level key.

    :param body: JSON-encoded request body.
    :returns: the only key of the decoded JSON object.
    :raises exception.MalformedRequestBody: if the body cannot be decoded,
        is not a JSON object, or does not hold exactly one key.
    """

    try:
        decoded = jsonutils.loads(body)
    except (TypeError, ValueError):
        # TypeError covers a body that is not text at all (e.g. None).
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # A scalar or a list would either break len()/keys() below or slip
    # past the length check, so reject anything that is not a JSON object.
    if not isinstance(decoded, dict):
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # Make sure there's exactly one key...
    if len(decoded) != 1:
        msg = _("too many body keys")
        raise exception.MalformedRequestBody(reason=msg)

    # Return only the action name (the single key).
    return list(decoded.keys())[0]
keystone_rest_tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_user_session_get(self, kc):
        # The session response must expose ``services_region`` and must not
        # leak the ``super_secret_thing`` attribute of the user.
        request = self.mock_rest_request()
        user_attrs = dict(
            services_region='some region',
            super_secret_thing='not here',
            is_authenticated=lambda: True,
            spec=['services_region', 'super_secret_thing'],
        )
        request.user = mock.Mock(**user_attrs)

        response = keystone.UserSession().get(request)

        self.assertStatusCode(response, 200)
        payload = jsonutils.loads(response.content)
        self.assertEqual(payload['services_region'], 'some region')
        self.assertNotIn('super_secret_thing', payload)

    #
    # Services
    #
test_volume.py 文件源码 项目:trio2o 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _test_and_check_delete(self, volumes, tenant_id):
        """Create each test volume, then query and delete the accepted ones.

        :param volumes: iterable of dicts carrying an optional 'volume'
                        request body and the 'expected_error' status code.
        :param tenant_id: tenant identifier used to build the request URLs.
        """
        for case in volumes:
            if case.get('volume'):
                response = self.app.post_json(
                    '/v2/' + tenant_id + '/volumes',
                    {'volume': case['volume']},
                    expect_errors=True)
            # NOTE(review): when an entry has no 'volume', ``response`` is
            # whatever the previous iteration left behind (or unbound on the
            # first iteration) -- confirm this is intended.
            self.assertEqual(response.status_int, case['expected_error'])
            if response.status_int != 202:
                continue

            # The volume was accepted: it must be retrievable and deletable.
            created_id = jsonutils.loads(response.body).get('volume')['id']
            volume_url = '/v2/' + tenant_id + '/volumes/' + created_id

            query_resp = self.app.get(volume_url)
            self.assertEqual(query_resp.status_int, 200)

            delete_resp = self.app.delete(volume_url)
            self.assertEqual(delete_resp.status_int, 202)
wsgi.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def action_peek_json(body):
    """Determine action to invoke.

    Decodes the JSON request body and returns the name of its single
    top-level key.

    :param body: JSON-encoded request body.
    :returns: the only key of the decoded JSON object.
    :raises exception.MalformedRequestBody: if the body cannot be decoded,
        is not a JSON object, or does not hold exactly one key.
    """

    try:
        decoded = jsonutils.loads(body)
    except (TypeError, ValueError):
        # TypeError covers a body that is not text at all (e.g. None).
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # A scalar or a list would either break len()/keys() below or slip
    # past the length check, so reject anything that is not a JSON object.
    if not isinstance(decoded, dict):
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # Make sure there's exactly one key...
    if len(decoded) != 1:
        msg = _("too many body keys")
        raise exception.MalformedRequestBody(reason=msg)

    # Return only the action name (the single key).
    return list(decoded.keys())[0]
base.py 文件源码 项目:novajoin 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def action_peek_json(body):
    """Determine action to invoke.

    Decodes the JSON request body and returns the name of its single
    top-level key.

    :param body: JSON-encoded request body.
    :returns: the only key of the decoded JSON object.
    :raises exception.MalformedRequestBody: if the body cannot be decoded,
        is not a JSON object, or does not hold exactly one key.
    """

    try:
        decoded = jsonutils.loads(body)
    except (TypeError, ValueError):
        # TypeError covers a body that is not text at all (e.g. None).
        msg = "cannot understand JSON"
        raise exception.MalformedRequestBody(reason=msg)

    # A scalar or a list would either break len()/keys() below or slip
    # past the length check, so reject anything that is not a JSON object.
    if not isinstance(decoded, dict):
        msg = "cannot understand JSON"
        raise exception.MalformedRequestBody(reason=msg)

    # Make sure there's exactly one key...
    if len(decoded) != 1:
        msg = "too many body keys"
        raise exception.MalformedRequestBody(reason=msg)

    # Return only the action name (the single key).
    return list(decoded.keys())[0]
exc.py 文件源码 项目:python-bileanclient 作者: openstack 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, message=None, code=None):
        """Initialise the exception and derive ``self.error`` from ``message``.

        ``message`` is decoded as JSON. When the decoded value has no
        'error' key, or decoding fails, ``self.error`` falls back to a
        dict of the form ``{'error': {'message': ...}}``.

        :param message: raw message, passed on to the parent constructor.
        :param code: value stored in ``self.code`` when that attribute is
                     still "N/A".
        """
        super(HTTPException, self).__init__(message)
        class_doc = self.__class__.__doc__
        try:
            parsed = jsonutils.loads(message)
            self.error = parsed
            if 'error' not in parsed:
                raise KeyError(_('Key "error" not exists'))
        except KeyError:
            # NOTE(jianingy): If key 'error' happens not exist,
            # self.message becomes no sense. In this case, we
            # return doc of current exception class instead.
            self.error = {'error': {'message': class_doc}}
        except Exception:
            # Anything else (e.g. undecodable text): prefer the raw message.
            fallback = self.message or class_doc
            self.error = {'error': {'message': fallback}}

        if self.code == "N/A" and code is not None:
            self.code = code
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_image(self, image_uuid, values):
        """Merge ``values`` into the stored image record and write it back.

        :param image_uuid: UUID appended to ``/images/`` to form the key
                           read through ``self.client``.
        :param values: mapping of fields to update; must not hold 'uuid'.
        :returns: ``translate_etcd_result`` of the updated entry.
        :raises exception.InvalidParameterValue: if 'uuid' is in ``values``.
        :raises exception.ImageNotFound: if the key does not exist.
        """
        if 'uuid' in values:
            raise exception.InvalidParameterValue(
                err=_('Cannot overwrite UUID for an existing image.'))

        try:
            entry = self.client.read('/images/' + image_uuid)
            record = json.loads(entry.value)
            record.update(values)
            entry.value = json.dump_as_bytes(record)
            self.client.update(entry)
        except etcd.EtcdKeyNotFound:
            raise exception.ImageNotFound(image=image_uuid)
        except Exception as e:
            # Log any other failure, then let it propagate unchanged.
            LOG.error('Error occurred while updating image: %s',
                      six.text_type(e))
            raise
        else:
            return translate_etcd_result(entry, 'image')
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def update_compute_node(self, context, node_uuid, values):
        """Merge ``values`` into the stored compute node record.

        :param context: request context (not used by this method).
        :param node_uuid: UUID appended to ``/compute_nodes/`` to form the
                          key read through ``self.client``.
        :param values: mapping of fields to update; must not hold 'uuid'.
        :returns: ``translate_etcd_result`` of the updated entry.
        :raises exception.InvalidParameterValue: if 'uuid' is in ``values``.
        :raises exception.ComputeNodeNotFound: if the key does not exist.
        """
        if 'uuid' in values:
            raise exception.InvalidParameterValue(
                err=_('Cannot overwrite UUID for an existing node.'))

        try:
            entry = self.client.read('/compute_nodes/' + node_uuid)
            record = json.loads(entry.value)
            record.update(values)
            # NOTE(review): the other update_* methods in this file
            # serialise with json.dump_as_bytes -- confirm that json.dumps
            # is intended here.
            entry.value = json.dumps(record)
            self.client.update(entry)
        except etcd.EtcdKeyNotFound:
            raise exception.ComputeNodeNotFound(compute_node=node_uuid)
        except Exception as e:
            # Log any other failure, then let it propagate unchanged.
            LOG.error(
                'Error occurred while updating compute node: %s',
                six.text_type(e))
            raise
        else:
            return translate_etcd_result(entry, 'compute_node')
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def update_capsule(self, context, capsule_id, values):
        """Merge ``values`` into the stored capsule record.

        :param context: passed to ``get_capsule_by_uuid`` for the lookup.
        :param capsule_id: identifier used to look the capsule up.
        :param values: mapping of fields to update; must not hold 'uuid'.
        :returns: ``translate_etcd_result`` of the updated entry.
        :raises exception.InvalidParameterValue: if 'uuid' is in ``values``.
        :raises exception.CapsuleNotFound: if the key does not exist.
        """
        if 'uuid' in values:
            raise exception.InvalidParameterValue(
                err=_("Cannot overwrite UUID for an existing Capsule."))

        try:
            capsule = self.get_capsule_by_uuid(context, capsule_id)
            entry = self.client.read('/capsules/' + capsule.uuid)
            record = json.loads(entry.value)
            record.update(values)
            entry.value = json.dump_as_bytes(record)
            self.client.update(entry)
        except etcd.EtcdKeyNotFound:
            raise exception.CapsuleNotFound(capsule=capsule_id)
        except Exception as e:
            # Log any other failure, then let it propagate unchanged.
            LOG.error('Error occurred while updating capsule: %s',
                      six.text_type(e))
            raise
        else:
            return translate_etcd_result(entry, 'capsule')
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_pci_device(self, node_id, address, values):
        """Update the PCI device at ``address``, creating it when missing.

        :param node_id: node identifier passed to the device lookup.
        :param address: device address passed to the device lookup.
        :param values: mapping of fields to update. When the device is not
                       found, 'compute_node_uuid' and 'address' are added
                       to this mapping in place before it is used to
                       create the device.
        :returns: ``translate_etcd_result`` of the updated entry, or the
                  result of ``self._create_pci_device`` when the device
                  did not exist.
        """
        try:
            device = self.get_pci_device_by_addr(node_id, address)
            entry = self.client.read('/pcidevices/' + device.uuid)
            record = json.loads(entry.value)
            record.update(values)
            entry.value = json.dump_as_bytes(record)
            self.client.update(entry)
        except exception.PciDeviceNotFound:
            # Unknown device: fall back to creating it from ``values``.
            identity = {'compute_node_uuid': node_id, 'address': address}
            values.update(identity)
            return self._create_pci_device(values)
        except Exception as e:
            # Log any other failure, then let it propagate unchanged.
            LOG.error('Error occurred while updating pci device: %s',
                      six.text_type(e))
            raise
        else:
            return translate_etcd_result(entry, 'pcidevice')
api.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update_volume_mapping(self, context, volume_mapping_uuid, values):
        """Merge ``values`` into the stored volume mapping record.

        :param context: passed to ``get_volume_mapping_by_uuid``.
        :param volume_mapping_uuid: identifier used to look the mapping up.
        :param values: mapping of fields to update; must not hold 'uuid'.
        :returns: ``translate_etcd_result`` of the updated entry.
        :raises exception.InvalidParameterValue: if 'uuid' is in ``values``.
        :raises exception.VolumeMappingNotFound: if the key does not exist.
        """
        if 'uuid' in values:
            raise exception.InvalidParameterValue(
                err=_('Cannot overwrite UUID for an existing VolumeMapping.'))

        try:
            mapping = self.get_volume_mapping_by_uuid(
                context, volume_mapping_uuid)
            entry = self.client.read('/volume_mapping/' + mapping.uuid)
            record = json.loads(entry.value)
            record.update(values)
            entry.value = json.dump_as_bytes(record)
            self.client.update(entry)
        except etcd.EtcdKeyNotFound:
            raise exception.VolumeMappingNotFound(
                volume_mapping=volume_mapping_uuid)
        except Exception as e:
            # Log any other failure, then let it propagate unchanged.
            LOG.error('Error occurred while updating volume mappping: %s',
                      six.text_type(e))
            raise
        else:
            return translate_etcd_result(entry, 'volume_mapping')
cinder_workflow.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def detach_volume(self, volume):
        """Detach ``volume``: mark, disconnect, then terminate and detach.

        :param volume: object providing ``volume_id`` and a JSON-encoded
                       ``connection_info``.
        :raises exception.Invalid: if Cinder rejects ``begin_detaching``
                                   with a BadRequest.
        """
        vol_id = volume.volume_id
        api = cinder.CinderAPI(self.context)

        try:
            api.begin_detaching(vol_id)
        except cinder_exception.BadRequest as e:
            raise exception.Invalid(_("Invalid volume: %s") % str(e))

        connection = jsonutils.loads(volume.connection_info)
        try:
            self._disconnect_volume(connection)
        except Exception:
            # Log and roll the detaching state back; the original exception
            # is re-raised when the context manager exits.
            with excutils.save_and_reraise_exception():
                LOG.exception('Failed to disconnect volume %(volume_id)s',
                              {'volume_id': vol_id})
                api.roll_detaching(vol_id)

        connector = get_volume_connector_properties()
        api.terminate_connection(vol_id, connector)
        api.detach(vol_id)
pci_device_pool.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def from_pci_stats(pci_stats):
    """Build a PciDevicePoolList from PCI stats as stored in the db.

    The stored data can be the serialized object or, from before the
    device pool objects existed, a simple dict or a list of such dicts.
    A string is first decoded as JSON; a string that cannot be decoded is
    treated as empty.

    :param pci_stats: JSON string, object primitive, dict or list of dicts.
    :returns: a PciDevicePoolList (with no pools when the input is empty).
    """
    if isinstance(pci_stats, six.string_types):
        try:
            pci_stats = jsonutils.loads(pci_stats)
        except (ValueError, TypeError):
            pci_stats = None

    if not pci_stats:
        return PciDevicePoolList(objects=[])

    # Check for object-ness, or old-style storage format.
    if 'zun_object.namespace' in pci_stats:
        return PciDevicePoolList.obj_from_primitive(pci_stats)

    # This can be either a dict or a list of dicts
    if isinstance(pci_stats, list):
        stats = pci_stats
    else:
        stats = [pci_stats]
    return PciDevicePoolList(
        objects=[PciDevicePool.from_dict(entry) for entry in stats])
manager.py 文件源码 项目:zun 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def update_devices_from_compute_resources(self, devices_json):
        """Sync the pci device tracker with compute node information.

        To support pci device hot plug, we sync with the compute node
        periodically, fetching all devices information from compute node,
        update the tracker and sync the DB information.

        Devices should not be hot-plugged when assigned to a container,
        but possibly the compute node has no such guarantee. The best
        we can do is to give a warning if a device is changed
        or removed while assigned.

        :param devices_json: The JSON-ified string of device information
                             that is returned from the compute node.
        """
        # Keep only the devices the filter reports as assignable.
        assignable = [
            dev for dev in jsonutils.loads(devices_json)
            if self.dev_filter.device_assignable(dev)
        ]
        self._set_hvdevs(assignable)
middleware.py 文件源码 项目:deckhand 作者: att-comdev 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def process_response(self, req, resp, resource):
        """Convert the response payload to ``application/x-yaml``.

        Sets the Content-Type header unless the status is
        '204 No Content', then re-serialises the ``body`` and ``data``
        attributes as YAML when they hold (or decode from JSON to) a dict,
        list or tuple. Other values are left untouched.
        """
        no_content = resp.status == '204 No Content'
        if not no_content:
            resp.set_header('Content-Type', 'application/x-yaml')

        for field in ('body', 'data'):
            if hasattr(resp, field):
                payload = getattr(resp, field)

                try:
                    payload = json.loads(payload)
                except (TypeError, ValueError):
                    # Not JSON text; carry on with the raw value.
                    pass

                if isinstance(payload, dict):
                    setattr(resp, field, yaml.safe_dump(payload))
                elif isinstance(payload, (list, tuple)):
                    setattr(resp, field, yaml.safe_dump_all(payload))
wsgi.py 文件源码 项目:coriolis 作者: cloudbase 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def action_peek_json(body):
    """Determine action to invoke.

    Decodes the JSON request body and returns the name of its single
    top-level key.

    :param body: JSON-encoded request body.
    :returns: the only key of the decoded JSON object.
    :raises exception.MalformedRequestBody: if the body cannot be decoded,
        is not a JSON object, or does not hold exactly one key.
    """

    try:
        decoded = jsonutils.loads(body)
    except (TypeError, ValueError):
        # TypeError covers a body that is not text at all (e.g. None).
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # A scalar or a list would either break len()/keys() below or slip
    # past the length check, so reject anything that is not a JSON object.
    if not isinstance(decoded, dict):
        msg = _("cannot understand JSON")
        raise exception.MalformedRequestBody(reason=msg)

    # Make sure there's exactly one key...
    if len(decoded) != 1:
        msg = _("too many body keys")
        raise exception.MalformedRequestBody(reason=msg)

    # Return only the action name (the single key).
    return list(decoded.keys())[0]
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_volumedriver_create(self):
        # Creating a volume that no provider reports as existing must
        # answer 200 with an empty error string.
        self.volume_providers_setup(['cinder'])
        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock(return_value=False)
            provider.create = mock.MagicMock()

        payload = jsonutils.dumps({
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1'},
        })
        response = self.app.post('/VolumeDriver.Create',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u''}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_volumedriver_create_invalid_volume_provider(self):
        # Requesting a provider name other than the configured one must
        # answer 200 but not with the empty-error success payload.
        self.volume_providers_setup(['cinder'])
        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock(return_value=False)
            provider.create = mock.MagicMock()

        payload = jsonutils.dumps({
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1',
                      u'volume_provider': u'provider'},
        })
        response = self.app.post('VolumeDriver.Create',
                                 content_type='application/json',
                                 data=payload)

        success_payload = {u'Err': u''}
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(success_payload,
                            jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_volumedriver_remove_with_volume_not_exist(self):
        # Removal where every provider's delete returns False must still
        # answer 200 with an empty error string.
        self.volume_providers_setup(['cinder'])
        for provider in app.volume_providers.values():
            provider.delete = mock.MagicMock(return_value=False)

        payload = jsonutils.dumps({u'Name': u'test-vol'})
        response = self.app.post('/VolumeDriver.Remove',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u''}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_volumedriver_mount(self):
        # Mounting an existing volume must return the mountpoint reported
        # by the provider together with an empty error string.
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'

        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock(return_value=True)
            provider.mount = mock.MagicMock(
                return_value=fake_mountpoint(volume_name))

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=payload)

        expected = {
            u'Mountpoint': fake_mountpoint(volume_name),
            u'Err': u'',
        }
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_volumedriver_mount_with_volume_not_exist(self):
        # Mounting a volume that no provider reports as existing must still
        # answer 200, but not with the successful mountpoint payload.
        self.volume_providers_setup(['cinder'])
        fake_name = u'test-vol'
        fake_request = {
            u'Name': fake_name,
        }
        for provider in app.volume_providers.values():
            # Stub ``check_exist`` -- the attribute the other mount/create
            # tests in this file stub. The former ``check_exit`` spelling
            # only attached an extra attribute and left the provider's
            # ``check_exist`` unstubbed.
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = False
        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Mountpoint': fake_mountpoint(fake_name),
            u'Err': u''
        }
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(fake_response, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_volumedriver_path(self):
        # Asking for the path of a volume the provider can show must
        # return its mountpoint together with an empty error string.
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(
                return_value=fake_volume(volume_name))

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=payload)

        expected = {
            u'Mountpoint': fake_mountpoint(volume_name),
            u'Err': u'',
        }
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_volumedriver_path_with_volume_not_exist(self):
        # When every provider's show raises NotFound, the path request
        # must answer 200 with the 'Mountpoint Not Found' error.
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(side_effect=exceptions.NotFound)

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u'Mountpoint Not Found'}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 95 收藏 0 点赞 0 评论 0
def test_volumedriver_get(self):
        # Getting a volume the provider can show must return its name and
        # mountpoint together with an empty error string.
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(
                return_value=fake_volume(volume_name))

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Get',
                                 content_type='application/json',
                                 data=payload)

        expected = {
            u'Volume': {u'Name': volume_name,
                        u'Mountpoint': fake_mountpoint(volume_name)},
            u'Err': u'',
        }
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_volumedriver_get_with_volume_not_exist(self):
        # When every provider's show raises NotFound, the get request must
        # answer 200 with the 'Volume Not Found' error.
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for provider in app.volume_providers.values():
            provider.show = mock.MagicMock(side_effect=exceptions.NotFound())

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Get',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u'Volume Not Found'}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_kuryr_ipam.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_ipam_driver_request_pool_with_default_v6pool(
            self, mock_list_subnetpools):
        # A V6 pool request without explicit pool data must look the
        # subnetpool up by the name 'kuryr6' and return its id as PoolID.
        pool_id = uuidutils.generate_uuid()
        pool_name = 'kuryr6'
        subnetpools = self._get_fake_v6_subnetpools(
            pool_id, prefixes=['fe80::/64'])
        mock_list_subnetpools.return_value = {
            'subnetpools': subnetpools['subnetpools']}

        payload = jsonutils.dumps({
            'AddressSpace': '',
            'Pool': '',
            'SubPool': '',  # In the case --ip-range is not given
            'Options': {},
            'V6': True,
        })
        response = self.app.post('/IpamDriver.RequestPool',
                                 content_type='application/json',
                                 data=payload)

        self.assertEqual(200, response.status_code)
        mock_list_subnetpools.assert_called_with(name=pool_name)
        self.assertEqual(pool_id, jsonutils.loads(response.data)['PoolID'])
test_kuryr.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_network_driver_endpoint_operational_info_with_no_port(self):
        # With no matching Neutron port, the operational info request must
        # answer 200 with an empty 'Value' mapping.
        network_id = lib_utils.get_hash()
        endpoint_id = lib_utils.get_hash()

        with mock.patch.object(app.neutron, 'list_ports') as mock_list_ports:
            mock_list_ports.return_value = {"ports": []}

            payload = jsonutils.dumps({
                'NetworkID': network_id,
                'EndpointID': endpoint_id,
            })
            response = self.app.post('/NetworkDriver.EndpointOperInfo',
                                     content_type='application/json',
                                     data=payload)
            decoded_json = jsonutils.loads(response.data)
            self.assertEqual(200, response.status_code)

            # The port must have been looked up by its derived name.
            expected_name = utils.get_neutron_port_name(endpoint_id)
            mock_list_ports.assert_called_once_with(name=expected_name)

            self.assertEqual({}, decoded_json['Value'])
test_kuryr.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_network_driver_allocate_network(self):
        # Allocating a network must answer 200 with empty 'Options'.
        ipv4_data = [{
            'AddressSpace': 'foo',
            'Pool': '192.168.42.0/24',
            'Gateway': '192.168.42.1/24',
        }]
        payload = jsonutils.dumps({
            'NetworkID': lib_utils.get_hash(),
            'IPv4Data': ipv4_data,
            'IPv6Data': [],
            'Options': {},
        })

        response = self.app.post('/NetworkDriver.AllocateNetwork',
                                 content_type='application/json',
                                 data=payload)

        self.assertEqual(200, response.status_code)
        self.assertEqual({'Options': {}}, jsonutils.loads(response.data))


问题


面经


文章

微信
公众号

扫码关注公众号