def java_to_python_keys(self):
if SETTINGS.CONVERT_KEYS and self.need_convert('request', self.method):
for param_data in (self.uri_params, self.params, self.data):
if isinstance(param_data, dict):
list_param_data = [param_data]
elif isinstance(param_data, (tuple, list)):
list_param_data = param_data
else:
raise Exception('data should be dict, list or tuple')
for pd in list_param_data:
keys = list(pd.keys())
for key in keys:
if key in SETTINGS.KEYWORDS_WITH_VALUE_NEED_CONVERT:
value = pd[key]
if isinstance(value, list):
for i, v in enumerate(value):
value[i] = self.java_to_python(v)
else:
pd[key] = self.java_to_python(value)
else:
pd[self.java_to_python(key)] = pd.pop(key)
if self.method == 'get' and SETTINGS.ORDER_BY in self.params:
self.params[SETTINGS.ORDER_BY] = self.java_to_python(self.params[SETTINGS.ORDER_BY])
# rename input keys in http request
python类http()的实例源码
benchmark_api_view.py 文件源码
项目:benchmark-django-rest-framework
作者: hqsh
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def get_next_url(request, redirect_field_name):
"""Retrieves next url from request
Note: This verifies that the url is safe before returning it. If the url
is not safe, this returns None.
:arg HttpRequest request: the http request
:arg str redirect_field_name: the name of the field holding the next url
:returns: safe url or None
"""
next_url = request.GET.get(redirect_field_name)
if next_url:
kwargs = {
'url': next_url,
'host': request.get_host()
}
# NOTE(willkg): Django 1.11+ allows us to require https, too.
if django.VERSION >= (1, 11):
kwargs['require_https'] = request.is_secure()
is_safe = is_safe_url(**kwargs)
if is_safe:
return next_url
return None
def test_no_oidc_token_expiration_forces_renewal(self, mock_random_string):
mock_random_string.return_value = 'examplestring'
request = self.factory.get('/foo')
request.user = self.user
request.session = {}
response = self.middleware.process_request(request)
self.assertEquals(response.status_code, 302)
url, qs = response.url.split('?')
self.assertEquals(url, 'http://example.com/authorize')
expected_query = {
'response_type': ['code'],
'redirect_uri': ['http://testserver/callback/'],
'client_id': ['foo'],
'nonce': ['examplestring'],
'prompt': ['none'],
'scope': ['openid email'],
'state': ['examplestring'],
}
self.assertEquals(expected_query, parse_qs(qs))
def get_times(request):
"""Gets start and endtime from request
As we use no timezone in NAV, remove it from parsed timestamps
:param request: django.http.HttpRequest
"""
starttime = request.GET.get('starttime')
endtime = request.GET.get('endtime')
try:
if starttime:
starttime = iso8601.parse_date(starttime).replace(tzinfo=None)
if endtime:
endtime = iso8601.parse_date(endtime).replace(tzinfo=None)
except iso8601.ParseError:
raise Iso8601ParseError
return starttime, endtime
def is_authenticated(self, request):
"""
Handles checking if the user is authenticated and dealing with
unauthenticated users.
Mostly a hook, this uses class assigned to ``authentication`` from
``Resource._meta``.
"""
# Authenticate the request as needed.
auth_result = self._meta.authentication.is_authenticated(request)
if isinstance(auth_result, HttpResponse):
raise ImmediateHttpResponse(response=auth_result)
if not auth_result is True:
raise ImmediateHttpResponse(response=http.HttpUnauthorized())
def get_detail(self, request, **kwargs):
"""
Returns a single serialized resource.
Calls ``cached_obj_get/obj_get`` to provide the data, then handles that result
set and serializes it.
Should return a HttpResponse (200 OK).
"""
basic_bundle = self.build_bundle(request=request)
try:
obj = self.cached_obj_get(bundle=basic_bundle, **self.remove_api_resource_names(kwargs))
except ObjectDoesNotExist:
return http.HttpNotFound()
except MultipleObjectsReturned:
return http.HttpMultipleChoices("More than one resource is found at this URI.")
bundle = self.build_bundle(obj=obj, request=request)
bundle = self.full_dehydrate(bundle)
bundle = self.alter_detail_data_to_serialize(request, bundle)
return self.create_response(request, bundle)
def delete_detail(self, request, **kwargs):
"""
Destroys a single resource/object.
Calls ``obj_delete``.
If the resource is deleted, return ``HttpNoContent`` (204 No Content).
If the resource did not exist, return ``Http404`` (404 Not Found).
"""
# Manually construct the bundle here, since we don't want to try to
# delete an empty instance.
bundle = Bundle(request=request)
try:
self.obj_delete(bundle=bundle, **self.remove_api_resource_names(kwargs))
return http.HttpNoContent()
except NotFound:
return http.HttpNotFound()
def delete(self, request, id):
"""Delete a single user by id.
This method returns HTTP 204 (no content) on success.
"""
if id == 'current':
raise django.http.HttpResponseNotFound('current')
api.keystone.user_delete(request, id)
def delete(self, request, id):
"""Delete a single role by id.
This method returns HTTP 204 (no content) on success.
"""
if id == 'default':
raise django.http.HttpResponseNotFound('default')
api.keystone.role_delete(request, id)
def delete(self, request, id):
"""Delete a single domain by id.
This method returns HTTP 204 (no content) on success.
"""
if id == 'default':
raise django.http.HttpResponseNotFound('default')
api.keystone.domain_delete(request, id)
def __init__(self, environ):
script_name = get_script_name(environ)
path_info = get_path_info(environ)
if not path_info:
# Sometimes PATH_INFO exists, but is empty (e.g. accessing
# the SCRIPT_NAME URL without a trailing slash). We really need to
# operate as if they'd requested '/'. Not amazingly nice to force
# the path like this, but should be harmless.
path_info = '/'
self.environ = environ
self.path_info = path_info
# be careful to only replace the first slash in the path because of
# http://test/something and http://test//something being different as
# stated in http://www.ietf.org/rfc/rfc2396.txt
self.path = '%s/%s' % (script_name.rstrip('/'),
path_info.replace('/', '', 1))
self.META = environ
self.META['PATH_INFO'] = path_info
self.META['SCRIPT_NAME'] = script_name
self.method = environ['REQUEST_METHOD'].upper()
_, content_params = cgi.parse_header(environ.get('CONTENT_TYPE', ''))
if 'charset' in content_params:
try:
codecs.lookup(content_params['charset'])
except LookupError:
pass
else:
self.encoding = content_params['charset']
self._post_parse_error = False
try:
content_length = int(environ.get('CONTENT_LENGTH'))
except (ValueError, TypeError):
content_length = 0
self._stream = LimitedStream(self.environ['wsgi.input'], content_length)
self._read_started = False
self.resolver_match = None
def GET(self):
# The WSGI spec says 'QUERY_STRING' may be absent.
raw_query_string = get_bytes_from_wsgi(self.environ, 'QUERY_STRING', '')
return http.QueryDict(raw_query_string, encoding=self._encoding)
def COOKIES(self):
raw_cookie = get_str_from_wsgi(self.environ, 'HTTP_COOKIE', '')
return http.parse_cookie(raw_cookie)
def COOKIES(self):
#raw_cookie = get_str_from_wsgi(self.environ, 'HTTP_COOKIE', '')
#return http.parse_cookie(raw_cookie)
return self.sanic_request.cookies
def JsonResponse(responseData):
import django
if float(django.get_version()) >= 1.7:
from django.http import JsonResponse
return JsonResponse(responseData)
else:
return HttpResponse(json.dumps(responseData), content_type="application/json")
def preview_page(self, request, object_id, language):
"""Redirecting preview function based on draft_id
"""
page = get_object_or_404(self.model, id=object_id)
attrs = "?%s" % get_cms_setting('CMS_TOOLBAR_URL__EDIT_ON')
attrs += "&language=" + language
with force_language(language):
url = page.get_absolute_url(language) + attrs
site = get_current_site(request)
if not site == page.site:
url = "http%s://%s%s" % ('s' if request.is_secure() else '',
page.site.domain, url)
return HttpResponseRedirect(url)
def test_expired_token_forces_renewal(self, mock_random_string):
mock_random_string.return_value = 'examplestring'
request = self.factory.get('/foo')
request.user = self.user
request.session = {
'oidc_id_token_expiration': time.time() - 10
}
response = self.middleware.process_request(request)
self.assertEquals(response.status_code, 302)
url, qs = response.url.split('?')
self.assertEquals(url, 'http://example.com/authorize')
expected_query = {
'response_type': ['code'],
'redirect_uri': ['http://testserver/callback/'],
'client_id': ['foo'],
'nonce': ['examplestring'],
'prompt': ['none'],
'scope': ['openid email'],
'state': ['examplestring'],
}
self.assertEquals(expected_query, parse_qs(qs))
# This adds a "home page" we can test against.
def test_expired_token_redirects_to_sso(self, mock_random_string):
mock_random_string.return_value = 'examplestring'
client = ClientWithUser()
client.login(username=self.user.username, password='password')
# Set expiration to some time in the past
session = client.session
session['oidc_id_token_expiration'] = time.time() - 100
session.save()
resp = client.get('/mdo_fake_view/')
self.assertEqual(resp.status_code, 302)
url, qs = resp.url.split('?')
self.assertEquals(url, 'http://example.com/authorize')
expected_query = {
'response_type': ['code'],
'redirect_uri': ['http://testserver/callback/'],
'client_id': ['foo'],
'nonce': ['examplestring'],
'prompt': ['none'],
'scope': ['openid email'],
'state': ['examplestring'],
}
self.assertEquals(expected_query, parse_qs(qs))
def get_or_create_token(request):
"""Gets an existing token or creates a new one. If the old token has
expired, create a new one.
:type request: django.http.HttpRequest
"""
if request.account.is_admin():
token, _ = APIToken.objects.get_or_create(
client=request.account, expires__gte=datetime.now(),
defaults={'token': auth_token(),
'expires': datetime.now() + EXPIRE_DELTA})
return HttpResponse(str(token))
else:
return HttpResponse('You must log in to get a token',
status=status.HTTP_403_FORBIDDEN)
def getBaseURL(req):
"""
Given a Django web request object, returns the OpenID 'trust root'
for that request; namely, the absolute URL to the site root which
is serving the Django request. The trust root will include the
proper scheme and authority. It will lack a port if the port is
standard (80, 443).
"""
name = req.META['HTTP_HOST']
try:
name = name[:name.index(':')]
except:
pass
try:
port = int(req.META['SERVER_PORT'])
except:
port = 80
proto = req.META['SERVER_PROTOCOL']
if 'HTTPS' in proto:
proto = 'https'
else:
proto = 'http'
if port in [80, 443] or not port:
port = ''
else:
port = ':%s' % (port,)
url = "%s://%s%s/" % (proto, name, port)
return url
def dispatch(self, request_type, request, **kwargs):
"""
Handles the common operations (allowed HTTP method, authentication,
throttling, method lookup) surrounding most CRUD interactions.
"""
allowed_methods = getattr(self._meta, "%s_allowed_methods" % request_type, None)
if 'HTTP_X_HTTP_METHOD_OVERRIDE' in request.META:
request.method = request.META['HTTP_X_HTTP_METHOD_OVERRIDE']
request_method = self.method_check(request, allowed=allowed_methods)
method = getattr(self, "%s_%s" % (request_method, request_type), None)
if method is None:
raise ImmediateHttpResponse(response=http.HttpNotImplemented())
self.is_authenticated(request)
self.throttle_check(request)
# All clear. Process the request.
request = convert_post_to_put(request)
response = method(request, **kwargs)
# Add the throttled request.
self.log_throttled_access(request)
# If what comes back isn't a ``HttpResponse``, assume that the
# request was accepted and that some action occurred. This also
# prevents Django from freaking out.
if not isinstance(response, HttpResponse):
return http.HttpNoContent()
return response
def method_check(self, request, allowed=None):
"""
Ensures that the HTTP method used on the request is allowed to be
handled by the resource.
Takes an ``allowed`` parameter, which should be a list of lowercase
HTTP methods to check against. Usually, this looks like::
# The most generic lookup.
self.method_check(request, self._meta.allowed_methods)
# A lookup against what's allowed for list-type methods.
self.method_check(request, self._meta.list_allowed_methods)
# A useful check when creating a new endpoint that only handles
# GET.
self.method_check(request, ['get'])
"""
if allowed is None:
allowed = []
request_method = request.method.lower()
allows = ','.join([s.upper() for s in allowed])
if request_method == "options":
response = HttpResponse(allows)
response['Allow'] = allows
raise ImmediateHttpResponse(response=response)
if not request_method in allowed:
response = http.HttpMethodNotAllowed(allows)
response['Allow'] = allows
raise ImmediateHttpResponse(response=response)
return request_method
def throttle_check(self, request):
"""
Handles checking if the user should be throttled.
Mostly a hook, this uses class assigned to ``throttle`` from
``Resource._meta``.
"""
identifier = self._meta.authentication.get_identifier(request)
# Check to see if they should be throttled.
if self._meta.throttle.should_be_throttled(identifier):
# Throttle limit exceeded.
raise ImmediateHttpResponse(response=http.HttpTooManyRequests())
def error_response(self, request, errors, response_class=None):
"""
Extracts the common "which-format/serialize/return-error-response"
cycle.
Should be used as much as possible to return errors.
"""
if response_class is None:
response_class = http.HttpBadRequest
desired_format = None
if request:
if request.GET.get('callback', None) is None:
try:
desired_format = self.determine_format(request)
except BadRequest:
pass # Fall through to default handler below
else:
# JSONP can cause extra breakage.
desired_format = 'application/json'
if not desired_format:
desired_format = self._meta.default_format
try:
serialized = self.serialize(request, errors, desired_format)
except BadRequest, e:
error = "Additional errors occurred, but serialization of those errors failed."
if settings.DEBUG:
error += " %s" % e
return response_class(content=error, content_type='text/plain')
return response_class(content=serialized, content_type=build_content_type(desired_format))
def post_list(self, request, **kwargs):
"""
Creates a new resource/object with the provided data.
Calls ``obj_create`` with the provided data and returns a response
with the new resource's location.
If a new resource is created, return ``HttpCreated`` (201 Created).
If ``Meta.always_return_data = True``, there will be a populated body
of serialized data.
"""
if django.VERSION >= (1, 4):
body = request.body
else:
body = request.raw_post_data
deserialized = self.deserialize(request, body, format=request.META.get('CONTENT_TYPE', 'application/json'))
deserialized = self.alter_deserialized_detail_data(request, deserialized)
bundle = self.build_bundle(data=dict_strip_unicode_keys(deserialized), request=request)
updated_bundle = self.obj_create(bundle, **self.remove_api_resource_names(kwargs))
location = self.get_resource_uri(updated_bundle)
if not self._meta.always_return_data:
return http.HttpCreated(location=location)
else:
updated_bundle = self.full_dehydrate(updated_bundle)
updated_bundle = self.alter_detail_data_to_serialize(request, updated_bundle)
return self.create_response(request, updated_bundle, response_class=http.HttpCreated, location=location)
def post_detail(self, request, **kwargs):
"""
Creates a new subcollection of the resource under a resource.
This is not implemented by default because most people's data models
aren't self-referential.
If a new resource is created, return ``HttpCreated`` (201 Created).
"""
return http.HttpNotImplemented()
def delete_list(self, request, **kwargs):
"""
Destroys a collection of resources/objects.
Calls ``obj_delete_list``.
If the resources are deleted, return ``HttpNoContent`` (204 No Content).
"""
bundle = self.build_bundle(request=request)
self.obj_delete_list(bundle=bundle, request=request, **self.remove_api_resource_names(kwargs))
return http.HttpNoContent()
def __init__(self, sanic_request):
"""
:param SanicRequest sanic_request:
"""
#script_name = get_script_name(environ)
#path_info = get_path_info(environ)
script_name = "/"
path_info = sanic_request.path
if not path_info:
# Sometimes PATH_INFO exists, but is empty (e.g. accessing
# the SCRIPT_NAME URL without a trailing slash). We really need to
# operate as if they'd requested '/'. Not amazingly nice to force
# the path like this, but should be harmless.
path_info = '/'
self.sanic_request = sanic_request
self.path_info = path_info
# be careful to only replace the first slash in the path because of
# http://test/something and http://test//something being different as
# stated in http://www.ietf.org/rfc/rfc2396.txt
self.path = '%s/%s' % (script_name.rstrip('/'),
path_info.replace('/', '', 1))
self.META = {"HTTP_{:s}".format(str(k).upper()): v for (k, v) in sanic_request.headers.items()}
self.META['REMOTE_ADDR'] = sanic_request.ip[0]
self.META['PATH_INFO'] = path_info
self.META['SCRIPT_NAME'] = script_name
self.method = str(sanic_request.method).upper()
# _, content_params = cgi.parse_header(environ.get('CONTENT_TYPE', ''))
# if 'charset' in content_params:
# try:
# codecs.lookup(content_params['charset'])
# except LookupError:
# pass
# else:
# self.encoding = content_params['charset']
self._post_parse_error = False
try:
content_length = int(sanic_request.headers['content-length'])
except (KeyError, ValueError, TypeError):
content_length = 0
#self._stream = LimitedStream(self.environ['wsgi.input'], content_length)
self._body = sanic_request.body
self._read_started = False
self.resolver_match = None
benchmark_api_view.py 文件源码
项目:benchmark-django-rest-framework
作者: hqsh
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def process_response(self, res):
data = res.get(SETTINGS.DATA, None)
if isinstance(res, dict): # dict ? json ??
if SETTINGS.DATA_STYLE == 'dict':
if data is not None:
if isinstance(res[SETTINGS.DATA], (list, dict)) and len(res[SETTINGS.DATA]) == 0:
res[SETTINGS.DATA] = None
elif isinstance(res[SETTINGS.DATA], list):
res[SETTINGS.DATA] = {
SETTINGS.RESULT: res[SETTINGS.DATA],
SETTINGS.COUNT: len(res[SETTINGS.DATA])
}
elif isinstance(res[SETTINGS.DATA].get(SETTINGS.RESULT, None), (list, dict)) and len(res[SETTINGS.DATA][SETTINGS.RESULT]) == 0:
res[SETTINGS.DATA][SETTINGS.RESULT] = None
if data is not None and len(data) > 0:
if self.method == 'get':
path = '/'
if SETTINGS.RESULT in res[SETTINGS.DATA]:
has_result_field = True
else:
has_result_field = False
else:
path = None
has_result_field = None
self.process_keys(res, path, has_result_field)
# additional data
additional_data = getattr(self, 'additional_data', None)
if isinstance(additional_data, dict):
for key, value in additional_data.items():
res[SETTINGS.DATA][self.python_to_java(key, self.omit_underlines)] = value
# process json response class
json_response_class = getattr(SETTINGS, 'JSON_RESPONSE_CLASS', None)
if json_response_class == 'rest_framework.response.Response':
res = Response(res)
elif json_response_class == 'django.http.JsonResponse':
res = JsonResponse(res, json_dumps_params={"indent": 2})
else:
raise Exception('JSON_RESPONSE_CLASS in the benchmark_settings is not defined or not correct. The value of it should be "rest_framework.response.Response", or "django.http.JsonResponse"')
if isinstance(res, (StreamingHttpResponse, django.http.response.HttpResponse)): # ???, ?????? http ??
return res
raise Exception('unknown response type: %s' % type(res))
# ?? style 2 ? get ????
benchmark_api_view.py 文件源码
项目:benchmark-django-rest-framework
作者: hqsh
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def paginate(self, res):
if res[SETTINGS.CODE] == SETTINGS.SUCCESS_CODE and SETTINGS.DATA_STYLE == 'dict':
if isinstance(res[SETTINGS.DATA], dict):
res[SETTINGS.DATA] = [res[SETTINGS.DATA]]
# get one
if self.get_one is None and 'pk' in self.uri_params.keys() or self.get_one:
if len(res[SETTINGS.DATA]) == 0:
res[SETTINGS.DATA] = None
else:
res[SETTINGS.DATA] = res[SETTINGS.DATA][0]
# get many in pages
elif self.page is not None:
if self.limit == 0:
page_count = 0 if self.count == 0 else 1
else:
page_count = math.ceil(self.count / self.limit)
if 1 <= self.page <= page_count:
result = res[SETTINGS.DATA]
else:
result = None
if self.page < 1:
self.page = 0
elif self.page > page_count:
self.page = page_count + 1
basic_url = 'http://' + self.host + self.path
previous_param_url = None
next_param_url = None
params = copy.deepcopy(self.params)
params[SETTINGS.LIMIT] = self.limit
params[SETTINGS.PAGE] = self.page
if self.page <= 1:
previous_url = None
else:
for key, value in params.items():
if key == 'page':
value = str(self.page - 1)
if previous_param_url is None:
previous_param_url = '?' + key + '=' + str(value)
else:
previous_param_url += '&' + key + '=' + str(value)
previous_url = basic_url + previous_param_url
if self.page >= page_count:
next_url = None
else:
for key, value in params.items():
if key == 'page':
value = str(self.page + 1)
if next_param_url is None:
next_param_url = '?' + key + '=' + str(value)
else:
next_param_url += '&' + key + '=' + str(value)
next_url = basic_url + next_param_url
res[SETTINGS.DATA] = {SETTINGS.RESULT: result, SETTINGS.COUNT: self.count,
SETTINGS.NEXT: next_url, SETTINGS.PREVIOUS: previous_url}
# get many not in pages
else:
res[SETTINGS.DATA] = {SETTINGS.RESULT: res[SETTINGS.DATA], SETTINGS.COUNT: len(res[SETTINGS.DATA])}
# ?? get ??