def append_link_to_daily(request):
    """Attach the Link named by the route to the Daily named by the route.

    Reads the ``id`` (link) and ``did`` (daily) entries of
    ``request.matchdict``. If either record cannot be found, the transaction
    is aborted and a 500 response naming the missing id is returned.
    Otherwise the link is appended to ``daily.links`` unless it is already
    there, a success message is flashed on the session and a confirmation
    response is returned.
    """
    route_args = request.matchdict

    # Both records are looked up before any validation happens.
    link_id = route_args.get('id')
    link = Link.query.filter_by(id=link_id).first()
    daily_id = route_args.get('did')
    daily = Daily.query.filter_by(id=daily_id).first()

    if not link:
        transaction.abort()
        return Response('There is no link with id: %s' % link_id, 500)
    if not daily:
        transaction.abort()
        return Response('There is no daily with id: %s' % daily_id, 500)

    already_attached = link in daily.links
    if not already_attached:
        daily.links.append(link)

    request.session.flash(
        'success: Output is added to daily: %s '% daily.name
    )
    return Response('Output is added to daily: %s '% daily.name)
# Example source code for the Python class Response()
def remove_link_to_daily(request):
    """Detach the Link named by the route from the Daily named by the route.

    Reads the ``id`` (link) and ``did`` (daily) entries of
    ``request.matchdict``. If either record cannot be found, the transaction
    is aborted and a 500 response naming the missing id is returned.
    Otherwise the link is removed from ``daily.links`` when present, a
    success message is flashed on the session and a confirmation response is
    returned.
    """
    link_id = request.matchdict.get('id')
    link = Link.query.filter_by(id=link_id).first()

    daily_id = request.matchdict.get('did')
    daily = Daily.query.filter_by(id=daily_id).first()

    if not link:
        transaction.abort()
        return Response('There is no link with id: %s' % link_id, 500)

    if not daily:
        transaction.abort()
        return Response('There is no daily with id: %s' % daily_id, 500)

    if link in daily.links:
        daily.links.remove(link)

    request.session.flash(
        'success: Output is removed from daily: %s ' % daily.name
    )

    # Worded like the flash message ("removed from", not "removed to").
    return Response('Output is removed from daily: %s ' % daily.name)
def delete_budgetentry(request):
    """Delete the BudgetEntry whose id is given in the request params.

    Aborts the transaction and returns a 500 response when no entry has that
    id, or when the entry's type is named ``'Calendar'``. Otherwise the
    entry is handed to ``delete_budgetentry_action`` and that helper's
    result is returned.
    """
    budgetentry_id = request.params.get('id')
    budgetentry = BudgetEntry.query.filter_by(id=budgetentry_id).first()

    if not budgetentry:
        transaction.abort()
        return Response('There is no budgetentry with id: %s' % budgetentry_id, 500)

    if budgetentry.type.name == 'Calendar':
        transaction.abort()
        return Response('You can not delete CalenderBasedEntry', 500)

    # Propagate the helper's result instead of dropping it, so the success
    # path does not fall through and return None.
    # NOTE(review): the helper is defined outside this block; presumably it
    # returns a Response -- confirm.
    return delete_budgetentry_action(budgetentry)
def serialize(self, request, content_type, default_serializers=None):
    """Serialize the wrapped object into a ``webob.Response``.

    Uses ``self.serializer`` when it is set; otherwise a serializer class
    is obtained from ``self.get_serializer`` and instantiated. The response
    takes its status from ``self.code`` and its headers from
    ``self._headers``, and gets a body only when ``self.obj`` is not None.
    """
    if self.serializer:
        body_serializer = self.serializer
    else:
        _mtype, serializer_factory = self.get_serializer(
            content_type, default_serializers)
        body_serializer = serializer_factory()

    resp = webob.Response()
    resp.status_int = self.code

    for name, raw_value in self._headers.items():
        resp.headers[name] = six.text_type(raw_value)
    # Assigned after the loop, so it overrides any stored Content-Type.
    resp.headers['Content-Type'] = six.text_type(content_type)

    if self.obj is None:
        return resp

    resp.body = body_serializer.serialize(self.obj)
    return resp
def _call_apex(request):
    """Run the handler mapped to the matched route and wrap its result.

    The node for ``request.matched_route.name`` is taken from
    ``_lookup_table`` and its handler is run through ``run_function``.

    On success the result is returned as a JSON response. If the handler
    raises, the node's declared responses are filtered with
    ``filter_on_response_pattern``; the first match supplies the status code
    and mime type of an error response. When nothing matches, the exception
    object itself is returned.
    """
    node = _lookup_table[request.matched_route.name]
    function_name = node.handler
    try:
        result = run_function(request, function_name)
    except Exception as e:
        # Build a real list: on Python 3 ``filter`` returns a lazy iterator
        # that is always truthy and cannot be indexed.
        matches = [declared for declared in node.responses
                   if filter_on_response_pattern(e, declared)]
        if matches:
            matched = matches[0]
            body = build_error_response(matched.code, e)
            return Response(body=body, status_code=matched.code,
                            content_type=matched.body[0].mime_type)
        return e
    else:
        return Response(body=json.dumps(result), content_type='application/json')
def http_error(context, request):
    """Turn an HTTP error ``context`` into a JSON API failure response.

    A ``webob.Response`` context that is already ``application/json`` is
    returned unchanged. Otherwise the context's status and headers (except
    Content-Type and Content-Length) are copied onto ``request.response``
    and a payload is built with ``resp.to_json``, carrying the context's
    message or, when that is empty, its status.
    """
    with JSONAPIResponse(request.response) as resp:
        _in = u'Failed'
        code, status = JSONAPIResponse.BAD_REQUEST

        is_json_response = (
            isinstance(context, webob.Response)
            and context.content_type == 'application/json'
        )
        if is_json_response:
            return context

        request.response.status = context.status
        status = context.status

        skipped = {'Content-Type', 'Content-Length'}
        for header, value in context.headers.items():
            if header not in skipped:
                request.response.headers[header] = value

        message = {'message': context.message or context.status}

    return resp.to_json(
        _in, code=code,
        status=status, message=message)
def _set_status(self, value):
    """The status string, including code and message."""
    # Accept long because urlfetch in App Engine returns codes as longs.
    if isinstance(value, six.integer_types):
        code = int(value)
        message = None
    else:
        if isinstance(value, six.text_type):
            # Status messages have to be ASCII safe, so this is OK.
            value = str(value)

        if not isinstance(value, str):
            raise TypeError(
                'You must set status to a string or integer (not %s)'
                % type(value))

        # "<code> <message>": everything after the first space is the message.
        code_text, _sep, message = value.partition(' ')
        code = int(code_text)

    # An absent or empty message falls back to the standard one for the code.
    if not message:
        message = Response.http_status_message(code)
    self._status = '%d %s' % (code, message)
def redirect_to(_name, _permanent=False, _abort=False, _code=None,
                _body=None, _request=None, _response=None, *args, **kwargs):
    """Convenience function mixing :func:`redirect` and :func:`uri_for`.

    Issues an HTTP redirect to a named URI built using :func:`uri_for`.

    :param _name:
        The route name to redirect to.
    :param args:
        Positional arguments to build the URI.
    :param kwargs:
        Keyword arguments to build the URI.
    :returns:
        A :class:`Response` instance.

    The other arguments are described in :func:`redirect`.
    """
    target_uri = uri_for(_name, *args, _request=_request, **kwargs)
    redirect_options = {
        'permanent': _permanent,
        'abort': _abort,
        'code': _code,
        'body': _body,
        'request': _request,
        'response': _response,
    }
    return redirect(target_uri, **redirect_options)
def wrap_pecan_controller_exception(func):
    """Decorator for controller methods.

    Calls the wrapped method and returns its result. When the method raises
    ``exc.QinlingException`` the error is logged and a JSON
    ``webob.Response`` is returned, using the exception's ``http_code`` as
    status and its text as ``faultstring``.
    """
    def wrapped(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except exc.QinlingException as e:
            fault = six.text_type(e)
            LOG.error('Error during API call: %s', fault)
            return webob.Response(
                status=e.http_code,
                content_type='application/json',
                body=json.dumps({'faultstring': fault}),
                charset='UTF-8'
            )
        return result

    return functools.wraps(func)(wrapped)
def create_tap(self, req, **_kwargs):
    """Create a tap from the filter description in the request body.

    Returns a 400 response when the body raises ``SyntaxError`` on
    evaluation or fails ``self.is_filter_data_valid``. Returns a 200 JSON
    ``{"status": "success"}`` response when ``self.tap.create_tap`` reports
    success, and a 501 response otherwise.
    """
    try:
        # SECURITY(review): ``eval`` executes arbitrary code taken from the
        # HTTP request body. Replace it with a data-only parser
        # (``json.loads`` / ``ast.literal_eval``) once the payload format
        # used by clients is confirmed.
        filter_data = eval(req.body)
        # Call form of print: same output on Python 2, valid on Python 3.
        print(filter_data)
        if not self.is_filter_data_valid(filter_data):
            return Response(status=400)
    except SyntaxError:
        LOG.error('Invalid syntax %s', req.body)
        return Response(status=400)

    if self.tap.create_tap(filter_data):
        return Response(status=200, content_type='application/json',
                        body=json.dumps({'status': 'success'}))
    else:
        LOG.error('Create tap failed')
        return Response(status=501)
def rest_controller(cls):
    """Wrap a controller class as a WSGI application.

    For each request the class is instantiated with the request and its
    ``urlvars``. The method called is ``<action>_<http method>`` when the
    urlvars contain an ``action``, else just the lower-cased HTTP method. A
    missing method becomes ``HTTPNotFound``, a string result is wrapped in
    a ``Response``, and a raised ``HTTPException`` is used as the response.
    """
    def replacement(environ, start_response):
        req = Request(environ)
        try:
            instance = cls(req, **req.urlvars)

            action = req.urlvars.get('action')
            verb_source = req.method
            if action:
                action = action + '_' + verb_source.lower()
            else:
                action = verb_source.lower()

            try:
                method = getattr(instance, action)
            except AttributeError:
                raise exc.HTTPNotFound("No action %s" % action)

            resp = method()
            if isinstance(resp, str):
                resp = Response(body=resp)
        except exc.HTTPException as e:
            # HTTP exceptions are themselves WSGI applications.
            resp = e
        return resp(environ, start_response)

    return replacement
def rest_command(func):
    """Decorator that turns a REST handler's result into a JSON response.

    On success the handler's return value is JSON-encoded into a
    ``Response``. ``SyntaxError``, ``ValueError`` and ``NameError`` yield
    status 400 and ``NotFoundError`` yields 404; in those cases the body is
    a JSON object holding ``REST_RESULT: REST_NG`` and the error details.
    """
    def _rest_command(*args, **kwargs):
        try:
            msg = func(*args, **kwargs)
            return Response(content_type='application/json',
                            body=json.dumps(msg))
        except SyntaxError as e:
            status = 400
            details = e.msg
        except (ValueError, NameError) as e:
            status = 400
            # ``e.message`` no longer exists on Python 3; ``str(e)`` works
            # on both major versions.
            details = str(e)
        except NotFoundError as e:
            status = 404
            details = str(e)

        error_body = {REST_RESULT: REST_NG,
                      REST_DETAILS: details}
        return Response(status=status, body=json.dumps(error_body))

    return _rest_command
def _access_module(self, switchid, func, waiters=None):
    """Call the method named ``func`` on every switch object for ``switchid``.

    The switch objects come from ``self._OFS_LIST.get_ofs``; a ValueError
    from that lookup becomes a 400 response. Each method is called with
    ``waiters`` when it is given, else without arguments, and the collected
    results are returned as a JSON response.
    """
    try:
        dps = self._OFS_LIST.get_ofs(switchid)
    except ValueError as message:
        return Response(status=400, body=str(message))

    call_args = () if waiters is None else (waiters,)
    results = [getattr(f_ofs, func)(*call_args) for f_ofs in dps.values()]

    return Response(content_type='application/json',
                    body=json.dumps(results))
# GET /firewall/rules/{switchid}
def _set_rule(self, req, switchid, vlan_id=VLANID_NONE):
    """Apply the rule in the request body to every switch for ``switchid``.

    An empty body means an empty rule. A body that raises ValueError when
    read as JSON is logged and answered with 400. A ValueError from the
    switch lookup, the VLAN id conversion or any ``set_rule`` call is
    answered with 400 and the error text. Otherwise the per-switch results
    are returned as a JSON response.
    """
    try:
        if req.body:
            rule = req.json
        else:
            rule = {}
    except ValueError:
        FirewallController._LOGGER.debug('invalid syntax %s', req.body)
        return Response(status=400)

    try:
        dps = self._OFS_LIST.get_ofs(switchid)
        vid = FirewallController._conv_toint_vlanid(vlan_id)
    except ValueError as message:
        return Response(status=400, body=str(message))

    switches = dps.values()
    try:
        # The first failing switch aborts the whole request.
        results = [f_ofs.set_rule(rule, self.waiters, vid)
                   for f_ofs in switches]
    except ValueError as message:
        return Response(status=400, body=str(message))

    return Response(content_type='application/json',
                    body=json.dumps(results))
def _delete_rule(self, req, switchid, vlan_id=VLANID_NONE):
    """Delete the rule named in the request body from every switch.

    An empty body means an empty rule id. A body that raises ValueError
    when read as JSON is logged and answered with 400. A ValueError from
    the switch lookup, the VLAN id conversion or any ``delete_rule`` call is
    answered with 400 and the error text. Otherwise the per-switch results
    are returned as a JSON response.
    """
    try:
        if req.body:
            ruleid = req.json
        else:
            ruleid = {}
    except ValueError:
        FirewallController._LOGGER.debug('invalid syntax %s', req.body)
        return Response(status=400)

    try:
        dps = self._OFS_LIST.get_ofs(switchid)
        vid = FirewallController._conv_toint_vlanid(vlan_id)
    except ValueError as message:
        return Response(status=400, body=str(message))

    switches = dps.values()
    try:
        # The first failing switch aborts the whole request.
        results = [f_ofs.delete_rule(ruleid, self.waiters, vid)
                   for f_ofs in switches]
    except ValueError as message:
        return Response(status=400, body=str(message))

    return Response(content_type='application/json',
                    body=json.dumps(results))
def test_simple_sub_resources(web_fixture):
    """During their construction, Widgets can add SubResources to their View. The SubResource
       will then be available via a special URL underneath the URL of the Widget's View."""

    @stubclass(SubResource)
    class ASimpleSubResource(SubResource):
        sub_regex = sub_path_template = 'simple_resource'

        @exempt
        def handle_get(self, request):
            return Response()

    @stubclass(Widget)
    class WidgetWithSubResource(Widget):
        def __init__(self, view):
            super(WidgetWithSubResource, self).__init__(view)
            # Registering the resource on the view is what exposes its URL.
            view.add_resource(ASimpleSubResource('uniquename'))

    widget_factory = WidgetWithSubResource.factory()
    wsgi_app = web_fixture.new_wsgi_app(child_factory=widget_factory)

    Browser(wsgi_app).open('/_uniquename_simple_resource')
def change_bucket_weight(self, req, **kwargs):
    '''
    Changes bucket weight for a datapath's GROUP

    Rules are passed in this format port,weight; :
    Example : '1,1;2,1;3,2;4,2'

    Reads ``dp_id``, ``group_id`` and ``rules`` from ``kwargs``. Any
    ordinary error while parsing or applying the rules is printed as a
    traceback and otherwise ignored; the response body is always ``Done!``.
    '''
    multipath_controller = self.mp_instance
    try:
        dp_id = kwargs['dp_id']
        group_id = kwargs['group_id']
        arg_rules = kwargs['rules']

        rules = {}
        for entry in arg_rules.split(';'):
            # Split each "port,weight" pair once, not once per field.
            fields = entry.split(',')
            rules[int(fields[0])] = int(fields[1])

        topo_shape = multipath_controller.topo_shape
        topo_shape.modify_group(
            topo_shape.dpid_to_switch[int(dp_id)].dp, int(group_id), rules)
    except Exception:
        # Best effort by design, but let SystemExit and KeyboardInterrupt
        # propagate, which a bare ``except:`` would swallow.
        traceback.print_exc()
    return Response(content_type='text/html', body='Done!\n')
def root(self):
    """Return the controller class used as the application root.

    Only ``index`` accepts ``(req, resp)``; the other exposed methods do not
    take those arguments and return a message saying they should be
    unroutable.
    """
    unroutable = ("This should be unroutable because (req, resp) are not"
                  " arguments. It should raise a TypeError.")

    class RootController(object):

        @expose()
        def index(self, req, resp):
            assert isinstance(req, webob.BaseRequest)
            assert isinstance(resp, webob.Response)
            return 'Hello, World!'

        @expose()
        def warning(self):
            return unroutable

        @expose(generic=True)
        def generic(self):
            return unroutable

        @generic.when(method='PUT')
        def generic_put(self, _id):
            return unroutable

    return RootController
def download_transcript(self, request, _suffix=''):
    """
    Download a transcript.

    The transcript text is fetched from ``request.host_url`` joined with
    ``request.query_string`` and returned as a plain-text attachment.

    Arguments:
        request (webob.Request): Request to handle.
        _suffix (string): Slug used for routing.
    Returns:
        File with the correct name.
    """
    query = request.query_string
    filename = self.get_file_name_from_path(self.get_path_for(query))

    transcript_url = request.host_url + request.query_string
    transcript_text = requests.get(transcript_url).text

    response = Response(transcript_text)
    # Replaces the whole header list, not just these two entries.
    response.headerlist = [
        ('Content-Type', 'text/plain'),
        ('Content-Disposition', 'attachment; filename={}'.format(filename)),
    ]
    return response
def test_download_transcript_handler_response_object(self, get_mock, get_filename_mock):
    """
    Test that the transcript download handler builds the expected response.
    """
    # Arrange
    get_filename_mock.return_value = 'transcript.vtt'
    get_mock.return_value.text = 'vtt transcripts'
    request_mock = MagicMock()
    request_mock.host_url = 'test.host'
    request_mock.query_string = '/test-query-string'
    expected_headers = [
        ('Content-Type', 'text/plain'),
        ('Content-Disposition', 'attachment; filename={}'.format('transcript.vtt')),
    ]

    # Act
    vtt_response = self.xblock.download_transcript(request_mock, 'unused suffix')

    # Assert
    self.assertIsInstance(vtt_response, Response)
    self.assertEqual(vtt_response.text, 'vtt transcripts')
    self.assertEqual(vtt_response.headerlist, expected_headers)
    get_mock.assert_called_once_with('test.host/test-query-string')
def test_srt_to_vtt(self, convert_caps_to_vtt_mock, requests_mock):
    """
    Test that the xBlock's srt-to-vtt conversion handler works properly.
    """
    # Arrange
    request_mock = MagicMock()
    convert_caps_to_vtt_mock.return_value = 'vtt transcripts'
    text_mock = PropertyMock()
    requests_mock.get.return_value.text = text_mock
    text_mock.return_value = 'vtt transcripts'

    # Act
    vtt_response = self.xblock.srt_to_vtt(request_mock, 'unused suffix')

    # Assert
    self.assertIsInstance(vtt_response, Response)
    self.assertEqual(vtt_response.text, 'vtt transcripts')
    # The converter receives the fetched text object itself.
    convert_caps_to_vtt_mock.assert_called_once_with(text_mock)
def _set_status(self, value):
    """Set the status string, including code and message.

    :param value:
        Either an integer status code, or a string of the form
        ``"<code>"`` or ``"<code> <message>"``.
    :raises TypeError:
        If ``value`` is neither an integer nor a string.

    When no message is given, the one from
    ``Response.http_status_message`` for the code is used. The result is
    stored on ``self._status`` as ``"<code> <message>"``.
    """
    # ``long`` and ``unicode`` only exist on Python 2; fall back to the
    # Python 3 equivalents so the setter works on both.
    try:
        integer_types = (int, long)  # noqa: F821
        text_type = unicode  # noqa: F821
    except NameError:
        integer_types = (int,)
        text_type = str

    message = None
    # Accept long because urlfetch in App Engine returns codes as longs.
    if isinstance(value, integer_types):
        code = int(value)
    else:
        if isinstance(value, text_type):
            # Status messages have to be ASCII safe, so this is OK.
            value = str(value)

        if not isinstance(value, str):
            raise TypeError(
                'You must set status to a string or integer (not %s)' %
                type(value))

        parts = value.split(' ', 1)
        code = int(parts[0])
        if len(parts) == 2:
            message = parts[1]

    message = message or Response.http_status_message(code)
    self._status = '%d %s' % (code, message)
def redirect_to(_name, _permanent=False, _abort=False, _code=None,
                _body=None, _request=None, _response=None, *args, **kwargs):
    """Convenience function mixing :func:`redirect` and :func:`uri_for`.

    Issues an HTTP redirect to a named URI built using :func:`uri_for`.

    :param _name:
        The route name to redirect to.
    :param args:
        Positional arguments to build the URI.
    :param kwargs:
        Keyword arguments to build the URI.
    :returns:
        A :class:`Response` instance.

    The other arguments are described in :func:`redirect`.
    """
    # Build the target first; the redirect options are passed through as-is.
    destination = uri_for(_name, *args, _request=_request, **kwargs)
    return redirect(
        destination,
        permanent=_permanent,
        abort=_abort,
        code=_code,
        body=_body,
        request=_request,
        response=_response,
    )
def test_existing_session_session_after_invalidate_coe_True_no_exception(
    self
):
    # existing session -> invalidate() -> new session
    # cookie_on_exception is True by default, no exception raised
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    session.invalidate()
    session['key'] = 'value'

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Exactly one Set-Cookie header, and it sets a cookie.
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self._assert_is_a_header_to_set_cookie(cookies[0])
def test_existing_session_session_after_invalidate_coe_False_no_exception(
    self
):
    # existing session -> invalidate() -> new session
    # cookie_on_exception is False, no exception raised
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request, cookie_on_exception=False)
    request.session = session
    session.invalidate()
    session['key'] = 'value'

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Exactly one Set-Cookie header, and it sets a cookie.
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self._assert_is_a_header_to_set_cookie(cookies[0])
def test_existing_session_session_after_invalidate_coe_False_exception(
    self
):
    # existing session -> invalidate() -> new session
    # cookie_on_exception is False, exception raised
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request, cookie_on_exception=False)
    request.session = session
    session.invalidate()
    session['key'] = 'value'
    request.exception = Exception()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Cancel setting of cookie for new session, but still delete cookie for
    # the earlier invalidate().
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
def test_existing_session_multiple_invalidates(self):
    # existing session -> invalidate() -> new session -> invalidate()
    # Invalidate more than once, no new session after last invalidate()
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    session.invalidate()
    session['key'] = 'value'
    session.invalidate()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Exactly one Set-Cookie header, and it carries Max-Age=0.
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
def test_existing_session_multiple_invalidates_no_new_session_in_between(
    self
):
    # existing session -> invalidate() -> invalidate()
    # Invalidate more than once, no new session in between invalidate()s,
    # no new session after last invalidate()
    import webob

    request = self._make_request()
    existing_id = self._get_session_id(request)
    self._set_session_cookie(request=request, session_id=existing_id)

    session = self._makeOne(request)
    request.session = session
    for _ in range(2):
        session.invalidate()

    response = webob.Response()
    first_callback = request.response_callbacks[0]
    first_callback(request, response)

    # Exactly one Set-Cookie header, and it carries Max-Age=0.
    cookies = response.headers.getall('Set-Cookie')
    self.assertEqual(len(cookies), 1)
    self.assertIn('Max-Age=0', cookies[0])
def _load_cookie_session_in_new_request(self, request_old, session_id='existing_session', **session_args):
    # Builds a fresh request from ``request_old``, attaches the session
    # cookie and a session built with ``session_args``, runs the first
    # response callback and the finished callbacks, asserts that no
    # Set-Cookie header was emitted, and returns the new request.
    import webob

    # we need a request, but must persist the redis datastore
    new_request = self._make_request(request_old=request_old)
    self._set_session_cookie(request=new_request, session_id=session_id)
    new_request.session = self._makeOne(new_request, **session_args)

    response = webob.Response()
    first_callback = new_request.response_callbacks[0]
    first_callback(new_request, response)
    new_request._process_finished_callbacks()  # runs any persist if needed

    self.assertNotIn('Set-Cookie', response.headers)
    return new_request
def setUp(self):
    # Wraps a small role-checking WSGI app in MasakariKeystoneContext and
    # prepares a blank request carrying the identity headers.
    super(TestKeystoneMiddlewareRoles, self).setUp()

    @webob.dec.wsgify()
    def role_check_app(req):
        roles = req.environ['masakari.context'].roles
        if "knight" in roles and "bad" not in roles:
            return webob.Response(status="200 Role Match")
        if not roles:
            return webob.Response(status="200 No Roles")
        raise webob.exc.HTTPBadRequest("unexpected role header")

    self.middleware = masakari.api.auth.MasakariKeystoneContext(
        role_check_app)

    self.request = webob.Request.blank('/')
    identity_headers = (
        ('X_USER', 'testuser'),
        ('X_TENANT_ID', 'testtenantid'),
        ('X_AUTH_TOKEN', 'testauthtoken'),
        ('X_SERVICE_CATALOG', jsonutils.dumps({})),
    )
    for header_name, header_value in identity_headers:
        self.request.headers[header_name] = header_value

    self.roles = "pawn, knight, rook"