python类Response()的实例源码

daily.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def append_link_to_daily(request):
    """Add the link given by the ``id`` route value to the daily given by ``did``.

    Both records are fetched before anything is validated. When either one
    is missing the transaction is aborted and a response carrying the
    missing id (built with status argument 500) is returned. The link is
    appended only if the daily does not already hold it; afterwards a
    success message is flashed on the session and also returned as the
    response body.
    """
    route_args = request.matchdict

    link_id = route_args.get('id')
    link = Link.query.filter_by(id=link_id).first()

    daily_id = route_args.get('did')
    daily = Daily.query.filter_by(id=daily_id).first()

    # Guard clauses: the link is checked first, then the daily.
    if not link:
        transaction.abort()
        return Response('There is no link with id: %s' % link_id, 500)
    if not daily:
        transaction.abort()
        return Response('There is no daily with id: %s' % daily_id, 500)

    already_attached = link in daily.links
    if not already_attached:
        daily.links.append(link)

    daily_name = daily.name
    request.session.flash(
        'success: Output is added to daily: %s ' % daily_name
    )
    return Response('Output is added to daily: %s ' % daily_name)
daily.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def remove_link_to_daily(request):
    """Detach the link given by the ``id`` route value from the daily ``did``.

    Both records are fetched before anything is validated. When either one
    is missing the transaction is aborted and a response carrying the
    missing id (built with status argument 500) is returned. The link is
    removed only if the daily currently holds it; afterwards a success
    message is flashed on the session and also returned as the response
    body.
    """
    link_id = request.matchdict.get('id')
    link = Link.query.filter_by(id=link_id).first()

    daily_id = request.matchdict.get('did')
    daily = Daily.query.filter_by(id=daily_id).first()

    if not link:
        transaction.abort()
        return Response('There is no link with id: %s' % link_id, 500)

    if not daily:
        transaction.abort()
        return Response('There is no daily with id: %s' % daily_id, 500)

    if link in daily.links:
        daily.links.remove(link)

    request.session.flash(
        'success: Output is removed from daily: %s ' % daily.name
    )

    # The body wording now agrees with the flash message
    # ("removed from", not "removed to").
    return Response('Output is removed from daily: %s ' % daily.name)
budget.py 文件源码 项目:stalker_pyramid 作者: eoyilmaz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def delete_budgetentry(request):
    """Delete the budget entry whose id arrives in the ``id`` request param.

    The transaction is aborted and a response built with status argument
    500 is returned when no such entry exists, or when the entry's type is
    named ``'Calendar'``. Otherwise the entry is handed to
    ``delete_budgetentry_action``.
    """
    entry_id = request.params.get('id')
    entry = BudgetEntry.query.filter_by(id=entry_id).first()

    if not entry:
        transaction.abort()
        return Response('There is no budgetentry with id: %s' % entry_id, 500)

    # Calendar-typed entries are refused rather than deleted.
    is_calendar_entry = entry.type.name == 'Calendar'
    if is_calendar_entry:
        transaction.abort()
        return Response('You can not delete CalenderBasedEntry', 500)

    delete_budgetentry_action(entry)
wsgi.py 文件源码 项目:meteos 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def serialize(self, request, content_type, default_serializers=None):
    """Build a webob.Response out of the wrapped object.

    The serializer stored on the instance is preferred; when there is
    none, a serializer class is looked up for ``content_type`` and
    instantiated. The response receives the instance's status code and
    headers, a Content-Type header, and a body only when the wrapped
    object is not None.
    """
    serializer = self.serializer
    if not serializer:
        _mtype, serializer_cls = self.get_serializer(content_type,
                                                     default_serializers)
        serializer = serializer_cls()

    response = webob.Response()
    response.status_int = self.code

    # Custom headers go in first; Content-Type is written last so it
    # overrides any same-named header.
    for name, raw_value in self._headers.items():
        response.headers[name] = six.text_type(raw_value)
    response.headers['Content-Type'] = six.text_type(content_type)

    wrapped = self.obj
    if wrapped is not None:
        response.body = serializer.serialize(wrapped)

    return response
devserver.py 文件源码 项目:gateway_manager 作者: binarydud 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _call_apex(request):
    """Run the handler mapped to the matched route and wrap its result.

    The route name selects a node from ``_lookup_table``; the node's
    handler is executed through ``run_function``. On success the result is
    JSON-encoded into a response. When the handler raises, the node's
    declared responses are filtered against the exception and the first
    match is used to build an error response.

    :param request: the incoming request; ``request.matched_route.name``
        must be a key of ``_lookup_table``.
    :returns: a ``Response`` on success or on a matched error; otherwise
        the exception object itself (see the note below).
    """
    node = _lookup_table[request.matched_route.name]
    function_name = node.handler
    try:
        response = run_function(request, function_name)
    except Exception as e:
        partial = functools.partial(filter_on_response_pattern, e)
        # filter() is lazy on Python 3: the iterator is always truthy and
        # cannot be indexed, so materialise it before testing and indexing.
        matches = list(filter(partial, node.responses))
        if matches:
            matched = matches[0]
            body = build_error_response(matched.code, e)

            return Response(body=body, status_code=matched.code, content_type=matched.body[0].mime_type)
        # NOTE(review): the exception is returned, not raised. Kept as-is
        # for backward compatibility -- confirm the caller expects this.
        return e
    else:
        return Response(body=json.dumps(response), content_type='application/json')
error.py 文件源码 项目:baka 作者: baka-framework 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def http_error(context, request):
    """Turn an HTTP error ``context`` into a JSON API failure payload.

    A ``context`` that is already a webob.Response with JSON content type
    is returned untouched. Otherwise its status and headers (except
    Content-Type and Content-Length) are copied onto ``request.response``
    and the payload is produced through ``resp.to_json``.
    """
    with JSONAPIResponse(request.response) as resp:
        _in = u'Failed'
        code, status = JSONAPIResponse.BAD_REQUEST

        already_json = (isinstance(context, webob.Response)
                        and context.content_type == 'application/json')
        if already_json:
            return context

        # The context's own status replaces the BAD_REQUEST default.
        status = context.status
        request.response.status = status

        not_copied = {'Content-Type', 'Content-Length'}
        for name, value in context.headers.items():
            if name not in not_copied:
                request.response.headers[name] = value

        # Fall back to the status line when there is no message.
        message = {'message': context.message or context.status}

    return resp.to_json(
        _in, code=code,
        status=status, message=message)
webapp2.py 文件源码 项目:webapp2 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _set_status(self, value):
    """Store the status line ("<code> <message>") on ``self._status``.

    ``value`` may be an integer code or a string such as ``"404"`` or
    ``"404 Not Found"``. A missing or empty message is filled in from
    ``Response.http_status_message``. Any other type raises TypeError.
    """
    # Accept long because urlfetch in App Engine returns codes as longs.
    if isinstance(value, six.integer_types):
        code, message = int(value), None
    else:
        if isinstance(value, six.text_type):
            # Status messages have to be ASCII safe, so this is OK.
            value = str(value)

        if not isinstance(value, str):
            raise TypeError(
                'You must set status to a string or integer (not %s)'
                % type(value))

        # Text before the first space is the code, the rest the message.
        head, separator, tail = value.partition(' ')
        code = int(head)
        message = tail if separator else None

    if not message:
        message = Response.http_status_message(code)
    self._status = '%d %s' % (code, message)
webapp2.py 文件源码 项目:webapp2 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def redirect_to(_name, _permanent=False, _abort=False, _code=None,
                _body=None, _request=None, _response=None, *args, **kwargs):
    """Redirect to a named route: :func:`uri_for` followed by :func:`redirect`.

    The URI is built by :func:`uri_for` from the route name plus the extra
    positional and keyword arguments, then passed to :func:`redirect`.

    :param _name:
        Name of the route to redirect to.
    :param args:
        Positional arguments used to build the URI.
    :param kwargs:
        Keyword arguments used to build the URI.
    :returns:
        Whatever :func:`redirect` returns (a :class:`Response` instance).

    The underscore-prefixed arguments are forwarded to :func:`redirect`
    under the same names without the underscore.
    """
    target_uri = uri_for(_name, _request=_request, *args, **kwargs)
    return redirect(
        target_uri,
        permanent=_permanent,
        abort=_abort,
        code=_code,
        body=_body,
        request=_request,
        response=_response,
    )
rest_utils.py 文件源码 项目:qinling 作者: openstack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def wrap_pecan_controller_exception(func):
    """Decorate a controller method so Qinling errors become JSON responses.

    The wrapped callable returns the result of ``func`` unchanged. When
    ``func`` raises ``exc.QinlingException`` the error is logged and a
    JSON ``webob.Response`` carrying the exception's ``http_code`` and a
    ``faultstring`` body is returned. Other exceptions propagate.
    """

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except exc.QinlingException as e:
            fault_text = six.text_type(e)
            LOG.error('Error during API call: %s', fault_text)
            payload = json.dumps({'faultstring': fault_text})
            return webob.Response(
                status=e.http_code,
                content_type='application/json',
                body=payload,
                charset='UTF-8'
            )
        return result

    return wrapped
tap_rest.py 文件源码 项目:FirewallController 作者: iFedix 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def create_tap(self, req, **_kwargs):
    """Create a tap from the filter description in the request body.

    :param req: request whose ``body`` holds the filter data as a Python
        literal expression.
    :returns: status 400 when the body has invalid syntax or fails
        ``is_filter_data_valid``; status 200 with a JSON success payload
        when ``self.tap.create_tap`` succeeds; status 501 otherwise.
    """
    # Like delete_tap
    try:
        # SECURITY NOTE(review): eval() executes the raw request body, so
        # any client reaching this endpoint can run arbitrary code. Parse
        # with json.loads or ast.literal_eval once the expected client
        # payload format is confirmed.
        filter_data = eval(req.body)
        # Parenthesised so this line parses on both Python 2 and Python 3;
        # the output is the same for a single argument.
        print(filter_data)
        if not self.is_filter_data_valid(filter_data):
            return Response(status=400)
    except SyntaxError:
        LOG.error('Invalid syntax %s', req.body)
        return Response(status=400)

    if self.tap.create_tap(filter_data):
        return Response(status=200, content_type='application/json', body=json.dumps({'status': 'success'}))
    else:
        LOG.error('Create tap failed')
        return Response(status=501)
framework.py 文件源码 项目:Flask 作者: Humbertzhang 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def rest_controller(cls):
    """Wrap ``cls`` into a WSGI callable that dispatches on the HTTP method.

    For every request an instance of ``cls`` is created from the request
    and its ``urlvars``. The method called is ``<action>_<verb>`` when the
    urlvars contain an ``action``, else just ``<verb>`` (the lower-cased
    HTTP method). A missing method yields HTTPNotFound; a plain string
    result is wrapped in a Response; a raised HTTPException is itself used
    as the response.
    """
    def replacement(environ, start_response):
        req = Request(environ)
        try:
            instance = cls(req, **req.urlvars)

            prefix = req.urlvars.get('action')
            verb = req.method.lower()
            action = prefix + '_' + verb if prefix else verb

            try:
                handler = getattr(instance, action)
            except AttributeError:
                raise exc.HTTPNotFound("No action %s" % action)

            resp = handler()
            if isinstance(resp, type("")):
                resp = Response(body=resp)
        except exc.HTTPException as http_error:
            # HTTP exceptions are WSGI applications themselves.
            resp = http_error
        return resp(environ, start_response)
    return replacement
rest_router.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def rest_command(func):
    """Decorate a REST handler so results and known errors become Responses.

    The return value of ``func`` is JSON-encoded into a response with the
    JSON content type. SyntaxError, ValueError and NameError give status
    400 and NotFoundError gives status 404, each with a JSON body holding
    ``REST_RESULT: REST_NG`` and the error details. Other exceptions
    propagate.
    """
    def _rest_command(*args, **kwargs):
        try:
            msg = func(*args, **kwargs)
            return Response(content_type='application/json',
                            body=json.dumps(msg))

        except SyntaxError as e:
            status = 400
            details = e.msg
        except (ValueError, NameError) as e:
            status = 400
            # ``message`` exists only on Python 2 exceptions; fall back to
            # str(e) so this handler does not raise AttributeError on
            # Python 3 (or when the Python 2 message is empty).
            details = getattr(e, 'message', None) or str(e)

        except NotFoundError as msg:
            status = 404
            details = str(msg)

        msg = {REST_RESULT: REST_NG,
               REST_DETAILS: details}
        return Response(status=status, body=json.dumps(msg))

    return _rest_command
rest_firewall.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _access_module(self, switchid, func, waiters=None):
    """Call the method named ``func`` on every switch selected by ``switchid``.

    Each method is called with ``waiters`` as its only argument when
    ``waiters`` is not None, and with no arguments otherwise. The results
    are returned as a JSON list. A ValueError from the switch lookup gives
    a status-400 response with the error text as body.
    """
    try:
        dps = self._OFS_LIST.get_ofs(switchid)
    except ValueError as message:
        return Response(status=400, body=str(message))

    call_args = () if waiters is None else (waiters,)
    msgs = [getattr(f_ofs, func)(*call_args) for f_ofs in dps.values()]

    return Response(content_type='application/json',
                    body=json.dumps(msgs))

    # GET /firewall/rules/{switchid}
rest_firewall.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _set_rule(self, req, switchid, vlan_id=VLANID_NONE):
    """Apply the rule from the request body to every selected switch.

    An empty body means an empty rule. Malformed JSON gives a bare
    status-400 response. A ValueError from the switch lookup, the VLAN id
    conversion, or any switch's ``set_rule`` gives a status-400 response
    with the error text. Otherwise the per-switch results are returned as
    a JSON list.
    """
    try:
        if req.body:
            rule = req.json
        else:
            rule = {}
    except ValueError:
        FirewallController._LOGGER.debug('invalid syntax %s', req.body)
        return Response(status=400)

    try:
        dps = self._OFS_LIST.get_ofs(switchid)
        vid = FirewallController._conv_toint_vlanid(vlan_id)
    except ValueError as message:
        return Response(status=400, body=str(message))

    switches = dps.values()
    try:
        # The first switch that rejects the rule aborts the whole request.
        msgs = [f_ofs.set_rule(rule, self.waiters, vid)
                for f_ofs in switches]
    except ValueError as message:
        return Response(status=400, body=str(message))

    return Response(content_type='application/json',
                    body=json.dumps(msgs))
rest_firewall.py 文件源码 项目:ryu-lagopus-ext 作者: lagopus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _delete_rule(self, req, switchid, vlan_id=VLANID_NONE):
    """Delete the rule described by the request body on every selected switch.

    An empty body is treated as an empty dict. Malformed JSON gives a bare
    status-400 response. A ValueError from the switch lookup, the VLAN id
    conversion, or any switch's ``delete_rule`` gives a status-400
    response with the error text. Otherwise the per-switch results are
    returned as a JSON list.
    """
    try:
        if req.body:
            ruleid = req.json
        else:
            ruleid = {}
    except ValueError:
        FirewallController._LOGGER.debug('invalid syntax %s', req.body)
        return Response(status=400)

    try:
        dps = self._OFS_LIST.get_ofs(switchid)
        vid = FirewallController._conv_toint_vlanid(vlan_id)
    except ValueError as message:
        return Response(status=400, body=str(message))

    switches = dps.values()
    try:
        # The first switch that rejects the request aborts the whole call.
        msgs = [f_ofs.delete_rule(ruleid, self.waiters, vid)
                for f_ofs in switches]
    except ValueError as message:
        return Response(status=400, body=str(message))

    return Response(content_type='application/json',
                    body=json.dumps(msgs))
test_resourcebasics.py 文件源码 项目:reahl 作者: reahl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_simple_sub_resources(web_fixture):
    """A Widget may register SubResources on its View while being constructed.

       Each registered SubResource is then reachable through a special URL
       underneath the URL of the Widget's View."""

    @stubclass(SubResource)
    class ASimpleSubResource(SubResource):
        sub_regex = 'simple_resource'
        sub_path_template = 'simple_resource'

        @exempt
        def handle_get(self, request):
            return Response()

    @stubclass(Widget)
    class WidgetWithSubResource(Widget):
        def __init__(self, view):
            super(WidgetWithSubResource, self).__init__(view)
            sub_resource = ASimpleSubResource('uniquename')
            view.add_resource(sub_resource)

    widget_factory = WidgetWithSubResource.factory()
    wsgi_app = web_fixture.new_wsgi_app(child_factory=widget_factory)

    # Opening the sub-resource URL is the whole check.
    Browser(wsgi_app).open('/_uniquename_simple_resource')
mpsdn_controller.py 文件源码 项目:multipath-sdn-controller 作者: dariobanfi 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def change_bucket_weight(self, req, **kwargs):
    '''
    Changes bucket weight for a datapath's GROUP
    Rules are passed in this format port,weight; :
    Example : '1,1;2,1;3,2;4,2'

    Reads ``dp_id``, ``group_id`` and ``rules`` from ``kwargs``. The
    update is best effort: any error while parsing or applying the rules
    is printed as a traceback and the same 'Done!' response is still
    returned.
    '''
    multipath_controller = self.mp_instance

    try:
        dp_id = kwargs['dp_id']
        group_id = kwargs['group_id']
        arg_rules = kwargs['rules']
        rules = {}
        for i in arg_rules.split(';'):
            # Split each "port,weight" entry once.
            fields = i.split(',')
            rules[int(fields[0])] = int(fields[1])

        multipath_controller.topo_shape.modify_group(
            multipath_controller.topo_shape.dpid_to_switch[
                int(dp_id)].dp, int(group_id), rules)
    except Exception:
        # Narrowed from a bare ``except:`` so SystemExit and
        # KeyboardInterrupt are no longer swallowed; ordinary errors stay
        # best effort.
        traceback.print_exc()

    return Response(content_type='text/html', body='Done!\n')
test_no_thread_locals.py 文件源码 项目:deb-python-pecan 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def root(self):
    """Build and return the RootController class used as application root.

    Only ``index`` takes the explicit ``(req, resp)`` pair. The other
    handlers leave it out and, per their own return text, are expected to
    be unroutable.
    """
    # Shared by every handler that lacks the (req, resp) arguments.
    unroutable_text = ("This should be unroutable because (req, resp) are not"
                       " arguments.  It should raise a TypeError.")

    class RootController(object):
        @expose()
        def index(self, req, resp):
            assert isinstance(req, webob.BaseRequest)
            assert isinstance(resp, webob.Response)
            return 'Hello, World!'

        @expose()
        def warning(self):
            return unroutable_text

        @expose(generic=True)
        def generic(self):
            return unroutable_text

        @generic.when(method='PUT')
        def generic_put(self, _id):
            return unroutable_text

    return RootController
mixins.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def download_transcript(self, request, _suffix=''):
    """
    Serve a transcript as a downloadable plain-text attachment.

    The request's query string is used twice: to derive the file name, and
    appended to the request's host URL to form the address the transcript
    text is fetched from.

    Arguments:
        request (webob.Request): Request to handle.
        _suffix (string): Slug used for routing (unused here).
    Returns:
        Response holding the transcript text, with Content-Type and
        Content-Disposition headers naming the file.
    """
    query = request.query_string
    filename = self.get_file_name_from_path(self.get_path_for(query))

    transcript_url = request.host_url + query
    transcript = requests.get(transcript_url).text

    response = Response(transcript)
    disposition = 'attachment; filename={}'.format(filename)
    # Replaces the whole header list, dropping the response's defaults.
    response.headerlist = [
        ('Content-Type', 'text/plain'),
        ('Content-Disposition', disposition),
    ]
    return response
test_mixins.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_download_transcript_handler_response_object(self, get_mock, get_filename_mock):
    """
    Check the download handler returns the transcript as an attachment.
    """
    # Arrange
    get_filename_mock.return_value = 'transcript.vtt'
    get_mock.return_value.text = 'vtt transcripts'
    request_mock = MagicMock(
        host_url='test.host',
        query_string='/test-query-string',
    )
    expected_headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Disposition', 'attachment; filename={}'.format('transcript.vtt')),
    ]

    # Act
    vtt_response = self.xblock.download_transcript(request_mock, 'unused suffix')

    # Assert
    self.assertIsInstance(vtt_response, Response)
    self.assertEqual(vtt_response.text, 'vtt transcripts')
    self.assertEqual(vtt_response.headerlist, expected_headers)
    get_mock.assert_called_once_with('test.host/test-query-string')
test_mixins.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_srt_to_vtt(self, convert_caps_to_vtt_mock, requests_mock):
    """
    Check the xBlock's srt-to-vtt conversion handler returns the converted text.
    """
    # Arrange
    request_mock = MagicMock()
    convert_caps_to_vtt_mock.return_value = 'vtt transcripts'
    text_mock = PropertyMock(return_value='vtt transcripts')
    requests_mock.get.return_value.text = text_mock

    # Act
    vtt_response = self.xblock.srt_to_vtt(request_mock, 'unused suffix')

    # Assert
    self.assertIsInstance(vtt_response, Response)
    self.assertEqual(vtt_response.text, 'vtt transcripts')
    # The converter must receive exactly the fetched text object.
    convert_caps_to_vtt_mock.assert_called_once_with(text_mock)
webapp2.py 文件源码 项目:blockhooks 作者: EthereumWebhooks 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _set_status(self, value):
    """Store the status line ("<code> <message>") on ``self._status``.

    ``value`` may be an integer code or a string such as ``"404"`` or
    ``"404 Not Found"``. A missing or empty message is filled in from
    ``Response.http_status_message``. Any other type raises TypeError.
    """
    # Accept long because urlfetch in App Engine returns codes as longs.
    if isinstance(value, (int, long)):
        code, message = int(value), None
    else:
        if isinstance(value, unicode):
            # Status messages have to be ASCII safe, so this is OK.
            value = str(value)

        if not isinstance(value, str):
            raise TypeError(
                'You must set status to a string or integer (not %s)' %
                type(value))

        # Text before the first space is the code, the rest the message.
        head, separator, tail = value.partition(' ')
        code = int(head)
        message = tail if separator else None

    if not message:
        message = Response.http_status_message(code)
    self._status = '%d %s' % (code, message)
webapp2.py 文件源码 项目:blockhooks 作者: EthereumWebhooks 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def redirect_to(_name, _permanent=False, _abort=False, _code=None,
                _body=None, _request=None, _response=None, *args, **kwargs):
    """Redirect to a named route: :func:`uri_for` followed by :func:`redirect`.

    The URI is built by :func:`uri_for` from the route name plus the extra
    positional and keyword arguments, then passed to :func:`redirect`.

    :param _name:
        Name of the route to redirect to.
    :param args:
        Positional arguments used to build the URI.
    :param kwargs:
        Keyword arguments used to build the URI.
    :returns:
        Whatever :func:`redirect` returns (a :class:`Response` instance).

    The underscore-prefixed arguments are forwarded to :func:`redirect`
    under the same names without the underscore.
    """
    target_uri = uri_for(_name, _request=_request, *args, **kwargs)
    return redirect(
        target_uri,
        permanent=_permanent,
        abort=_abort,
        code=_code,
        body=_body,
        request=_request,
        response=_response,
    )
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_existing_session_session_after_invalidate_coe_True_no_exception(
    self,
):
    # Scenario: existing session -> invalidate() -> new session.
    # cookie_on_exception keeps its default (True) and nothing is raised,
    # so exactly one Set-Cookie header is expected, and it must set a cookie.
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    session.invalidate()
    session['key'] = 'value'

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self._assert_is_a_header_to_set_cookie(cookies[0])
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_existing_session_session_after_invalidate_coe_False_no_exception(
    self,
):
    # Scenario: existing session -> invalidate() -> new session.
    # cookie_on_exception is False and nothing is raised, so exactly one
    # Set-Cookie header is expected, and it must set a cookie.
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request, cookie_on_exception=False)
    request.session = session
    session.invalidate()
    session['key'] = 'value'

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self._assert_is_a_header_to_set_cookie(cookies[0])
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_existing_session_session_after_invalidate_coe_False_exception(
    self,
):
    # Scenario: existing session -> invalidate() -> new session.
    # cookie_on_exception is False and an exception is recorded on the
    # request.
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request, cookie_on_exception=False)
    request.session = session
    session.invalidate()
    session['key'] = 'value'
    request.exception = Exception()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Cancel setting of cookie for new session, but still delete cookie for
    # the earlier invalidate().
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_existing_session_multiple_invalidates(self):
    # Scenario: existing session -> invalidate() -> new session ->
    # invalidate(). Invalidated more than once, with no new session after
    # the last invalidate(): a single cookie-deleting header is expected.
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    session.invalidate()
    session['key'] = 'value'
    session.invalidate()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_existing_session_multiple_invalidates_no_new_session_in_between(
    self,
):
    # Scenario: existing session -> invalidate() -> invalidate().
    # Invalidated more than once, no new session between the invalidate()
    # calls and none after the last one: a single cookie-deleting header
    # is expected.
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    session.invalidate()
    session.invalidate()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
test_factory.py 文件源码 项目:pyramid_session_redis 作者: jvanasco 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _load_cookie_session_in_new_request(self, request_old, session_id='existing_session', **session_args):
    # Helper: build a fresh request from ``request_old`` (so the redis
    # datastore persists), attach a session loaded from the cookie for
    # ``session_id``, run the first response callback and the finished
    # callbacks, and check that no Set-Cookie header was emitted.
    import webob

    request = self._make_request(request_old=request_old)
    self._set_session_cookie(request=request, session_id=session_id)
    request.session = self._makeOne(request, **session_args)

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)
    request._process_finished_callbacks()  # runs any persist if needed

    self.assertNotIn('Set-Cookie', response.headers)
    return request
test_auth.py 文件源码 项目:masakari 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
    """Wire a role-checking WSGI app behind the keystone context middleware.

    Also prepares a blank request carrying user, tenant, token and
    service-catalog headers, and a default roles string.
    """
    super(TestKeystoneMiddlewareRoles, self).setUp()

    @webob.dec.wsgify()
    def role_check_app(req):
        roles = req.environ['masakari.context'].roles

        if "knight" in roles and "bad" not in roles:
            return webob.Response(status="200 Role Match")
        if not roles:
            return webob.Response(status="200 No Roles")
        raise webob.exc.HTTPBadRequest("unexpected role header")

    self.middleware = (
        masakari.api.auth.MasakariKeystoneContext(role_check_app))

    self.request = webob.Request.blank('/')
    headers = self.request.headers
    headers['X_USER'] = 'testuser'
    headers['X_TENANT_ID'] = 'testtenantid'
    headers['X_AUTH_TOKEN'] = 'testauthtoken'
    headers['X_SERVICE_CATALOG'] = jsonutils.dumps({})

    self.roles = "pawn, knight, rook"


问题


面经


文章

微信
公众号

扫码关注公众号