python类dumps()的实例源码

test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_volumedriver_create(self):
        """POST /VolumeDriver.Create and expect an empty ``Err`` with HTTP 200.

        Each configured provider gets ``check_exist`` stubbed to return
        False and ``create`` replaced by a mock before the request is sent.
        """
        self.volume_providers_setup(['cinder'])
        for backend in app.volume_providers.values():
            backend.check_exist = mock.MagicMock(return_value=False)
            backend.create = mock.MagicMock()

        payload = jsonutils.dumps({
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1'},
        })
        response = self.app.post('/VolumeDriver.Create',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u''}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_volumedriver_create_invalid_volume_provider(self):
        """Create with an unrecognised ``volume_provider`` option.

        The request names ``provider`` as the volume provider while only
        ``cinder`` is set up. The endpoint must still answer HTTP 200, but
        the body must differ from the success response (empty ``Err``).
        """
        self.volume_providers_setup(['cinder'])
        fake_request = {
            u'Name': u'test-vol',
            u'Opts': {u'size': u'1',
                      u'volume_provider': u'provider'}}
        for provider in app.volume_providers.values():
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = False
            provider.create = mock.MagicMock()

        # Use the absolute route like every other test in this file rather
        # than relying on the test client to normalise a relative path.
        response = self.app.post('/VolumeDriver.Create',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Err': u''
        }
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(fake_response, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_volumedriver_remove(self):
        """POST /VolumeDriver.Remove and expect an empty ``Err``.

        ``delete`` is stubbed on every provider to return True.
        """
        self.volume_providers_setup(['cinder'])
        for backend in app.volume_providers.values():
            backend.delete = mock.MagicMock(return_value=True)

        payload = jsonutils.dumps({u'Name': u'test-vol'})
        response = self.app.post('/VolumeDriver.Remove',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u''}
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_volumedriver_mount(self):
        """POST /VolumeDriver.Mount for a volume the providers report present.

        ``check_exist`` is stubbed to True and ``mount`` to return the fake
        mountpoint for the volume name; the response must echo that
        mountpoint with an empty ``Err``.
        """
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'

        for backend in app.volume_providers.values():
            backend.check_exist = mock.MagicMock(return_value=True)
            backend.mount = mock.MagicMock(
                return_value=fake_mountpoint(volume_name))

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=payload)

        expected = {
            u'Mountpoint': fake_mountpoint(volume_name),
            u'Err': u'',
        }
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_volumedriver_mount_with_volume_not_exist(self):
        """Mount a volume that no provider reports as existing.

        ``check_exist`` is stubbed to return False on every provider. The
        endpoint must answer HTTP 200, and the body must differ from the
        success response (fake mountpoint with an empty ``Err``).
        """
        self.volume_providers_setup(['cinder'])
        fake_name = u'test-vol'
        fake_request = {
            u'Name': fake_name,
        }
        for provider in app.volume_providers.values():
            # Stub ``check_exist`` -- the attribute the sibling tests stub.
            # The previous ``check_exit`` spelling only created an unused
            # attribute and left the provider's own check in place.
            provider.check_exist = mock.MagicMock()
            provider.check_exist.return_value = False
        response = self.app.post('/VolumeDriver.Mount',
                                 content_type='application/json',
                                 data=jsonutils.dumps(fake_request))
        fake_response = {
            u'Mountpoint': fake_mountpoint(fake_name),
            u'Err': u''
        }
        self.assertEqual(200, response.status_code)
        self.assertNotEqual(fake_response, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_volumedriver_path(self):
        """POST /VolumeDriver.Path and expect the fake mountpoint back.

        ``show`` is stubbed on every provider to return the fake volume
        built from the requested name.
        """
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for backend in app.volume_providers.values():
            backend.show = mock.MagicMock(
                return_value=fake_volume(volume_name))

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=payload)

        expected = {
            u'Mountpoint': fake_mountpoint(volume_name),
            u'Err': u'',
        }
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_volumedriver_path_with_volume_not_exist(self):
        """POST /VolumeDriver.Path when ``show`` raises ``NotFound``.

        The endpoint must answer HTTP 200 with
        ``Err`` set to ``Mountpoint Not Found``.
        """
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for backend in app.volume_providers.values():
            backend.show = mock.MagicMock(side_effect=exceptions.NotFound)

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Path',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u'Mountpoint Not Found'}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_fuxi.py 文件源码 项目:fuxi 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_volumedriver_get_with_volume_not_exist(self):
        """POST /VolumeDriver.Get when ``show`` raises ``NotFound``.

        The endpoint must answer HTTP 200 with
        ``Err`` set to ``Volume Not Found``.
        """
        self.volume_providers_setup(['cinder'])
        volume_name = u'test-vol'
        for backend in app.volume_providers.values():
            backend.show = mock.MagicMock(side_effect=exceptions.NotFound())

        payload = jsonutils.dumps({u'Name': volume_name})
        response = self.app.post('/VolumeDriver.Get',
                                 content_type='application/json',
                                 data=payload)

        expected = {u'Err': u'Volume Not Found'}
        self.assertEqual(200, response.status_code)
        self.assertEqual(expected, jsonutils.loads(response.data))
test_kuryr_ipam.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_ipam_driver_request_pool_with_default_v6pool(
        self, mock_list_subnetpools):
        """Request an IPv6 pool without naming a pool or sub-pool.

        ``mock_list_subnetpools`` returns one fake v6 subnetpool; the test
        checks that it was looked up by the name ``kuryr6`` and that its id
        is returned as ``PoolID``.
        """
        pool_name = 'kuryr6'
        pool_id = uuidutils.generate_uuid()
        fake_pools = self._get_fake_v6_subnetpools(
            pool_id, prefixes=['fe80::/64'])
        mock_list_subnetpools.return_value = {
            'subnetpools': fake_pools['subnetpools']}

        payload = jsonutils.dumps({
            'AddressSpace': '',
            'Pool': '',
            'SubPool': '',  # In the case --ip-range is not given
            'Options': {},
            'V6': True,
        })
        response = self.app.post('/IpamDriver.RequestPool',
                                 content_type='application/json',
                                 data=payload)

        self.assertEqual(200, response.status_code)
        mock_list_subnetpools.assert_called_with(name=pool_name)
        body = jsonutils.loads(response.data)
        self.assertEqual(pool_id, body['PoolID'])
test_kuryr.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_network_driver_endpoint_operational_info_with_no_port(self):
        """EndpointOperInfo returns an empty ``Value`` when no port matches.

        ``app.neutron.list_ports`` is patched to return an empty port list;
        the test also checks it was queried exactly once with the port name
        derived from the endpoint id.
        """
        network_id = lib_utils.get_hash()
        endpoint_id = lib_utils.get_hash()

        with mock.patch.object(app.neutron, 'list_ports') as list_ports_mock:
            list_ports_mock.return_value = {"ports": []}
            payload = jsonutils.dumps({
                'NetworkID': network_id,
                'EndpointID': endpoint_id,
            })

            response = self.app.post('/NetworkDriver.EndpointOperInfo',
                                     content_type='application/json',
                                     data=payload)
            body = jsonutils.loads(response.data)
            self.assertEqual(200, response.status_code)

            expected_port_name = utils.get_neutron_port_name(endpoint_id)
            list_ports_mock.assert_called_once_with(name=expected_port_name)

            self.assertEqual({}, body['Value'])
test_kuryr.py 文件源码 项目:kuryr-libnetwork 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_network_driver_allocate_network(self):
        """AllocateNetwork answers HTTP 200 with ``{'Options': {}}``.

        The request carries one IPv4 data entry and no IPv6 data.
        """
        ipv4_entry = {
            'AddressSpace': 'foo',
            'Pool': '192.168.42.0/24',
            'Gateway': '192.168.42.1/24',
        }
        request_body = {
            'NetworkID': lib_utils.get_hash(),
            'IPv4Data': [ipv4_entry],
            'IPv6Data': [],
            'Options': {},
        }

        response = self.app.post('/NetworkDriver.AllocateNetwork',
                                 content_type='application/json',
                                 data=jsonutils.dumps(request_body))
        self.assertEqual(200, response.status_code)
        body = jsonutils.loads(response.data)
        self.assertEqual({'Options': {}}, body)
test_handlers.py 文件源码 项目:fuel-nailgun-extension-cluster-upgrade 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_node_reassign_handler_with_roles(self, mcast):
        """Reassign a ready controller node as ``compute`` with reprovision.

        Expects HTTP 202, the node's current roles cleared, ``compute``
        recorded as a pending role, and ``mcast`` to have been called.
        """
        source_cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                           'roles': ['controller']}])
        node = source_cluster.nodes[0]
        seed_cluster = self.env.create_cluster(api=False)
        url = reverse('NodeReassignHandler',
                      kwargs={'cluster_id': seed_cluster.id})

        # NOTE(akscram): reprovision=True means that the node will be
        #                re-provisioned during the reassigning. This is
        #                a default behavior.
        payload = jsonutils.dumps({'nodes_ids': [node.id],
                                   'reprovision': True,
                                   'roles': ['compute']})
        resp = self.app.post(url, payload, headers=self.default_headers)

        self.assertEqual(202, resp.status_code)
        self.assertEqual(node.roles, [])
        self.assertEqual(node.pending_roles, ['compute'])
        self.assertTrue(mcast.called)
test_handlers.py 文件源码 项目:fuel-nailgun-extension-cluster-upgrade 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_node_reassign_handler_without_reprovisioning(self, mcast):
        """Reassign a ready controller node as ``compute`` without reprovision.

        Expects HTTP 200, ``mcast`` not to have been called, and the node's
        roles to be ``['compute']``.
        """
        source_cluster = self.env.create(
            cluster_kwargs={'api': False},
            nodes_kwargs=[{'status': consts.NODE_STATUSES.ready,
                           'roles': ['controller']}])
        node = source_cluster.nodes[0]
        seed_cluster = self.env.create_cluster(api=False)
        url = reverse('NodeReassignHandler',
                      kwargs={'cluster_id': seed_cluster.id})

        payload = jsonutils.dumps({'nodes_ids': [node.id],
                                   'reprovision': False,
                                   'roles': ['compute']})
        resp = self.app.post(url, payload, headers=self.default_headers)

        self.assertEqual(200, resp.status_code)
        self.assertFalse(mcast.called)
        self.assertEqual(node.roles, ['compute'])
utils.py 文件源码 项目:ara 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def playbook_treeview(playbook):
    """
    Build a JSON tree of the files recorded for a playbook.

    Every file row whose playbook_id matches ``playbook`` is created inside
    a fake filesystem, then generate_tree() walks that filesystem from '/'
    and the result is serialized as indented, key-sorted JSON (the shape is
    intended for bootstrap-treeview).
    """
    fake_fs = fake_filesystem.FakeFilesystem()
    fake_os = fake_filesystem.FakeOsModule(fake_fs)

    records = models.File.query.filter(
        models.File.playbook_id.in_([playbook]))

    # Map each file path to its database id while populating the fake
    # filesystem.
    path_to_id = {}
    for record in records:
        fake_fs.CreateFile(record.path)
        path_to_id[record.path] = record.id

    tree = generate_tree('/', path_to_id, fake_os)
    return jsonutils.dumps(tree, sort_keys=True, indent=2)
networkops.py 文件源码 项目:nova-fusioncompute 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _init_all_fc_dvs(self):
        """Reload the dvswitch and physnet mappings.

        Fetches ``self.site.dvswitchs_uri``, maps each returned dvswitch name
        to the id taken from its urn and passes that id to
        ``update_physnet_map``. Both mappings are built in temporaries and
        assigned to ``self`` only after the whole response is processed.

        :raises fc_exc.DVSwitchNotFound: when the response has no dvswitch
            entries
        :return: None
        """
        LOG.debug("loading dvs mapping ")
        name_to_id = {}
        physnet_lookup = {}
        response = self.get(self.site.dvswitchs_uri)
        if not response.get(constant.DVSWITCHS):
            raise fc_exc.DVSwitchNotFound()

        switches = response.get(constant.DVSWITCHS)
        for switch in switches or ():
            switch_id = utils.get_id_from_urn(switch.get('urn'))
            name_to_id[switch["name"]] = switch_id
            self.update_physnet_map(switch_id, physnet_lookup)

        dvs_dump = jsonutils.dumps(name_to_id)
        physnet_dump = jsonutils.dumps(physnet_lookup)
        LOG.debug(
            "init all fc dvs dvs map is %s, physnet map is %s",
            dvs_dump,
            physnet_dump)
        self.dvs_mapping = name_to_id
        self.physnet_mapping = physnet_lookup
networkops.py 文件源码 项目:nova-fusioncompute 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_vsp(self, dvs_id, pg_urn, vif):
        """POST a vsp creation request for the given vif.

        :param dvs_id: dvswitch id used to build the vsp path
        :param pg_urn: sent as ``portGroupUrn`` in the request body
        :param vif: mapping whose ``id`` is used as the vsp name and tag value
        :return: whatever ``self.post`` returns
        """
        vsp_path = self.get_path_by_site(constant.VSP_URI, dvs_id=dvs_id)
        port_id = vif['id']

        tag = {'tagKey': constant.VSP_TAG_KEY, 'tagValue': port_id}
        payload = jsonutils.dumps({
            'name': port_id,
            'portGroupUrn': pg_urn,
            'tags': [tag],
        })

        return self.post(vsp_path, data=payload)
subports.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_subports(num_ports, trunk_ips, timeout=180):
    """POST a pool-populate request over the manager socket.

    Sends ``trunks`` and ``num_ports`` as a JSON body to the
    ``VIF_POOL_POPULATE`` path and prints the raw response body.
    """
    payload = jsonutils.dumps({"trunks": trunk_ips, "num_ports": num_ports})
    request_headers = {
        'Content-Type': 'application/json',
        'Connection': 'close',
        'Content-Length': len(payload),
    }
    url = 'http://localhost{0}'.format(constants.VIF_POOL_POPULATE)
    connection = UnixDomainHttpConnection(constants.MANAGER_SOCKET_FILE,
                                          timeout)
    connection.request('POST', url, body=payload, headers=request_headers)
    print(connection.getresponse().read())
subports.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def delete_subports(trunk_ips, timeout=180):
    """POST a pool-free request over the manager socket.

    Sends ``trunks`` as a JSON body to the ``VIF_POOL_FREE`` path and prints
    the raw response body.
    """
    payload = jsonutils.dumps({"trunks": trunk_ips})
    request_headers = {
        'Content-Type': 'application/json',
        'Connection': 'close',
        'Content-Length': len(payload),
    }
    url = 'http://localhost{0}'.format(constants.VIF_POOL_FREE)
    connection = UnixDomainHttpConnection(constants.MANAGER_SOCKET_FILE,
                                          timeout)
    connection.request('POST', url, body=payload, headers=request_headers)
    print(connection.getresponse().read())
subports.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def list_pools(timeout=180):
    """GET the pool listing over the manager socket and print the reply.

    Sends an empty JSON object as the body to the ``VIF_POOL_LIST`` path.

    :param timeout: passed to ``UnixDomainHttpConnection`` as its second
        argument
    """
    method = 'GET'
    body = jsonutils.dumps({})
    # The header names were misspelled ``Context-Type``/``Context-Length``,
    # so the JSON body went out without a declared content type.
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_LIST)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())
subports.py 文件源码 项目:kuryr-kubernetes 作者: openstack 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def show_pool(trunk_ip, project_id, sg, timeout=180):
    """GET one pool's details over the manager socket and print the reply.

    The pool is identified by a JSON body of the form
    ``{"pool_key": [trunk_ip, project_id, sg]}`` sent to the
    ``VIF_POOL_SHOW`` path.

    :param trunk_ip: first element of the pool key
    :param project_id: second element of the pool key
    :param sg: third element of the pool key
    :param timeout: passed to ``UnixDomainHttpConnection`` as its second
        argument
    """
    method = 'GET'
    body = jsonutils.dumps({"pool_key": [trunk_ip, project_id, sg]})
    # The header names were misspelled ``Context-Type``/``Context-Length``,
    # so the JSON body went out without a declared content type.
    headers = {'Content-Type': 'application/json', 'Connection': 'close'}
    headers['Content-Length'] = len(body)
    path = 'http://localhost{0}'.format(constants.VIF_POOL_SHOW)
    socket_path = constants.MANAGER_SOCKET_FILE
    conn = UnixDomainHttpConnection(socket_path, timeout)
    conn.request(method, path, body=body, headers=headers)
    resp = conn.getresponse()
    print(resp.read())


问题


面经


文章

微信
公众号

扫码关注公众号