def exception_handler(exc, context):
    """Expose the DRF-parsed request data on the Django request, then delegate.

    The POST data of the underlying Django request is replaced with the data
    already parsed by Django REST framework. This is needed because the
    request data/stream cannot be read more than once; copying the parsed
    data lets the POST params show up in the Rollbar exception log.

    Based on https://github.com/tomchristie/django-rest-framework/pull/1671

    Args:
        exc: The exception being handled.
        context: Handler context; ``context['request']`` is read and its
            ``_request.POST`` attribute is overwritten.

    Returns:
        Whatever ``rest_framework.views.exception_handler`` returns.
    """
    from rest_framework import views
    from django.http.request import QueryDict

    query = QueryDict('', mutable=True)
    try:
        data = context['request'].data
        if isinstance(data, dict):
            query.update(data)
        else:
            # Non-dict payloads (e.g. bulk lists) are stored under one key.
            query.update({'_bulk_data': data})
        context['request']._request.POST = query
    except Exception:
        # Best effort only: failing to enrich the request must never mask
        # the original exception. ``Exception`` (rather than a bare
        # ``except``) lets KeyboardInterrupt/SystemExit propagate.
        pass
    return views.exception_handler(exc, context)
# Example source code using the Python class QueryDict()
def create_next_domains_url(path, scheme="http", domain="", query_params=None,
                            clone_domains=()):
    """Build a URL whose query string carries the clone-domains list.

    Args:
        path: Path component of the URL.
        scheme: URL scheme; defaults to "http".
        domain: Network location component; defaults to "".
        query_params: Optional mapping of extra query parameters. ``None``
            (the default) means no extra parameters.
        clone_domains: Sequence of domains stored under the 'clone-domains'
            key; the key is omitted when the sequence is empty.

    Returns:
        The assembled URL as a string.
    """
    # A QueryDict is used so the query string is encoded automatically,
    # including the multi-valued 'clone-domains' entry.
    params = QueryDict(mutable=True)
    for key, value in (query_params or {}).items():
        params[key] = value

    if clone_domains:
        params.setlist('clone-domains', clone_domains)

    url = urllib.parse.ParseResult(
        scheme=scheme,
        netloc=domain,
        path=path,
        params="",
        query=params.urlencode(),
        fragment="",
    )
    return url.geturl()
def get_url(self, viewname, arguments=None, query_args=None):
    """Return the reversed URL for ``viewname`` followed by a query string.

    Args:
        viewname: View name passed to ``reverse``.
        arguments: Optional kwargs for ``reverse``; falsy values become {}.
        query_args: Optional query parameters; falsy values become {}.

    Returns:
        A string of the form '<url>?<encoded query>'.
    """
    if not arguments:
        arguments = {}
    if not query_args:
        query_args = {}

    encoder = QueryDict(mutable=True)
    encoder.update(**query_args)

    base_url = reverse(viewname=viewname, kwargs=arguments)
    encoded_query = encoder.urlencode()
    return '{url}?{query}'.format(url=base_url, query=encoded_query)
def process_request(self, request):
    """Replace request.GET/POST with a QueryDict built from a JSON body.

    Only requests whose CONTENT_TYPE contains HTTP_APPLICATION_JSON are
    touched. Always returns None, the default value Django expects.
    """
    content_type = request.META.get('CONTENT_TYPE', '')
    if HTTP_APPLICATION_JSON not in content_type:
        return None

    payload = json.loads(request.body)

    # For consistency a Django QueryDict is exposed instead of a plain dict:
    # a QueryDict keeps every value in a list and is immutable by default.
    # List values need care: updating with the whole list would wrap it in
    # another list, so each element is added with its own update call,
    # which yields one flat list per key.
    parsed = QueryDict('', mutable=True)
    for name, raw in payload.items():
        elements = raw if isinstance(raw, list) else [raw]
        for element in elements:
            parsed.update({name: element})

    method = request.method
    if method == 'GET':
        request.GET = parsed
    elif method == 'POST':
        request.POST = parsed

    return None
def _parse_prefilled(self, query_params):
    """Group '<step>__<field>' query parameters by step.

    Parameter names without a double underscore are ignored. The name is
    split on the first '__' only, so a field part that itself contains
    '__' no longer raises ValueError on unpacking.

    Args:
        query_params: Mapping-like object that is iterated for parameter
            names and indexed by name for their values.

    Returns:
        A dict mapping each step to a mutable QueryDict of field -> value.
    """
    result = {}
    for param in query_params:
        if '__' not in param:
            continue
        step, field = param.split('__', 1)
        if step not in result:
            result[step] = QueryDict(mutable=True)
        result[step][field] = query_params[param]
    return result
def test_county_select_persists_after_session_update(self):
    """Counties chosen in the form remain in the session after a later save.

    The form is filled with two counties, then an unrelated QueryDict is
    saved under the same session key; the stored form data must still list
    the original counties.
    """
    expected_counties = ['alameda', 'contracosta']
    session_key = ApplicantFormViewBase.session_key

    response = self.client.fill_form(
        reverse('intake-apply'), counties=list(expected_counties))

    unrelated = QueryDict('', mutable=True)
    unrelated.setlist('hello', ['world'])
    utils.save_form_data_to_session(
        response.wsgi_request, session_key, unrelated)

    stored = self.client.session.get(session_key)
    self.assertListEqual(expected_counties, stored['counties'])
def dict_to_querydict(dict_data):
    """Copy a mapping into a new mutable QueryDict.

    List values are stored with ``setlist``; any other value is assigned
    through item assignment.
    """
    converted = QueryDict('', mutable=True)
    for name, entry in dict_data.items():
        if not isinstance(entry, list):
            converted[name] = entry
            continue
        converted.setlist(name, entry)
    return converted
def get_form_data_from_session(request, session_key):
    """Load the dict stored under ``session_key`` as a mutable QueryDict.

    Every value is stored as a list (non-list values are wrapped in a
    one-element list) so a form can process the result as if it were
    post data. A missing session entry yields an empty QueryDict.
    """
    stored = request.session.get(session_key, {})
    form_data = QueryDict('', mutable=True)
    for name, values in stored.items():
        as_list = values if isinstance(values, list) else [values]
        form_data.setlist(name, as_list)
    return form_data
def init_request(self):
    """Return a fresh HttpRequest carrying empty data and a DummySession."""
    req = HttpRequest()
    # A copy of an empty QueryDict is used, presumably to obtain a mutable
    # instance (QueryDicts are immutable by default) -- TODO confirm.
    empty_data = QueryDict().copy()
    req.data = empty_data
    dummy_session = DummySession()
    req.session = dummy_session
    return req
def setUpClass(cls):
    """Store all Alert objects and an AlertFilter for 'content=threat'."""
    super(AlertFilterTestCase, cls).setUpClass()
    cls.alerts = Alert.objects.all()
    content_query = QueryDict('content=threat')
    cls.alert_filter = AlertFilter(content_query, cls.alerts)
def do(self, action_context, view, email, *args, **kwargs):
    """Attach a password-reset URL to the action context, then delegate.

    When ``extra_context`` holds a user, the user's reset token is
    refreshed and the absolute reset URL is stored under
    'pass_reset_url'. The parent ``do`` is called in every case and its
    result is returned.
    """
    extra = action_context.extra_context
    user = extra.get('user')
    if not user:
        return super().do(action_context, view, email, *args, **kwargs)

    # Reset token
    refresh_password_reset_token(user)
    reset_path = reverse('auth:reset-password', kwargs={
        'user_id': user.id,
        'uuid': str(user.password_reset_token)})

    # Prepare the full url (the domain comes from the requested
    # wiggum entrypoint)
    host = action_context.request.META.get('HTTP_HOST')
    if settings.DEBUG:
        scheme = 'http'
    else:
        scheme = "https"

    # Get redirection uri
    query = QueryDict(mutable=True)
    redirect_uri = action_context.extra_context.get("redirect_uri")
    if redirect_uri:
        query[settings.REDIRECT_URL_VALID_PARAMS[0]] = redirect_uri

    parsed = urllib.parse.ParseResult(
        scheme=scheme,
        netloc=host,
        path=reset_path,
        params="",
        query=query.urlencode(),
        fragment="",
    )
    url = parsed.geturl()

    action_context.extra_context['pass_reset_url'] = url
    logger.debug("Password recover url created: {0}".format(url))
    return super().do(action_context, view, email, *args, **kwargs)
def test_contains_mac_addresses_field_and_converts_non_querydict(self):
    """A form built from a plain dict has 'mac_addresses' and QueryDict data."""
    plain_data = {}
    device_form = DeviceWithMACsForm(data=plain_data)
    self.assertThat(device_form.fields, Contains('mac_addresses'))
    self.assertIsInstance(device_form.data, QueryDict)
def test_clone_url(self):
    """Follow the clone-cookie URL and check the final redirect.

    The request must end with status 200, and the last entry of the
    redirect chain must be a 302 to the redirect URI sent in the query.
    """
    redirect_key = settings.REDIRECT_URL_VALID_PARAMS[0]
    redirect_target = "http://google.com"

    first_host = settings.JWT_COOKIE_CLONE_DOMAINS_ENDPOINT[0]
    self.client = Client(SERVER_NAME=first_host, HTTP_HOST=first_host)

    # Create a token
    params = QueryDict(mutable=True)
    params[redirect_key] = redirect_target
    params.setlist('clone-domains', settings.JWT_COOKIE_CLONE_DOMAINS_ENDPOINT)
    token = jwt_utils.create_jwt(self.user)
    url_path = reverse('auth:clone-cookie', kwargs={'token': token})

    def build_url(scheme, netloc, query):
        # All URLs in this test share the same path and differ only in
        # scheme, host and query string.
        return urllib.parse.ParseResult(
            scheme=scheme,
            netloc=netloc,
            path=url_path,
            params="",
            query=query.urlencode(),
            fragment="",
        )

    url = build_url("", "", params)
    resp = self.client.get(url.geturl(), follow=True)
    self.assertEqual(200, resp.status_code)

    # Check cloning redirects
    # NOTE(review): the URL built for each hop is never compared against
    # the redirect chain -- confirm whether an assertion is missing.
    all_domains = list(settings.JWT_COOKIE_CLONE_DOMAINS_ENDPOINT)
    for hop_index, _hop in enumerate(resp.redirect_chain[:-1]):
        params = QueryDict(mutable=True)
        params[redirect_key] = redirect_target
        params.setlist('clone-domains', all_domains[hop_index + 1:])
        remaining = all_domains[hop_index:]
        next_host = remaining[0] if remaining else ""
        url = build_url("http", next_host, params)

    # Final redirect (redirect uri)
    final_url, final_status = resp.redirect_chain[-1][0], resp.redirect_chain[-1][1]
    self.assertEqual(302, final_status)
    self.assertEqual(params[redirect_key], final_url)