def form_valid(self, form):
"""
If the form is valid, redirect to the supplied URL.
"""
log.info('received POST to main multiuploader view')
file_obj = self.request.FILES['file']
wrapped_file = UploadedFile(file_obj)
filename = wrapped_file.name
file_size = wrapped_file.file.size
log.info('Got file: "%s"' % filename)
fl = self.model(filename=filename, file=file_obj)
fl.save()
log.info('File saving done')
thumb_url = ""
size = self.request.POST.get('size')
size = size if size else '180x80'
quality = self.request.POST.get('quality')
quality = quality if quality else 50
try:
im = get_thumbnail(fl.file, size, quality=quality)
thumb_url = im.url
except Exception as e:
log.error(e)
# generating json response array
result = {"files": [{"id": str(fl.id),
"name": filename,
"size": file_size,
'type': file_obj.content_type,
"url": reverse('multiuploader', args=[fl.pk]),
"thumbnailUrl": thumb_url,
"deleteUrl": reverse('multiuploader', args=[fl.pk]),
"deleteType": "DELETE", }]
}
return JsonResponse(result)
python类JsonResponse()的实例源码
def sendtest(self, request, campaign_id):
from premailer import Premailer
from django.core.mail import send_mail, BadHeaderError
receiver = request.POST.get('receiver')
if not receiver:
raise Http404
try:
campaign = self.model.objects.get(pk=campaign_id)
except self.model.DoesNotExist:
pass
else:
content = campaign.render_html(request, scheme='http://', test=True)
content = Premailer(content, strip_important=False).transform()
plain = campaign.render_plain(request, test=True)
try:
send_mail(
campaign.subject, plain, settings.DEFAULT_FROM_EMAIL,
recipient_list=[receiver],
html_message=content
)
except BadHeaderError:
pass
response = JsonResponse({})
set_cookie(response, 'last_receiver', receiver)
return response
def submit_view(self, request, object_id):
try:
obj = self.model._default_manager.get(pk=object_id)
except self.model.DoesNotExist:
raise Http404
form = self.get_autopost_form(request, obj)
if form.is_valid():
obj_ct = ContentType.objects.get_for_model(obj)
text = form.cleaned_data.get('text')
networks = form.cleaned_data.get('networks')
for network in networks:
try:
post = FeedPost.objects.get(
network=network,
content_type=obj_ct,
object_id=obj.pk,
)
except FeedPost.DoesNotExist:
FeedPost.objects.create(
network=network,
url=request.build_absolute_uri(self.get_autopost_url(obj)),
text=text,
content_type=obj_ct,
object_id=obj.pk,
)
else:
if post.scheduled:
# ????????? ??????
post.url = request.build_absolute_uri(self.get_autopost_url(obj))
post.text = text
post.save()
return JsonResponse({})
else:
return JsonResponse({
'errors': form.errors
}, status=400)
def json_response(data=None, **kwargs):
if data is None:
data = {}
defaults = {
'encoder': LazyJSONEncoder
}
defaults.update(kwargs)
return JsonResponse(data, **defaults)
def watch(request):
game_ids = request.body.split(',')
result = worker.watch_games(game_ids)
return JsonResponse({'result': result})
def watch_add(request):
game_id = request.body
worker.add_watch(game_id)
return JsonResponse({'ok': True})
def dict_to_response(response, response_format):
if response_format == 'msgpack':
return HttpResponse(msgpack.packb(response))
else:
return JsonResponse(response)
def open_task(request, contest_id, task_id, participant_id):
contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
task = get_object_or_404(tasks_models.Task, pk=task_id)
if not contest.has_task(task):
return HttpResponseNotFound()
if not is_manual_task_opening_available_in_contest(contest):
messages.error(request, 'Manual task opening is forbidden for this contest')
return redirect(urlresolvers.reverse('contests:task_opens', args=[contest.id, task.id]))
participant = get_object_or_404(models.AbstractParticipant, pk=participant_id)
if participant.contest_id != contest.id:
return HttpResponseNotFound()
qs = tasks_models.ManualOpenedTask.objects.filter(
contest=contest,
task=task,
participant=participant
)
# Toggle opens state: close if it's open, open otherwise
if qs.exists():
qs.delete()
if is_task_open(contest, task, participant):
messages.warning(request, 'Task is opened for this participant not manually, you can\'t close it')
else:
messages.success(request, 'Task is closed for %s' % participant.name)
else:
tasks_models.ManualOpenedTask(
contest=contest,
task=task,
participant=participant
).save()
messages.success(request, 'Task is opened for %s' % participant.name)
return JsonResponse({'done': 'ok'})
def recommend(req):
recs=list(map(lambda x:getattr(x, "articleId"),Recommendation.objects(userId=req.POST["uid"])))
articles=Article.objects(eid__in=recs).exclude("content")
articles=list(map(lambda x:x.__data,articles))
count=0
for word in WordBag.objects(eid__in=recs):
articles[count]["keyWord"]=word.wordList
count+=1
return JsonResponse({
"success":True,
"data":articles
})
def history(req):
aids=Record.getArticleForUser(req.POST["uid"])
articles=list(lambda x:x.__data,Article.objects(eid__in=aids).exclude("content"))
count=0
for word in WordBag.objects(eid__in=aids):
articles[count]["keyWord"]=word.wordList
count+=1
return JsonResponse({
"success":True,
"data":articles
})
def list(req):
pageIndex=req.POST["pageIndex"] if req.POST["pageIndex"] else 0
pageNum=req.POST["pageNum"] if req.POST["pageNum"] else 20
articles=list(map(lambda x:x.__data,Article.objects[pageIndex*pageNum:(pageIndex+1)*pageNum]))
words=WordBag.objects(eid__in=list(map(lambda x:x["eid"],articles)))
for i in range(len(articles)):
articles[i]["keyWord"]=words[i].wordList
return JsonResponse({
"success":True,
"data":articles
})
def detail(req,articleId):
article=Article.objects(eid=articleId).first()
article=article.__data
article["keyWord"]=WordBag.objects(eid=articleId).first().wordList
return JsonResponse({
"success":True,
"data":article
})
def form_invalid(self, form):
response = super(GeoTarget, self).form_invalid(form)
if self.request.is_ajax():
return JsonResponse(form.errors.as_json(), safe=False, status=400)
else:
return response
def form_valid(self, form):
# We make sure to call the parent's form_valid() method because
# it might do some processing (in the case of CreateView, it will
# call form.save() for example).
response = super(GeoTarget, self).form_valid(form)
if self.request.is_ajax():
cons_ids = []
kwargs = {'state_cd': form.cleaned_data['state']}
if form.cleaned_data['primary_only']:
kwargs['is_primary'] = True
# Get all constituent addresses in the state
cons_addrs = ConstituentAddress.objects.filter(**kwargs)
geojson = json.loads(form.cleaned_data['geojson'])
if geojson['type'] == 'FeatureCollection':
# todo: fetch number, but stick to 1st for now
logger.debug('is FeatureCollection')
geojson = geojson['features'][0]['geometry']
# elif geojson['type'] not ['MultiPolygon', 'Polygon']:
poly = GEOSGeometry(json.dumps(geojson))
for con in cons_addrs:
point = Point(y=con.latitude, x=con.longitude)
if poly.contains(point):
cons_ids.append(con.cons_id)
return JsonResponse(cons_ids, safe=False)
else:
return response
def trending(self, request, object_id, **kwargs):
# a custom view that shows trending dogs
if request.method == 'GET':
obj = get_object_or_404(self.model, id=object_id)
# We could filter by dogs specific to this post here, if needed
trending_dogs = Dog.trending.all()[:3]
dogs = DogSerializer(trending_dogs, many=True).data
return JsonResponse({'results': dogs})
else:
return JsonResponse({'results': []})
def post(self, request, *args, **kwargs):
with PyTessBaseAPI() as api:
with Image.open(request.FILES['image']) as image:
sharpened_image = image.filter(ImageFilter.SHARPEN)
api.SetImage(sharpened_image)
utf8_text = api.GetUTF8Text()
return JsonResponse({'utf8_text': utf8_text})
def device_token_receive(request):
if request.method != 'PUT':
return HttpResponse(status=405)
if request.body is b'':
return JsonResponse({'error': 'Bad Request'}, status=400)
query_dict = request.body.decode('utf-8')
body = json.loads(query_dict)
error = False
if 'device_token' not in body:
error = True
if 'uuid' not in body:
error = True
if 'sandbox' not in body:
error = True
if error:
return JsonResponse({'error': 'Bad Request'}, status=400)
device_token = body['device_token']
uuid = body['uuid']
sandbox = body['sandbox']
if DeviceToken.objects.filter(uuid=uuid).count() != 0:
token = DeviceToken.objects.get(uuid=uuid)
token.device_token = device_token
token.use_sandbox = sandbox
token.save()
else:
token = DeviceToken()
token.device_token = device_token
token.uuid = uuid
token.use_sandbox = sandbox
token.save()
return JsonResponse({'result': 'success'}, status=200)
def post(self, request):
tree = request.session.get('tree', None)
node_title = request.data.get('node_title', None)
if tree is None or node_title is None:
return HttpResponseBadRequest()
node = tree.find(node_title)
if node is None:
return HttpResponseBadRequest()
convert_list = {
'title__icontains': 'title',
'code__contains': 'code',
'credit': 'credit',
'category': 'category',
'area': 'area',
'subarea': 'subarea',
'college': 'college',
'dept': 'dept',
}
query_list = {key: request.data.get(value, None) for key, value in convert_list}
# Delete non-parameter keys from query
for key, value in query_list:
if value is None:
del query_list[key]
# TODO: add functions to get these variables
query_list['year'] = '2017'
query_list['semester'] = '1'
retrieved = Course.objects.filter(**query_list)
node.filter_true(retrieved)
retrieved_list = []
for item in retrieved:
course = {value: item.get(value, None) for key, value in convert_list}
retrieved_list.append(course)
json_root = {'data': retrieved_list}
return JsonResponse(json.dumps(json_root))
def test(self, mock_request):
mock_response = JsonResponse({"balance": 250})
mock_request.return_value = mock_response
ussd_client = self.ussd_client()
self.assertEqual(
"Testing response is being saved in "
"session status code is 200 and "
"balance is 250 and full content {'balance': 250}.\n",
ussd_client.send('')
)
expected_calls = [
mock.call(
method='get',
url="http://localhost:8000/mock/balance",
params=dict(
phone_number="200",
session_id=ussd_client.session_id
),
verify=False,
headers={"content-type": "application/json",
"user-agent": "my-app/0.0.1"}
),
mock.call(
method="get",
url="http://localhost:8000/mock/balance/200/"
),
mock.call(
method='post',
url='http://localhost:8000/mock/balance',
params=dict(
phone_numbers=[200, 201, 202],
session_id=ussd_client.session_id
),
verify=True,
timeout=30,
headers={"content-type": "application/json"}
)
]
# test requests that were made
mock_request.assert_has_calls(expected_calls)
def test_retry(self, mock_retry, mock_request):
mock_response = JsonResponse({"balance": 250},
status=400
)
mock_request.return_value = mock_response
ussd_client = self.get_ussd_client()
ussd_client.send('mwas')
# check posted flag has been set to false
self.assertFalse(
ussd_session(ussd_client.session_id)['posted'],
)
self.assertTrue(mock_retry.called)