python类Response()的实例源码

test_wsgi.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_resource_headers_are_utf8(self):
        """Check that every header of a routed response is UTF-8 encoded.

        A controller returns a pre-built response whose headers hold an
        integer and two unicode strings. The response obtained through
        the test router must have every header value match
        ``EncodedByUTF8`` and equal the expected byte string.
        """
        canned = webob.Response(status_int=http.ACCEPTED)
        canned.headers['x-header1'] = 1
        canned.headers['x-header2'] = u'header2'
        canned.headers['x-header3'] = u'header3'

        class Controller(object):
            def index(self, req):
                # Always answer with the pre-built response above.
                return canned

        request = webob.Request.blank('/tests')
        router = fakes.TestRouter(Controller())
        result = request.get_response(router)

        # All headers must be utf8
        for header_value in result.headers.values():
            self.assertThat(header_value, matchers.EncodedByUTF8())

        expected_pairs = (
            ('x-header1', b'1'),
            ('x-header2', b'header2'),
            ('x-header3', b'header3'),
        )
        for header_name, encoded in expected_pairs:
            self.assertEqual(encoded, result.headers[header_name])
wsgi.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serialize(self, request, content_type):
        """Turn the wrapped object into a webob.Response.

        The wrapped object, when there is one, is passed to
        ``self.serializer`` and becomes the response body. The status
        code, the stored headers and the content type are then applied,
        each header value going through ``utils.utf8``.

        :param request: not used by this method
        :param content_type: value stored in the Content-Type header
        :returns: the populated webob.Response
        """
        serializer = self.serializer
        payload = None if self.obj is None else serializer.serialize(self.obj)

        response = webob.Response(body=payload)

        # NOTE: we need to encode 'Content-Length' header,
        # since webob.Response auto sets it if "body" attr is presented.
        # https://github.com/Pylons/webob/blob/1.5.0b0/webob/response.py#L147
        content_length = response.headers.get('Content-Length')
        if content_length:
            response.headers['Content-Length'] = utils.utf8(content_length)

        response.status_int = self.code
        for name, raw_value in self._headers.items():
            response.headers[name] = utils.utf8(raw_value)
        response.headers['Content-Type'] = utils.utf8(content_type)
        return response
shortest_rest_switch.py 文件源码 项目:sdn 作者: ray6 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def put_vtable(self, req, **kwargs):
        """Update the VLAN table from a submitted form and re-render the page.

        The request body is parsed as a URL-encoded query string. For each
        field, the fifth character of the field name is appended to the
        ``00:00:00:00:00:0`` prefix to form the host address, and the fifth
        character of the field's first value is used as the VLAN. Each
        pair is passed to ``set_vtable`` on ``self.shortest_switch_spp``.

        :param req: incoming request whose ``body`` holds the form data
        :param kwargs: extra route arguments; not used
        :returns: an HTML ``Response`` whose body is ``self.html()``
        """
        shortest_switch = self.shortest_switch_spp

        # No separate blank-request check is needed: an empty body yields
        # no fields here (assuming parse_qs is the standard-library one),
        # so the loop below simply does nothing.
        fields = parse_qs(req.body)

        # Interpret vlan input
        for key, value in fields.items():
            host = '00:00:00:00:00:0' + key[4]
            vlan = value[0][4]
            shortest_switch.set_vtable(host, vlan)

        return Response(content_type='text/html', body=self.html())
simple_rest_switch.py 文件源码 项目:sdn 作者: ray6 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def put_vtable(self, req, **kwargs):
        """Update the VLAN table from a submitted form and re-render the page.

        The request body is parsed as a URL-encoded query string. For each
        field, the fifth character of the field name is appended to the
        ``00:00:00:00:00:0`` prefix to form the host address, and the fifth
        character of the field's first value is used as the VLAN. Each
        pair is passed to ``set_vtable`` on ``self.simple_switch_spp``.

        :param req: incoming request whose ``body`` holds the form data
        :param kwargs: extra route arguments; not used
        :returns: an HTML ``Response`` whose body is ``self.html()``
        """
        simple_switch = self.simple_switch_spp

        # No separate blank-request check is needed: an empty body yields
        # no fields here (assuming parse_qs is the standard-library one),
        # so the loop below simply does nothing.
        fields = parse_qs(req.body)

        # Interpret the vlan list
        for key, value in fields.items():
            host = '00:00:00:00:00:0' + key[4]
            vlan = value[0][4]
            simple_switch.set_vtable(host, vlan)

        return Response(content_type='text/html', body=self.html())
request.py 文件源码 项目:bdocker 作者: indigo-dc 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def execute_get(self, path, parameters):
        """Execute GET request.

        Builds a query string from ``parameters`` and issues a GET request
        on the endpoint. Any exception raised while building or sending
        the request is replaced by a 500 response whose body is the
        exception text. The response is then passed to
        ``_get_from_response``.

        :param path: path of the request
        :param parameters: parameters to include in the request
        :return: json response
        """
        try:
            qs = get_query_string(parameters)
            request = self._get_req(path, method="GET", query_string=qs)
            result = request.get_response()
        except Exception as err:
            result = webob.Response(status=500, body=str(err))
        return self._get_from_response(result)
request.py 文件源码 项目:bdocker 作者: indigo-dc 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def execute_post(self, path, parameters):
        """Execute POST request.

        Builds a request body from ``parameters`` and issues a POST
        request with an ``application/json`` content type on the endpoint.
        Any exception raised while building or sending the request is
        replaced by a 500 response whose body is the exception text. The
        response is then passed to ``_get_from_response``.

        :param path: path of the request
        :param parameters: parameters to include in the request
        :return: json response
        """
        try:
            payload = make_body(parameters)
            request = self._get_req(path, method="POST", body=payload,
                                    content_type="application/json")
            result = request.get_response()
        except Exception as err:
            result = webob.Response(status=500, body=str(err))
        return self._get_from_response(result)
request.py 文件源码 项目:bdocker 作者: indigo-dc 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute_delete(self, path, parameters):
        """Execute DELETE request.

        Builds a query string from ``parameters`` and issues a DELETE
        request on the endpoint. Any exception raised while building or
        sending the request is replaced by a 500 response whose body is
        the exception text. The response is then passed to
        ``_get_from_response``.

        :param path: path of the request
        :param parameters: parameters to include in the request
        :return: json response
        """
        try:
            qs = get_query_string(parameters)
            request = self._get_req(path, query_string=qs, method="DELETE")
            result = request.get_response(None)
        except Exception as err:
            result = webob.Response(status=500, body=str(err))
        return self._get_from_response(result)
request.py 文件源码 项目:bdocker 作者: indigo-dc 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute_put(self, path, parameters):
        """Execute PUT request.

        Builds a request body from ``parameters`` and issues a PUT request
        with an ``application/json`` content type on the endpoint. Any
        exception raised while building or sending the request is replaced
        by a 500 response whose body is the exception text. The response
        is then passed to ``_get_from_response``.

        :param path: path of the request
        :param parameters: parameters to include in the request
        :return: json response
        """
        try:
            payload = make_body(parameters)
            request = self._get_req(path, method="PUT", body=payload,
                                    content_type="application/json")
            result = request.get_response()
        except Exception as err:
            result = webob.Response(status=500, body=str(err))
        return self._get_from_response(result)
test_auth.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
        """Prepare the middleware, a blank request and a role string.

        A small WSGI app inspecting the roles of ``nova.context`` is
        wrapped in ``NovaKeystoneContext``. The blank request carries
        user, tenant, auth-token and (empty) service-catalog headers.
        """
        super(TestKeystoneMiddlewareRoles, self).setUp()

        @webob.dec.wsgify()
        def role_check_app(req):
            context = req.environ['nova.context']

            # Guard clauses: each recognised role layout returns at once,
            # anything else is reported as a bad request.
            if "knight" in context.roles and "bad" not in context.roles:
                return webob.Response(status="200 Role Match")
            if context.roles == ['']:
                return webob.Response(status="200 No Roles")
            raise webob.exc.HTTPBadRequest("unexpected role header")

        self.middleware = nova.api.auth.NovaKeystoneContext(role_check_app)
        self.request = webob.Request.blank('/')

        identity_headers = (
            ('X_USER', 'testuser'),
            ('X_TENANT_ID', 'testtenantid'),
            ('X_AUTH_TOKEN', 'testauthtoken'),
            ('X_SERVICE_CATALOG', jsonutils.dumps({})),
        )
        for header_name, header_value in identity_headers:
            self.request.headers[header_name] = header_value

        self.roles = "pawn, knight, rook"
test_wsgi.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_resource_headers_are_utf8(self):
        """Check that every header of a routed response is UTF-8 encoded.

        A controller returns a pre-built 202 response whose headers hold
        an integer and two unicode strings. The response obtained through
        the test router must have every header value match
        ``EncodedByUTF8`` and equal the expected byte string.
        """
        canned = webob.Response(status_int=202)
        canned.headers['x-header1'] = 1
        canned.headers['x-header2'] = u'header2'
        canned.headers['x-header3'] = u'header3'

        class Controller(object):
            def index(self, req):
                # Always answer with the pre-built response above.
                return canned

        request = webob.Request.blank('/tests')
        router = fakes.TestRouter(Controller())
        result = request.get_response(router)

        # All headers must be utf8
        for header_value in six.itervalues(result.headers):
            self.assertThat(header_value, matchers.EncodedByUTF8())

        expected_pairs = (
            ('x-header1', b'1'),
            ('x-header2', b'header2'),
            ('x-header3', b'header3'),
        )
        for header_name, encoded in expected_pairs:
            self.assertEqual(encoded, result.headers[header_name])
wsgi.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serialize(self, request, content_type):
        """Turn the wrapped object into a webob.Response.

        The wrapped object, when there is one, is passed to
        ``self.serializer`` and becomes the response body. The status
        code, the stored headers and the content type are then applied,
        each header value going through ``utils.utf8``.

        :param request: not used by this method
        :param content_type: value stored in the Content-Type header
        :returns: the populated webob.Response
        """
        serializer = self.serializer
        payload = None if self.obj is None else serializer.serialize(self.obj)

        response = webob.Response(body=payload)

        # NOTE(andreykurilin): we need to encode 'Content-Length' header,
        # since webob.Response auto sets it if "body" attr is presented.
        # https://github.com/Pylons/webob/blob/1.5.0b0/webob/response.py#L147
        content_length = response.headers.get('Content-Length')
        if content_length:
            response.headers['Content-Length'] = utils.utf8(content_length)

        response.status_int = self.code
        for name, raw_value in self._headers.items():
            response.headers[name] = utils.utf8(raw_value)
        response.headers['Content-Type'] = utils.utf8(content_type)
        return response
floating_ip_dns.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete(self, req, domain_id, id):
        """Delete the DNS entry named ``id`` in the given domain.

        :param req: request carrying ``nova.context`` in its environ
        :param domain_id: quoted domain name, passed to ``_unquote_domain``
        :param id: name of the DNS entry to delete
        :returns: a 202 webob.Response
        :raises webob.exc.HTTPNotFound: when the network API reports
            ``exception.NotFound``
        :raises webob.exc.HTTPNotImplemented: when the network API raises
            ``NotImplementedError``
        """
        ctxt = req.environ['nova.context']
        authorize(ctxt)
        unquoted_domain = _unquote_domain(domain_id)

        try:
            self.network_api.delete_dns_entry(ctxt, id, unquoted_domain)
        except exception.NotFound as not_found:
            raise webob.exc.HTTPNotFound(
                explanation=not_found.format_message())
        except NotImplementedError:
            raise webob.exc.HTTPNotImplemented(
                explanation=_("Unable to delete dns entry"))

        return webob.Response(status_int=202)
networks_associate.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _disassociate_host_only(self, req, id, body):
        """Clear the host association of network ``id``.

        :param req: request carrying ``nova.context`` in its environ
        :param id: identifier of the network
        :param body: request body; not used by this method
        :returns: a 202 webob.Response
        :raises exc.HTTPNotFound: when the network does not exist
        :raises exc.HTTPNotImplemented: when the network API raises
            ``NotImplementedError``
        """
        ctxt = req.environ['nova.context']
        authorize(ctxt)
        # NOTE(shaohe-feng): back-compatible with db layer hard-code
        # admin permission checks.  call db API objects.Network.associate
        nova_context.require_admin_context(ctxt)

        try:
            self.network_api.associate(ctxt, id, host=None)
        except exception.NetworkNotFound:
            raise exc.HTTPNotFound(explanation=_("Network not found"))
        except NotImplementedError:
            not_implemented = _('Disassociate host is not implemented by the '
                                'configured Network API')
            raise exc.HTTPNotImplemented(explanation=not_implemented)

        return webob.Response(status_int=202)
networks_associate.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _disassociate_project_only(self, req, id, body):
        """Clear the project association of network ``id``.

        :param req: request carrying ``nova.context`` in its environ
        :param id: identifier of the network
        :param body: request body; not used by this method
        :returns: a 202 webob.Response
        :raises exc.HTTPNotFound: when the network does not exist
        :raises exc.HTTPNotImplemented: when the network API raises
            ``NotImplementedError``
        """
        ctxt = req.environ['nova.context']
        authorize(ctxt)
        # NOTE(shaohe-feng): back-compatible with db layer hard-code
        # admin permission checks.  call db API objects.Network.associate
        nova_context.require_admin_context(ctxt)

        try:
            self.network_api.associate(ctxt, id, project=None)
        except exception.NetworkNotFound:
            raise exc.HTTPNotFound(explanation=_("Network not found"))
        except NotImplementedError:
            not_implemented = _('Disassociate project is not implemented by '
                                'the configured Network API')
            raise exc.HTTPNotImplemented(explanation=not_implemented)

        return webob.Response(status_int=202)
floating_ips.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete(self, req, id):
        """Disassociate and release the floating IP identified by ``id``.

        :param req: request carrying ``nova.context`` in its environ
        :param id: identifier of the floating IP
        :returns: a 202 webob.Response
        :raises webob.exc.HTTPNotFound: when the floating IP lookup fails
            with ``NotFound`` or ``InvalidID``
        :raises webob.exc.HTTPForbidden: when the release is forbidden or
            the floating IP is auto assigned
        """
        ctxt = req.environ['nova.context']
        authorize(ctxt)

        # Look the floating ip up first so an unknown id yields a 404.
        try:
            floating_ip = self.network_api.get_floating_ip(ctxt, id)
        except (exception.NotFound, exception.InvalidID):
            raise webob.exc.HTTPNotFound(
                explanation=_("Floating IP not found for ID %s") % id)

        # The instance currently holding the address, if there is one.
        instance = get_instance_by_floating_ip_addr(
            self, ctxt, floating_ip['address'])

        try:
            self.network_api.disassociate_and_release_floating_ip(
                ctxt, instance, floating_ip)
        except exception.Forbidden:
            raise webob.exc.HTTPForbidden()
        except exception.CannotDisassociateAutoAssignedFloatingIP:
            raise webob.exc.HTTPForbidden(
                explanation=_('Cannot disassociate auto assigned floating IP'))

        return webob.Response(status_int=202)
client.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_client_users_out_stack(request):
    """Return, as JSON, the users selected by the "not in this client" query.

    The client id is read from the ``id`` entry of ``request.matchdict``
    and the client is loaded first. When no such client exists the
    transaction is aborted and a 500 response is returned. Otherwise a raw
    SQL query selects every user whose ``Client_Users`` row has a
    different ``cid`` or no ``cid`` at all.

    :param request: the incoming request
    :returns: a ``Response`` whose JSON body is a list of
        ``{'name': ..., 'id': ...}`` dicts, or a 500 ``Response`` when the
        client cannot be found
    """
    logger.debug('get_client_users_out_stack is running')

    client_id = request.matchdict.get('id', -1)
    client = Client.query.filter_by(id=client_id).first()
    if not client:
        transaction.abort()
        return Response('Can not find a client with id: %s' % client_id, 500)

    sql_query = """
            select
                "User_SimpleEntities".name,
                "User_SimpleEntities".id
            from "Users"
            left outer join "Client_Users" on "Client_Users".uid = "Users".id
            join "SimpleEntities" as "User_SimpleEntities" on "User_SimpleEntities".id = "Users".id

            where "Client_Users".cid != %(client_id)s or "Client_Users".cid is Null
    """

    # The query text is built by string formatting, so request-supplied
    # text must never reach it. Interpolate the integer id of the row that
    # was just loaded instead of the raw matchdict value.
    sql_query = sql_query % {'client_id': int(client.id)}
    result = DBSession.connection().execute(sql_query)

    users = [{'name': row[0], 'id': row[1]} for row in result.fetchall()]

    return Response(json_body=users)
daily.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def update_daily(request):
    """Update the name, description and status of a daily.

    The daily is looked up by the ``id`` entry of ``request.matchdict``.
    ``name``, ``description`` and ``status_id`` are read from
    ``request.params``. A missing daily, name or description, or an
    unknown status, produces a 500 response with an explanatory message
    and leaves the daily unchanged.

    :param request: the incoming request
    :returns: a ``Response`` reporting success or the first failed check
    """

    logged_in_user = get_logged_in_user(request)
    utc_now = local_to_utc(datetime.datetime.now())

    daily_id = request.matchdict.get('id', -1)
    daily = Daily.query.filter(Daily.id == daily_id).first()

    if not daily:
        transaction.abort()
        return Response('No daily with id : %s' % daily_id, 500)

    name = request.params.get('name')
    description = request.params.get('description')

    status_id = request.params.get('status_id')
    status = Status.query.filter(Status.id == status_id).first()

    if not name:
        return Response('Please supply a name', 500)

    if not description:
        return Response('Please supply a description', 500)

    if not status:
        # ``status`` is None on this branch, so report the id that was
        # requested rather than dereferencing the missing object.
        return Response('There is no status with id: %s' % status_id, 500)

    daily.name = name
    daily.description = description
    daily.status = status
    daily.date_updated = utc_now
    daily.updated_by = logged_in_user

    request.session.flash('success: Successfully updated daily')
    return Response('Successfully updated daily')
asset.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_assets_types(request):
    """Return the id and name of every type used by an asset, as JSON.

    A raw SQL query groups the type entities joined to the assets and
    orders them by name. The response also gets a ``content_range`` of the
    form ``0-<count - 1>/<count>``.

    :param request: the incoming request; not read by this function
    :returns: a ``Response`` whose JSON body is a list of
        ``{'asset_type_id': ..., 'asset_type_name': ...}`` dicts
    """
    sql_query = """select
     "Assets_Types_SimpleEntities".id,
     "Assets_Types_SimpleEntities".name

     from "Assets"

     join "SimpleEntities" as "Assets_SimpleEntities" on "Assets_SimpleEntities".id = "Assets".id
     join "SimpleEntities" as "Assets_Types_SimpleEntities" on "Assets_Types_SimpleEntities".id = "Assets_SimpleEntities".type_id

     group by
        "Assets_Types_SimpleEntities".name,
        "Assets_Types_SimpleEntities".id
     order by "Assets_Types_SimpleEntities".name
     """

    rows = DBSession.connection().execute(sql_query).fetchall()

    return_data = []
    for row in rows:
        return_data.append({
            'asset_type_id': row[0],
            'asset_type_name': row[1],
        })

    type_count = len(return_data)
    content_range = '%s-%s/%s' % (0, type_count - 1, type_count)

    logger.debug('content_range : %s' % content_range)

    resp = Response(json_body=return_data)
    resp.content_range = content_range
    return resp
budget.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def update_budget(request):
    """Update the name, description and status of a budget.

    The budget is looked up by the ``id`` entry of ``request.matchdict``.
    ``name``, ``description`` and ``status_id`` are read from
    ``request.params``. A missing budget, name or description, or an
    unknown status, produces a 500 response with an explanatory message
    and leaves the budget unchanged.

    :param request: the incoming request
    :returns: a ``Response`` reporting success or the first failed check
    """

    logged_in_user = get_logged_in_user(request)
    utc_now = local_to_utc(datetime.datetime.now())

    budget_id = request.matchdict.get('id', -1)
    budget = Budget.query.filter(Budget.id == budget_id).first()

    if not budget:
        transaction.abort()
        return Response('No budget with id : %s' % budget_id, 500)

    name = request.params.get('name')
    description = request.params.get('description')

    status_id = request.params.get('status_id')
    status = Status.query.filter(Status.id == status_id).first()

    if not name:
        return Response('Please supply a name', 500)

    if not description:
        return Response('Please supply a description', 500)

    if not status:
        # ``status`` is None on this branch, so report the id that was
        # requested rather than dereferencing the missing object.
        return Response('There is no status with id: %s' % status_id, 500)

    budget.name = name
    budget.description = description
    budget.status = status
    budget.date_updated = utc_now
    budget.updated_by = logged_in_user

    request.session.flash('success: Successfully updated budget')
    return Response('Successfully updated budget')
budget.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def delete_budgetentry_action(budgetentry):
    """Delete a budget entry and commit the transaction.

    If the delete or the commit raises, the transaction is aborted and the
    failure is logged; the exception is not re-raised.

    :param budgetentry: the budget entry to delete
    :returns: None, whether or not the delete succeeded
    """

    logger.debug('delete_budgetentry_action %s' % budgetentry.name)
    # Read the name up front so the failure log does not have to touch the
    # entry again after the transaction has been aborted.
    budgetentry_name = budgetentry.name
    try:
        DBSession.delete(budgetentry)
        transaction.commit()
    except Exception:
        # Best-effort delete: roll back once and record what went wrong
        # instead of discarding the error silently.
        # NOTE(review): assumes ``logger`` is a standard logging.Logger.
        transaction.abort()
        logger.exception(
            'Failed to delete budget entry %s', budgetentry_name)


问题


面经


文章

微信
公众号

扫码关注公众号