def test_resource_headers_are_utf8(self):
    """Check that every header on a routed response is UTF-8 encoded.

    A canned response carrying one integer and two text header values is
    returned by a stub controller.  After the request is routed through
    ``fakes.TestRouter`` every header value must satisfy
    ``matchers.EncodedByUTF8`` and the three custom headers must compare
    equal to their byte-string forms.
    """
    canned = webob.Response(status_int=http.ACCEPTED)
    canned.headers['x-header1'] = 1
    canned.headers['x-header2'] = u'header2'
    canned.headers['x-header3'] = u'header3'

    class Controller(object):
        def index(self, req):
            return canned

    request = webob.Request.blank('/tests')
    router = fakes.TestRouter(Controller())
    response = request.get_response(router)

    # All headers must be utf8
    for header_value in response.headers.values():
        self.assertThat(header_value, matchers.EncodedByUTF8())

    expected_headers = (
        ('x-header1', b'1'),
        ('x-header2', b'header2'),
        ('x-header3', b'header3'),
    )
    for header_name, expected in expected_headers:
        self.assertEqual(expected, response.headers[header_name])
python类Response()的实例源码
def serialize(self, request, content_type):
    """Render the wrapped object into a ``webob.Response``.

    The object held in ``self.obj`` (if any) is passed through
    ``self.serializer`` to produce the body.  The status code comes from
    ``self.code`` and every entry of ``self._headers`` is copied onto the
    response after going through ``utils.utf8``.

    :param request: not used by this method.
    :param content_type: value for the ``Content-Type`` header.
    :returns: the populated ``webob.Response``.
    """
    serializer = self.serializer
    payload = serializer.serialize(self.obj) if self.obj is not None else None
    response = webob.Response(body=payload)

    # NOTE: webob.Response fills in 'Content-Length' on its own whenever a
    # body is supplied, so that header has to be encoded here as well.
    # https://github.com/Pylons/webob/blob/1.5.0b0/webob/response.py#L147
    content_length = response.headers.get('Content-Length')
    if content_length:
        response.headers['Content-Length'] = utils.utf8(content_length)

    response.status_int = self.code
    for name, raw_value in self._headers.items():
        response.headers[name] = utils.utf8(raw_value)
    response.headers['Content-Type'] = utils.utf8(content_type)
    return response
def put_vtable(self, req, **kwargs):
    """Apply the host-to-VLAN assignments carried in the request body.

    The body is parsed as a URL-encoded query string.  For each field the
    fifth character of the field name is appended to the
    ``00:00:00:00:00:0`` prefix to form the host address, and the fifth
    character of the first submitted value is used as the VLAN.
    NOTE(review): presumably the form posts names like ``hostN`` with
    values like ``vlanM`` -- confirm against the markup from ``self.html()``.

    :param req: incoming request; only ``req.body`` is read.
    :param kwargs: accepted but not used here.
    :returns: a ``text/html`` ``Response`` whose body is ``self.html()``.
    """
    shortest_switch = self.shortest_switch_spp

    # The former CONTENT_LENGTH computation was never read, so it has been
    # dropped: an empty body parses to an empty dict and the loop below is
    # then simply skipped.
    for field, values in parse_qs(req.body).items():
        host = '00:00:00:00:00:0' + field[4]
        vlan = values[0][4]
        shortest_switch.set_vtable(host, vlan)

    return Response(content_type='text/html', body=self.html())
def put_vtable(self, req, **kwargs):
    """Apply the host-to-VLAN assignments carried in the request body.

    The body is parsed as a URL-encoded query string.  For each field the
    fifth character of the field name is appended to the
    ``00:00:00:00:00:0`` prefix to form the host address, and the fifth
    character of the first submitted value is used as the VLAN.
    NOTE(review): presumably the form posts names like ``hostN`` with
    values like ``vlanM`` -- confirm against the markup from ``self.html()``.

    :param req: incoming request; only ``req.body`` is read.
    :param kwargs: accepted but not used here.
    :returns: a ``text/html`` ``Response`` whose body is ``self.html()``.
    """
    simple_switch = self.simple_switch_spp

    # The former CONTENT_LENGTH computation was never read, so it has been
    # dropped: an empty body parses to an empty dict and the loop below is
    # then simply skipped.
    for field, values in parse_qs(req.body).items():
        host = '00:00:00:00:00:0' + field[4]
        vlan = values[0][4]
        simple_switch.set_vtable(host, vlan)

    return Response(content_type='text/html', body=self.html())
def execute_get(self, path, parameters):
    """Execute GET request.

    This method executes a GET request on the endpoint.  Any exception
    raised while building or sending the request is turned into a 500
    response whose body is the exception text.

    :param path: path of the request
    :param parameters: parameters to include in the request; they are
        converted with ``get_query_string``
    :return: whatever ``self._get_from_response`` produces for the
        response (presumably the decoded JSON -- confirm against helper)
    """
    try:
        encoded_params = get_query_string(parameters)
        request = self._get_req(
            path, query_string=encoded_params, method="GET")
        response = request.get_response()
    except Exception as error:
        response = webob.Response(status=500, body=str(error))
    return self._get_from_response(response)
def execute_post(self, path, parameters):
    """Execute POST request.

    This method executes a POST request on the endpoint.  Any exception
    raised while building or sending the request is turned into a 500
    response whose body is the exception text.

    :param path: path of the request
    :param parameters: parameters to include in the request; they are
        converted with ``make_body`` and sent as ``application/json``
    :return: whatever ``self._get_from_response`` produces for the
        response (presumably the decoded JSON -- confirm against helper)
    """
    try:
        payload = make_body(parameters)
        request = self._get_req(
            path, content_type="application/json", body=payload,
            method="POST")
        response = request.get_response()
    except Exception as error:
        response = webob.Response(status=500, body=str(error))
    return self._get_from_response(response)
def execute_delete(self, path, parameters):
    """Execute DELETE request.

    This method executes a DELETE request on the endpoint.  Any exception
    raised while building or sending the request is turned into a 500
    response whose body is the exception text.

    :param path: path of the request
    :param parameters: parameters to include in the request; they are
        converted with ``get_query_string``
    :return: whatever ``self._get_from_response`` produces for the
        response (presumably the decoded JSON -- confirm against helper)
    """
    try:
        encoded_params = get_query_string(parameters)
        request = self._get_req(
            path, method="DELETE", query_string=encoded_params)
        response = request.get_response(None)
    except Exception as error:
        response = webob.Response(status=500, body=str(error))
    return self._get_from_response(response)
def execute_put(self, path, parameters):
    """Execute PUT request.

    This method executes a PUT request on the endpoint.  Any exception
    raised while building or sending the request is turned into a 500
    response whose body is the exception text.

    :param path: path of the request
    :param parameters: parameters to include in the request; they are
        converted with ``make_body`` and sent as ``application/json``
    :return: whatever ``self._get_from_response`` produces for the
        response (presumably the decoded JSON -- confirm against helper)
    """
    try:
        payload = make_body(parameters)
        request = self._get_req(
            path, content_type="application/json", body=payload,
            method="PUT")
        response = request.get_response()
    except Exception as error:
        response = webob.Response(status=500, body=str(error))
    return self._get_from_response(response)
test_auth.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def setUp(self):
    """Build the middleware under test plus a request with auth headers.

    A small WSGI app is wrapped in ``NovaKeystoneContext``; the app reads
    the roles from ``req.environ['nova.context']`` and answers with a
    status line describing what it found, or raises ``HTTPBadRequest``
    for any other role set.
    """
    super(TestKeystoneMiddlewareRoles, self).setUp()

    @webob.dec.wsgify()
    def role_check_app(req):
        roles = req.environ['nova.context'].roles
        if "knight" in roles and "bad" not in roles:
            return webob.Response(status="200 Role Match")
        if roles == ['']:
            return webob.Response(status="200 No Roles")
        raise webob.exc.HTTPBadRequest("unexpected role header")

    self.middleware = nova.api.auth.NovaKeystoneContext(role_check_app)
    self.request = webob.Request.blank('/')
    auth_headers = (
        ('X_USER', 'testuser'),
        ('X_TENANT_ID', 'testtenantid'),
        ('X_AUTH_TOKEN', 'testauthtoken'),
        ('X_SERVICE_CATALOG', jsonutils.dumps({})),
    )
    for header_name, header_value in auth_headers:
        self.request.headers[header_name] = header_value
    self.roles = "pawn, knight, rook"
test_wsgi.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_resource_headers_are_utf8(self):
    """Verify header values of a routed response are UTF-8 encoded.

    The stub controller hands back a prepared 202 response whose custom
    headers hold an integer and two text values.  Once routed through
    ``fakes.TestRouter``, each header value must match
    ``matchers.EncodedByUTF8`` and the custom headers must equal their
    byte-string forms.
    """
    prepared = webob.Response(status_int=202)
    prepared.headers['x-header1'] = 1
    prepared.headers['x-header2'] = u'header2'
    prepared.headers['x-header3'] = u'header3'

    class Controller(object):
        def index(self, req):
            return prepared

    request = webob.Request.blank('/tests')
    wsgi_app = fakes.TestRouter(Controller())
    response = request.get_response(wsgi_app)

    # All headers must be utf8
    for encoded in six.itervalues(response.headers):
        self.assertThat(encoded, matchers.EncodedByUTF8())

    for header_name, expected in (('x-header1', b'1'),
                                  ('x-header2', b'header2'),
                                  ('x-header3', b'header3')):
        self.assertEqual(expected, response.headers[header_name])
def serialize(self, request, content_type):
    """Turn the wrapped object into a ``webob.Response``.

    ``self.obj`` is serialized with ``self.serializer`` when it is not
    ``None``; otherwise the response is created without a body.  The
    status is taken from ``self.code`` and the entries of
    ``self._headers`` are applied after passing through ``utils.utf8``.

    :param request: not used by this method.
    :param content_type: value for the ``Content-Type`` header.
    :returns: the populated ``webob.Response``.
    """
    serializer = self.serializer
    if self.obj is None:
        serialized = None
    else:
        serialized = serializer.serialize(self.obj)
    response = webob.Response(body=serialized)

    # NOTE(andreykurilin): we need to encode 'Content-Length' header,
    # since webob.Response auto sets it if "body" attr is presented.
    # https://github.com/Pylons/webob/blob/1.5.0b0/webob/response.py#L147
    auto_length = response.headers.get('Content-Length')
    if auto_length:
        response.headers['Content-Length'] = utils.utf8(auto_length)

    response.status_int = self.code
    for header_name, header_value in self._headers.items():
        response.headers[header_name] = utils.utf8(header_value)
    response.headers['Content-Type'] = utils.utf8(content_type)
    return response
floating_ip_dns.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def delete(self, req, domain_id, id):
    """Delete the entry identified by req and id.

    The domain is obtained by running ``domain_id`` through
    ``_unquote_domain``; ``id`` is used as the entry name.

    :returns: an empty ``webob.Response`` with status 202.
    :raises webob.exc.HTTPNotFound: when the network API raises
        ``exception.NotFound``.
    :raises webob.exc.HTTPNotImplemented: when the network API raises
        ``NotImplementedError``.
    """
    ctxt = req.environ['nova.context']
    authorize(ctxt)
    unquoted_domain = _unquote_domain(domain_id)

    try:
        self.network_api.delete_dns_entry(ctxt, id, unquoted_domain)
    except exception.NotFound as not_found:
        raise webob.exc.HTTPNotFound(
            explanation=not_found.format_message())
    except NotImplementedError:
        raise webob.exc.HTTPNotImplemented(
            explanation=_("Unable to delete dns entry"))

    return webob.Response(status_int=202)
networks_associate.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def _disassociate_host_only(self, req, id, body):
    """Clear the host association of network ``id``.

    Calls ``self.network_api.associate`` with ``host=None`` after the
    caller has been authorized and confirmed to hold an admin context.
    ``body`` is not used.

    :returns: an empty ``webob.Response`` with status 202.
    :raises exc.HTTPNotFound: when the network API raises
        ``exception.NetworkNotFound``.
    :raises exc.HTTPNotImplemented: when the network API raises
        ``NotImplementedError``.
    """
    ctxt = req.environ['nova.context']
    authorize(ctxt)
    # NOTE(shaohe-feng): back-compatible with db layer hard-code
    # admin permission checks. call db API objects.Network.associate
    nova_context.require_admin_context(ctxt)

    try:
        self.network_api.associate(ctxt, id, host=None)
    except exception.NetworkNotFound:
        raise exc.HTTPNotFound(explanation=_("Network not found"))
    except NotImplementedError:
        not_supported = _('Disassociate host is not implemented by the '
                          'configured Network API')
        raise exc.HTTPNotImplemented(explanation=not_supported)

    return webob.Response(status_int=202)
networks_associate.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _disassociate_project_only(self, req, id, body):
    """Clear the project association of network ``id``.

    Calls ``self.network_api.associate`` with ``project=None`` after the
    caller has been authorized and confirmed to hold an admin context.
    ``body`` is not used.

    :returns: an empty ``webob.Response`` with status 202.
    :raises exc.HTTPNotFound: when the network API raises
        ``exception.NetworkNotFound``.
    :raises exc.HTTPNotImplemented: when the network API raises
        ``NotImplementedError``.
    """
    ctxt = req.environ['nova.context']
    authorize(ctxt)
    # NOTE(shaohe-feng): back-compatible with db layer hard-code
    # admin permission checks. call db API objects.Network.associate
    nova_context.require_admin_context(ctxt)

    try:
        self.network_api.associate(ctxt, id, project=None)
    except exception.NetworkNotFound:
        raise exc.HTTPNotFound(explanation=_("Network not found"))
    except NotImplementedError:
        not_supported = _('Disassociate project is not implemented by the '
                          'configured Network API')
        raise exc.HTTPNotImplemented(explanation=not_supported)

    return webob.Response(status_int=202)
floating_ips.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def delete(self, req, id):
    """Disassociate and release the floating IP identified by ``id``.

    :returns: an empty ``webob.Response`` with status 202.
    :raises webob.exc.HTTPNotFound: when the lookup raises
        ``exception.NotFound`` or ``exception.InvalidID``.
    :raises webob.exc.HTTPForbidden: when the release raises
        ``exception.Forbidden`` or
        ``exception.CannotDisassociateAutoAssignedFloatingIP``.
    """
    ctxt = req.environ['nova.context']
    authorize(ctxt)

    # get the floating ip object
    try:
        floating_ip = self.network_api.get_floating_ip(ctxt, id)
    except (exception.NotFound, exception.InvalidID):
        raise webob.exc.HTTPNotFound(
            explanation=_("Floating IP not found for ID %s") % id)

    # get the associated instance object (if any)
    ip_address = floating_ip['address']
    instance = get_instance_by_floating_ip_addr(self, ctxt, ip_address)

    try:
        self.network_api.disassociate_and_release_floating_ip(
            ctxt, instance, floating_ip)
    except exception.Forbidden:
        raise webob.exc.HTTPForbidden()
    except exception.CannotDisassociateAutoAssignedFloatingIP:
        raise webob.exc.HTTPForbidden(
            explanation=_('Cannot disassociate auto assigned floating IP'))

    return webob.Response(status_int=202)
def get_client_users_out_stack(request):
    """Return, as JSON, the users that are not linked to the given client.

    The client id is read from ``request.matchdict['id']``.  When no
    ``Client`` row matches, the transaction is aborted and a 500 response
    with an explanatory message is returned.  Otherwise a raw SQL query
    selects every user whose ``Client_Users`` row points at a different
    client or who has no such row at all.

    :param request: the incoming request.
    :returns: a ``Response`` whose JSON body is a list of
        ``{'name': ..., 'id': ...}`` dicts, or a 500 ``Response`` when the
        client does not exist.
    """
    logger.debug('get_client_users_out_stack is running')

    client_id = request.matchdict.get('id', -1)
    client = Client.query.filter_by(id=client_id).first()
    if not client:
        transaction.abort()
        return Response('Can not find a client with id: %s' % client_id, 500)

    sql_query = """
select
"User_SimpleEntities".name,
"User_SimpleEntities".id
from "Users"
left outer join "Client_Users" on "Client_Users".uid = "Users".id
join "SimpleEntities" as "User_SimpleEntities" on "User_SimpleEntities".id = "Users".id
where "Client_Users".cid != %(client_id)s or "Client_Users".cid is Null
"""
    # Interpolate the integer id of the row that was just loaded instead of
    # the raw URL segment, so no request-controlled text reaches the SQL.
    # NOTE(review): assumes Client.id is an integer key -- confirm.
    sql_query = sql_query % {'client_id': int(client.id)}

    result = DBSession.connection().execute(sql_query)
    users = [{'name': row[0], 'id': row[1]} for row in result.fetchall()]
    return Response(json_body=users)
def update_daily(request):
    """Update the name, description and status of an existing daily.

    The daily id comes from ``request.matchdict['id']``; ``name``,
    ``description`` and ``status_id`` come from ``request.params``.  Each
    validation failure returns a 500 ``Response`` carrying a plain-text
    message, and a missing daily additionally aborts the transaction.

    :param request: the incoming request.
    :returns: a ``Response`` with a success message, or a 500 ``Response``
        describing the first validation failure.
    """
    logged_in_user = get_logged_in_user(request)
    utc_now = local_to_utc(datetime.datetime.now())

    daily_id = request.matchdict.get('id', -1)
    daily = Daily.query.filter(Daily.id == daily_id).first()
    if not daily:
        transaction.abort()
        return Response('No daily with id : %s' % daily_id, 500)

    name = request.params.get('name')
    description = request.params.get('description')
    status_id = request.params.get('status_id')
    status = Status.query.filter(Status.id == status_id).first()

    if not name:
        return Response('Please supply a name', 500)
    if not description:
        return Response('Please supply a description', 500)
    if not status:
        # ``status`` is None in this branch, so reading ``status.code``
        # would raise AttributeError; report the id that was looked up.
        return Response('There is no status with id: %s' % status_id, 500)

    daily.name = name
    daily.description = description
    daily.status = status
    daily.date_updated = utc_now
    daily.updated_by = logged_in_user

    request.session.flash('success: Successfully updated daily')
    return Response('Successfully updated daily')
def get_assets_types(request):
    """Return the distinct asset types as a JSON list.

    A raw SQL query joins ``Assets`` to ``SimpleEntities`` twice to get
    the id and name of each asset's type, grouped and ordered by name.
    The response also carries a ``content_range`` of the form
    ``0-<count - 1>/<count>``.

    :param request: the incoming request (not read by this function).
    :returns: a ``Response`` whose JSON body is a list of
        ``{'asset_type_id': ..., 'asset_type_name': ...}`` dicts.
    """
    sql_query = """select
"Assets_Types_SimpleEntities".id,
"Assets_Types_SimpleEntities".name
from "Assets"
join "SimpleEntities" as "Assets_SimpleEntities" on "Assets_SimpleEntities".id = "Assets".id
join "SimpleEntities" as "Assets_Types_SimpleEntities" on "Assets_Types_SimpleEntities".id = "Assets_SimpleEntities".type_id
group by
"Assets_Types_SimpleEntities".name,
"Assets_Types_SimpleEntities".id
order by "Assets_Types_SimpleEntities".name
"""
    rows = DBSession.connection().execute(sql_query).fetchall()

    return_data = []
    for row in rows:
        return_data.append({
            'asset_type_id': row[0],
            'asset_type_name': row[1],
        })

    type_count = len(return_data)
    content_range = '%s-%s/%s' % (0, type_count - 1, type_count)
    logger.debug('content_range : %s' % content_range)

    resp = Response(json_body=return_data)
    resp.content_range = content_range
    return resp
def update_budget(request):
    """Update the name, description and status of an existing budget.

    The budget id comes from ``request.matchdict['id']``; ``name``,
    ``description`` and ``status_id`` come from ``request.params``.  Each
    validation failure returns a 500 ``Response`` carrying a plain-text
    message, and a missing budget additionally aborts the transaction.

    :param request: the incoming request.
    :returns: a ``Response`` with a success message, or a 500 ``Response``
        describing the first validation failure.
    """
    logged_in_user = get_logged_in_user(request)
    utc_now = local_to_utc(datetime.datetime.now())

    budget_id = request.matchdict.get('id', -1)
    budget = Budget.query.filter(Budget.id == budget_id).first()
    if not budget:
        transaction.abort()
        return Response('No budget with id : %s' % budget_id, 500)

    name = request.params.get('name')
    description = request.params.get('description')
    status_id = request.params.get('status_id')
    status = Status.query.filter(Status.id == status_id).first()

    if not name:
        return Response('Please supply a name', 500)
    if not description:
        return Response('Please supply a description', 500)
    if not status:
        # ``status`` is None in this branch, so reading ``status.code``
        # would raise AttributeError; report the id that was looked up.
        return Response('There is no status with id: %s' % status_id, 500)

    budget.name = name
    budget.description = description
    budget.status = status
    budget.date_updated = utc_now
    budget.updated_by = logged_in_user

    request.session.flash('success: Successfully updated budget')
    return Response('Successfully updated budget')
def delete_budgetentry_action(budgetentry):
"""Delete ``budgetentry`` through ``DBSession`` and commit the transaction.

Any exception raised by the delete or the commit is caught and the
transaction is aborted.  No value is returned by the code visible here:
both ``return Response(...)`` lines are commented out.
NOTE(review): this block ends at the edge of the visible source, so there
may be further statements that are not shown -- confirm.
"""
logger.debug('delete_budgetentry_action %s' % budgetentry.name)
# Captured before the delete; only the commented-out success return below
# refers to it.
budgetentry_name = budgetentry.name
try:
DBSession.delete(budgetentry)
transaction.commit()
except Exception as e:
transaction.abort()
# ``c`` is only referenced by the commented-out error return below.
c = StdErrToHTMLConverter(e)
# NOTE(review): second abort call in the same handler -- looks redundant,
# confirm before removing.
transaction.abort()
# return Response(c.html(), 500)
# return Response('Successfully deleted good with name %s' % budgetentry_name)