python类HttpRequest()的实例源码

tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_create_snapshot_post(self):
        volume = self.cinder_volumes.first()
        snapshot = self.cinder_volume_snapshots.first()

        cinder.volume_get(IsA(http.HttpRequest), volume.id) \
            .AndReturn(volume)
        cinder.volume_snapshot_create(IsA(http.HttpRequest),
                                      volume.id,
                                      snapshot.name,
                                      snapshot.description,
                                      force=False) \
            .AndReturn(snapshot)
        self.mox.ReplayAll()

        formData = {'method': 'CreateSnapshotForm',
                    'tenant_id': self.tenant.id,
                    'volume_id': volume.id,
                    'name': snapshot.name,
                    'description': snapshot.description}
        url = reverse('horizon:project:volumes:volumes:create_snapshot',
                      args=[volume.id])
        res = self.client.post(url, formData)
        self.assertRedirectsNoFollow(res, VOLUME_SNAPSHOTS_TAB_URL)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_delete_volume_snapshot(self):
        vol_snapshots = self.cinder_volume_snapshots.list()
        volumes = self.cinder_volumes.list()
        snapshot = self.cinder_volume_snapshots.first()

        api.cinder.volume_backup_supported(IsA(http.HttpRequest)). \
            MultipleTimes().AndReturn(True)
        api.cinder.volume_snapshot_list_paged(
            IsA(http.HttpRequest), paginate=True, marker=None,
            sort_dir='desc').AndReturn([vol_snapshots, False, False])
        api.cinder.volume_list(IsA(http.HttpRequest)). \
            AndReturn(volumes)

        api.cinder.volume_snapshot_delete(IsA(http.HttpRequest), snapshot.id)
        self.mox.ReplayAll()

        formData = {'action':
                    'volume_snapshots__delete__%s' % snapshot.id}
        res = self.client.post(VOLUME_SNAPSHOTS_TAB_URL, formData)

        self.assertRedirectsNoFollow(res, VOLUME_SNAPSHOTS_TAB_URL)
        self.assertMessageCount(success=1)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_volume_snapshot_detail_get(self):
        volume = self.cinder_volumes.first()
        snapshot = self.cinder_volume_snapshots.first()

        api.cinder.volume_get(IsA(http.HttpRequest), volume.id). \
            AndReturn(volume)
        api.cinder.volume_snapshot_get(IsA(http.HttpRequest), snapshot.id). \
            AndReturn(snapshot)

        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:snapshots:detail',
                      args=[snapshot.id])
        res = self.client.get(url)

        self.assertTemplateUsed(res, 'horizon/common/_detail.html')
        self.assertEqual(res.context['snapshot'].id, snapshot.id)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_volume_snapshot_detail_with_volume_get_exception(self):
        # Test to verify redirect if get volume fails
        volume = self.cinder_volumes.first()
        snapshot = self.cinder_volume_snapshots.first()

        api.cinder.volume_get(IsA(http.HttpRequest), volume.id). \
            AndRaise(self.exceptions.cinder)
        api.cinder.volume_snapshot_get(IsA(http.HttpRequest), snapshot.id). \
            AndReturn(snapshot)

        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:snapshots:detail',
                      args=[snapshot.id])
        res = self.client.get(url)

        self.assertRedirectsNoFollow(res, INDEX_URL)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_update_snapshot(self):
        snapshot = self.cinder_volume_snapshots.first()

        cinder.volume_snapshot_get(IsA(http.HttpRequest), snapshot.id) \
            .AndReturn(snapshot)
        cinder.volume_snapshot_update(IsA(http.HttpRequest),
                                      snapshot.id,
                                      snapshot.name,
                                      snapshot.description) \
            .AndReturn(snapshot)
        self.mox.ReplayAll()

        formData = {'method': 'UpdateSnapshotForm',
                    'name': snapshot.name,
                    'description': snapshot.description}
        url = reverse(('horizon:project:volumes:snapshots:update'),
                      args=[snapshot.id])
        res = self.client.post(url, formData)
        self.assertRedirectsNoFollow(res, INDEX_URL)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_delete_cgroup(self):
        cgroups = self.cinder_consistencygroups.list()
        cgroup = self.cinder_consistencygroups.first()

        cinder.volume_cgroup_list_with_vol_type_names(IsA(http.HttpRequest)).\
            AndReturn(cgroups)
        cinder.volume_cgroup_delete(IsA(http.HttpRequest), cgroup.id,
                                    force=False)
        if django.VERSION < (1, 9):
            cinder.volume_cgroup_list_with_vol_type_names(
                IsA(http.HttpRequest)).AndReturn(cgroups)

        self.mox.ReplayAll()

        formData = {'action': 'volume_cgroups__deletecg__%s' % cgroup.id}
        res = self.client.post(VOLUME_CGROUPS_TAB_URL, formData, follow=True)
        self.assertIn("Scheduled deletion of Consistency Group: cg_1",
                      [m.message for m in res.context['messages']])
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_update_cgroup_add_vol(self):
        cgroup = self.cinder_consistencygroups.first()
        volume = self.cinder_volumes.first()
        formData = {'volume_types': '1',
                    'name': 'test CG',
                    'description': 'test desc'}

        cinder.volume_cgroup_get(IsA(
            http.HttpRequest), cgroup.id).\
            AndReturn(cgroup)
        cinder.volume_cgroup_update(
            IsA(http.HttpRequest),
            formData['name'],
            formData['description'],
            add_vols=volume)\
            .AndReturn(cgroup)
        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:cgroups:update',
                      args=[cgroup.id])
        res = self.client.post(url, formData)
        self.assertNoFormErrors(res)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_update_cgroup_remove_vol(self):
        cgroup = self.cinder_consistencygroups.first()
        volume = self.cinder_volumes.first()
        formData = {'volume_types': '1',
                    'name': 'test CG',
                    'description': 'test desc'}

        cinder.volume_cgroup_get(IsA(
            http.HttpRequest), cgroup.id).\
            AndReturn(cgroup)
        cinder.volume_cgroup_update(
            IsA(http.HttpRequest),
            formData['name'],
            formData['description'],
            remove_vols=volume)\
            .AndReturn(cgroup)
        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:cgroups:update',
                      args=[cgroup.id])
        res = self.client.post(url, formData)
        self.assertNoFormErrors(res)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_update_cgroup_name_and_description(self):
        cgroup = self.cinder_consistencygroups.first()
        formData = {'volume_types': '1',
                    'name': 'test CG-new',
                    'description': 'test desc-new'}

        cinder.volume_cgroup_get(IsA(
            http.HttpRequest), cgroup.id).\
            AndReturn(cgroup)
        cinder.volume_cgroup_update(
            IsA(http.HttpRequest),
            formData['name'],
            formData['description'])\
            .AndReturn(cgroup)
        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:cgroups:update',
                      args=[cgroup.id])
        res = self.client.post(url, formData)
        self.assertNoFormErrors(res)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_create_backup_post(self):
        volume = self.volumes.first()
        backup = self.cinder_volume_backups.first()

        api.cinder.volume_backup_create(IsA(http.HttpRequest),
                                        volume.id,
                                        backup.container_name,
                                        backup.name,
                                        backup.description) \
            .AndReturn(backup)
        self.mox.ReplayAll()

        formData = {'method': 'CreateBackupForm',
                    'tenant_id': self.tenant.id,
                    'volume_id': volume.id,
                    'container_name': backup.container_name,
                    'name': backup.name,
                    'description': backup.description}
        url = reverse('horizon:project:volumes:volumes:create_backup',
                      args=[volume.id])
        res = self.client.post(url, formData)

        self.assertNoFormErrors(res)
        self.assertMessageCount(error=0, warning=0)
        self.assertRedirectsNoFollow(res, VOLUME_BACKUPS_TAB_URL)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def test_delete_volume_backup(self):
        vol_backups = self.cinder_volume_backups.list()
        volumes = self.cinder_volumes.list()
        backup = self.cinder_volume_backups.first()

        api.cinder.volume_backup_supported(IsA(http.HttpRequest)). \
            MultipleTimes().AndReturn(True)
        api.cinder.volume_backup_list_paged(
            IsA(http.HttpRequest), marker=None, sort_dir='desc',
            paginate=True).AndReturn([vol_backups, False, False])
        api.cinder.volume_list(IsA(http.HttpRequest)). \
            AndReturn(volumes)
        api.cinder.volume_backup_delete(IsA(http.HttpRequest), backup.id)

        self.mox.ReplayAll()

        formData = {'action':
                    'volume_backups__delete__%s' % backup.id}
        res = self.client.post(INDEX_URL +
                               "?tab=volumes_and_snapshots__backups_tab",
                               formData)

        self.assertRedirectsNoFollow(res, INDEX_URL +
                                     "?tab=volumes_and_snapshots__backups_tab")
        self.assertMessageCount(success=1)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_volume_backup_detail_get(self):
        backup = self.cinder_volume_backups.first()
        volume = self.cinder_volumes.get(id=backup.volume_id)

        api.cinder.volume_backup_get(IsA(http.HttpRequest), backup.id). \
            AndReturn(backup)
        api.cinder.volume_get(IsA(http.HttpRequest), backup.volume_id). \
            AndReturn(volume)
        self.mox.ReplayAll()

        url = reverse('horizon:project:volumes:backups:detail',
                      args=[backup.id])
        res = self.client.get(url)

        self.assertTemplateUsed(res, 'horizon/common/_detail.html')
        self.assertEqual(res.context['backup'].id, backup.id)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_restore_backup(self):
        backup = self.cinder_volume_backups.first()
        volumes = self.cinder_volumes.list()

        api.cinder.volume_list(IsA(http.HttpRequest)). \
            AndReturn(volumes)
        api.cinder.volume_backup_restore(IsA(http.HttpRequest),
                                         backup.id,
                                         backup.volume_id). \
            AndReturn(backup)
        self.mox.ReplayAll()

        formData = {'method': 'RestoreBackupForm',
                    'backup_id': backup.id,
                    'backup_name': backup.name,
                    'volume_id': backup.volume_id}
        url = reverse('horizon:project:volumes:backups:restore',
                      args=[backup.id])
        url += '?%s' % urlencode({'backup_name': backup.name,
                                  'volume_id': backup.volume_id})
        res = self.client.post(url, formData)

        self.assertNoFormErrors(res)
        self.assertMessageCount(info=1)
        self.assertRedirectsNoFollow(res, INDEX_URL)
test.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _test_index_paginated(self, marker, sort_dir, volumes, url,
                              has_more, has_prev):
        backup_supported = True
        vol_snaps = self.cinder_volume_snapshots.list()

        api.cinder.volume_backup_supported(IsA(http.HttpRequest)).\
            MultipleTimes().AndReturn(backup_supported)
        api.cinder.volume_list_paged(IsA(http.HttpRequest), marker=marker,
                                     sort_dir=sort_dir, search_opts=None,
                                     paginate=True).\
            AndReturn([volumes, has_more, has_prev])
        api.cinder.volume_snapshot_list(
            IsA(http.HttpRequest), search_opts=None).AndReturn(vol_snaps)
        api.nova.server_list(IsA(http.HttpRequest), search_opts=None).\
            AndReturn([self.servers.list(), False])
        api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)).MultipleTimes().\
            AndReturn(self.cinder_limits['absolute'])
        self.mox.ReplayAll()

        res = self.client.get(urlunquote(url))
        self.assertEqual(res.status_code, 200)
        self.assertTemplateUsed(res, 'project/volumes/index.html')

        self.mox.UnsetStubs()
        return res
test.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _test_snapshots_index_paginated(self, marker, sort_dir, snapshots, url,
                                        has_more, has_prev):
        backup_supported = True

        api.cinder.volume_backup_supported(IsA(http.HttpRequest)).\
            MultipleTimes().AndReturn(backup_supported)
        api.cinder.volume_snapshot_list_paged(
            IsA(http.HttpRequest), marker=marker, sort_dir=sort_dir,
            paginate=True).AndReturn([snapshots, has_more, has_prev])
        api.cinder.volume_list(IsA(http.HttpRequest)).AndReturn(
            self.cinder_volumes.list())
        self.mox.ReplayAll()

        res = self.client.get(urlunquote(url))
        self.assertEqual(res.status_code, 200)
        self.assertTemplateUsed(res, 'project/volumes/index.html')

        self.mox.UnsetStubs()
        return res
test.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _test_backups_index_paginated(self, marker, sort_dir, backups, url,
                                      has_more, has_prev):
        backup_supported = True

        api.cinder.volume_backup_supported(IsA(http.HttpRequest)).\
            MultipleTimes().AndReturn(backup_supported)
        api.cinder.volume_backup_list_paged(
            IsA(http.HttpRequest), marker=marker, sort_dir=sort_dir,
            paginate=True).AndReturn([backups, has_more, has_prev])
        api.cinder.volume_list(IsA(http.HttpRequest)).AndReturn(
            self.cinder_volumes.list())
        self.mox.ReplayAll()

        res = self.client.get(urlunquote(url))
        self.assertEqual(res.status_code, 200)
        self.assertTemplateUsed(res, 'project/volumes/index.html')

        self.mox.UnsetStubs()
        return res
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_add_vpnservice_get(self):
        networks = [{'subnets': [self.subnets.first(), ]}, ]
        routers = self.routers.list()

        api.neutron.network_list_for_tenant(
            IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
        api.neutron.router_list(
            IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)

        self.mox.ReplayAll()

        res = self.client.get(reverse(self.ADDVPNSERVICE_PATH))

        workflow = res.context['workflow']
        self.assertTemplateUsed(res, views.WorkflowView.template_name)
        self.assertEqual(workflow.name, workflows.AddVPNService.name)

        expected_objs = ['<AddVPNServiceStep: addvpnserviceaction>', ]
        self.assertQuerysetEqual(workflow.steps, expected_objs)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_add_vpnservice_post(self):
        vpnservice = self.vpnservices.first()
        networks = [{'subnets': [self.subnets.first(), ]}, ]
        routers = self.routers.list()

        api.neutron.network_list_for_tenant(
            IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
        api.neutron.router_list(
            IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)

        form_data = {'name': vpnservice['name'],
                     'description': vpnservice['description'],
                     'subnet_id': vpnservice['subnet_id'],
                     'router_id': vpnservice['router_id'],
                     'admin_state_up': vpnservice['admin_state_up']}

        api.vpn.vpnservice_create(
            IsA(http.HttpRequest), **form_data).AndReturn(vpnservice)

        self.mox.ReplayAll()

        res = self.client.post(reverse(self.ADDVPNSERVICE_PATH), form_data)

        self.assertNoFormErrors(res)
        self.assertRedirectsNoFollow(res, str(self.INDEX_URL))
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_add_vpnservice_post_error(self):
        vpnservice = self.vpnservices.first()
        networks = [{'subnets': [self.subnets.first(), ]}, ]
        routers = self.routers.list()

        api.neutron.network_list_for_tenant(
            IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
        api.neutron.router_list(
            IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)

        self.mox.ReplayAll()

        form_data = {'name': vpnservice['name'],
                     'description': vpnservice['description'],
                     'subnet_id': '',
                     'router_id': '',
                     'admin_state_up': vpnservice['admin_state_up']}

        res = self.client.post(reverse(self.ADDVPNSERVICE_PATH), form_data)

        self.assertFormErrors(res, 2)
tests.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_add_ikepolicy_post(self):
        ikepolicy = self.ikepolicies.first()

        form_data = {'name': ikepolicy['name'],
                     'description': ikepolicy['description'],
                     'auth_algorithm': ikepolicy['auth_algorithm'],
                     'encryption_algorithm': ikepolicy[
                         'encryption_algorithm'],
                     'ike_version': ikepolicy['ike_version'],
                     'lifetime_units': ikepolicy['lifetime']['units'],
                     'lifetime_value': ikepolicy['lifetime']['value'],
                     'phase1_negotiation_mode': ikepolicy[
                         'phase1_negotiation_mode'],
                     'pfs': ikepolicy['pfs']}

        api.vpn.ikepolicy_create(
            IsA(http.HttpRequest), **form_data).AndReturn(ikepolicy)

        self.mox.ReplayAll()

        res = self.client.post(reverse(self.ADDIKEPOLICY_PATH), form_data)

        self.assertNoFormErrors(res)
        self.assertRedirectsNoFollow(res, str(self.INDEX_URL))


问题


面经


文章

微信
公众号

扫码关注公众号