def nodeinfo_view(request):
"""Generate a NodeInfo document."""
site = Site.objects.get_current()
usage = {"users": {}}
if settings.SOCIALHOME_STATISTICS:
usage = {
"users": {
"total": User.objects.count(),
"activeHalfyear": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=180)).count(),
"activeMonth": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=30)).count(),
},
"localPosts": Content.objects.filter(author__user__isnull=False, content_type=ContentType.CONTENT).count(),
"localComments": Content.objects.filter(author__user__isnull=False, content_type=ContentType.REPLY).count(),
}
nodeinfo = NodeInfo(
software={"name": "socialhome", "version": version},
protocols={"inbound": ["diaspora"], "outbound": ["diaspora"]},
services={"inbound": [], "outbound": []},
open_registrations=settings.ACCOUNT_ALLOW_REGISTRATION,
usage=usage,
metadata={"nodeName": site.name}
)
return JsonResponse(nodeinfo.doc)
python类JsonResponse()的实例源码
def add_product_to_wishlist(request, wishlist_id, product_id):
response = {'created': False}
if request.method != 'POST':
return JsonResponse({'err': 'invalid request'}, status=405)
if not getattr(request, 'customer', None):
return JsonResponse({'err': 'unauthorized request'}, status=403)
if wishlist_id != 'default':
wishlist = Wishlist.objects.filter(customer=request.customer, id=int(wishlist_id)).first()
else:
wishlist = Wishlist.objects.filter(customer=request.customer, shop=request.shop).first()
if wishlist:
created = not wishlist.products.filter(id=product_id).exists()
response['created'] = created
if created:
wishlist.products.add(product_id)
response['product_name'] = wishlist.products.get(id=product_id).name
elif wishlist_id == 'default':
return JsonResponse({'err': 'no wishlists exist'}, status=200)
else:
return JsonResponse({'err': 'invalid wishlist'}, status=400)
return JsonResponse(response)
def business_area_handler(request, pk):
if request.method == 'POST':
data = json.loads(request.body)
if pk is None:
# New business area
BusinessArea.objects.create(name=data['name'])
return JsonResponse(ReturnStatus(True, 'OK').to_dict())
else:
# existing business area update
try:
ba = BusinessArea.objects.get(pk=pk)
ba.name = data['name']
ba.save()
except BusinessArea.DoesNotExist:
return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
return JsonResponse(ReturnStatus(True, 'OK').to_dict())
elif request.method == 'DELETE':
try:
ba = BusinessArea.objects.get(pk=pk)
if ba.mission_set.all().count() != 0:
return JsonResponse(ReturnStatus(False, 'Business Areas can not be deleted while missions are still associated with them.').to_dict())
ba.delete()
except BusinessArea.DoesNotExist:
return JsonResponse(ReturnStatus(False, 'Key does not exist').to_dict())
return JsonResponse(ReturnStatus(True, 'OK').to_dict())
def get(self, request, *args, **kwargs):
_page = get_object_or_404(Page, pk=request.GET.get('page_id'))
# release page for all users which did not update during the last timeframe
PageIndicator.objects.filter(edited_on__lte=self.time_past, page=_page).delete()
page_indicators = PageIndicator.objects.filter(edited_on__gte=self.time_past, page=_page)\
.exclude(editor=request.user)
response = {}
if page_indicators.exists():
response['conflict'] = True
editing_user = page_indicators.first()
response['concurrent_user'] = [editing_user.editor.username]
# default behavior for concurrent users is blocking
response['block_editing'] = getattr(settings, 'CONCURRENT_BLOCK_EDITING', BLOCK_EDITING_DEFAULT)
response['message'] = _(u'Unfortunately you cannot edit this page at the moment: '
u'user {user} is blocking it since {time}').format(
user=editing_user.editor,
time=_date(editing_user.started_editing, 'D, d. b, H:i'))
else:
response['conflict'] = False
return JsonResponse(json.dumps(response), safe=False)
def _run_test_actions(name, actions):
test = ConversationTest(name, actions)
start_time = time.time()
report = None
try:
report = test.run()
except Exception as e:
log = TestLog.get()
fatal = not isinstance(e, ConversationTestException)
if fatal:
trace = traceback.format_exc()
print(trace)
log.append(trace)
return JsonResponse(data={'status': 'exception' if fatal else 'failed', 'log':log, 'message':str(e), 'report':report})
elapsed_time = time.time() - start_time
return JsonResponse(data={'status': 'passed', 'log':TestLog.get(), 'duration':elapsed_time, 'report':report})
def root(self, _):
"""
Implements the
`root <http://docs.annotatorjs.org/en/v1.2.x/storage.html#root>`_
endpoint.
:param _:
:class:`rest_framework.request.Request` object—ignored here.
:return:
API information :class:`rest_framework.response.Response`.
"""
return JsonResponse(
{
"name": getattr(settings,
"ANNOTATOR_NAME",
"django-annotator-store"),
"version": annotator.__version__
})
def test_uploading_image(self, mock_get_thumbnail):
"""Test upload single image file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "1",
"name": "test.png",
"size": 180,
"type": "text/plain",
"url": "/multiuploader/1/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/1/",
"deleteType": "DELETE"
}]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[1]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[1]))
self.assertEqual(resp.status_code, 200)
def test_uploading_mp3(self, mock_get_thumbnail):
"""Test upload single mp3 file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.mp3', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'audio'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "2",
"name": "test.mp3",
"size": 3742720,
'type': "text/plain",
"url": "/multiuploader/2/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/2/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[2]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[2]))
self.assertEqual(resp.status_code, 200)
def test_uploading_pdf(self, mock_get_thumbnail):
"""Test upload single pdf file"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.pdf', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'default'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "3",
"name": "test.pdf",
"size": 6763,
'type': "text/plain",
"url": "/multiuploader/3/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/3/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[3]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[3]))
self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_quality(self, mock_get_thumbnail):
"""Test upload single image file with quality"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR + '/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'quality': 70}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "4",
"name": "test.png",
"size": 180,
'type': "text/plain",
"url": "/multiuploader/4/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/4/",
"deleteType": "DELETE", }]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[4]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[4]))
self.assertEqual(resp.status_code, 200)
def test_uploading_thumbnail_size(self, mock_get_thumbnail):
"""Test upload single image file with size"""
mock_get_thumbnail.return_value = ""
with open(settings.TEST_DATA_DIR+'/test.png', 'rb') as att:
form_data = {'file': SimpleUploadedFile(att.name, att.read()), 'media_type': 'images', 'size': '280x80'}
resp = self.client.post(reverse('multiuploader'), data=form_data)
self.assertEqual(resp.status_code, 200)
data = {"files": [{"id": "5",
"name": "test.png",
"size": 180,
"type": "text/plain",
"url": "/multiuploader/5/",
"thumbnailUrl": "",
"deleteUrl": "/multiuploader/5/",
"deleteType": "DELETE"
}]
}
self.assertEqual(resp.content, JsonResponse(data).content)
resp = self.client.get(reverse('multiuploader', args=[5]), HTTP_USER_AGENT='Mozilla/5.0')
self.assertEqual(resp.status_code, 200)
resp = self.client.delete(reverse('multiuploader', args=[5]))
self.assertEqual(resp.status_code, 200)
def view(self):
league_tag = self.request.GET.get('league')
if league_tag == 'all':
league = None
elif league_tag is not None:
league = League.objects.filter(tag=league_tag).first()
else:
league = self.league
try:
board = int(self.request.GET.get('board', ''))
except ValueError:
board = None
try:
team = int(self.request.GET.get('team', ''))
except ValueError:
team = None
return JsonResponse(_tv_json(league, board, team))
def test_async_workflow(self, mock_request, mock_http_task):
mock_response = JsonResponse({"balance": 257})
mock_request.return_value = mock_response
ussd_client = self.ussd_client()
ussd_client.send('')
# check http_task is called
mock_http_task.delay.assert_called_once_with(
request_conf=dict(
method='get',
url="https://localhost:8000/mock/submission",
params={'phone_number': '200',
'session_id': ussd_client.session_id}
)
)
def post(self, request):
errors = self._validate_request(request)
if errors:
return JsonResponse(dict(errors=errors), status=400)
file_name = request.FILES['file']. name
callback = request.GET['callback']
result = run_abacus.delay(
file_name, callback,
*self._prepare_output_file(file_name)
)
return JsonResponse({
'task_id': result.task_id,
'status': request.build_absolute_uri(
reverse('query', args=(result.task_id,))
)
})
def search(request):
if not request.method == 'POST':
raise Http404
req = json.loads(request.body)
tmp = Application.objects.search_text(req['keyword'])
apkList = map(lambda x:{
"name":x.name,
"version":x.version,
"md5":x.md5,
"modifyDate":x.modifyDate,
"publicationDate":x.publicationDate,
"id":str(x.id)
}, tmp)
return JsonResponse({
"success":True,
"data":apkList,
"msg":''
})
def process_request(self,request):
url=request.get_full_path()
if not url in ['/report/basic','/report/analysis']:
return None
apkId=request.session.get('apkId')
if not apkId:
if request.method=='POST':
return JsonResponse({
"success":False,
"data":None,
"msg":'????????'
})
else:
raise Http404
else:
return None
def get_ss_qr(request):
if request.user.is_anonymous():
return JsonResponse({'error': 'unauthorized'})
if not hasattr(request.user, 'ss_user'):
return JsonResponse({'error': 'no linked shadowsocks account'})
ss_user = request.user.ss_user
if request.GET.get('nid'):
try:
node = Node.objects.get(pk=request.GET.get('nid'))
except Node.DoesNotExist:
return JsonResponse({'error': 'node not exist'})
else:
node = Node.objects.all().order_by('-weight')
if node:
node = node[0]
else:
return JsonResponse({'error': 'no node at all'})
password = '{}:{}@{}:{}'.format(node.method, ss_user.password, node.server, ss_user.port)
img = qrcode.make('ss://{}'.format(base64.b64encode(bytes(password, 'utf8')).decode('ascii')))
response = HttpResponse(content_type="image/png")
img.save(response)
return response
def render_to_response(self, context, **response_kwargs):
try:
scope_name = self.kwargs['scope']
range_start = self.request.GET.get('start', None) or None
range_end = self.request.GET.get('end', None) or None
if scope_name == "custom":
if range_start is None:
return HttpResponseBadRequest()
range_start = int(range_start)
if range_end is not None:
range_end = int(range_end)
data, start, end, resolution = D3GraphDataGenerator(self.node, self.object, scope_name, range_start, range_end).generate()
return JsonResponse(data)
except:
logger.exception("Error rendering graph data for %s on %s", self.object, self.node)
raise
def nodeinfo_well_known_view(request):
"""Generate .well-known/nodeinfo."""
wellknown = get_nodeinfo_well_known_document(settings.SOCIALHOME_URL)
return JsonResponse(wellknown)
def social_relay_view(request):
"""Generate a .well-known/x-social-relay document."""
relay = SocialRelayWellKnown(subscribe=True)
return JsonResponse(relay.doc)
def form_invalid(self, form):
return JsonResponse({'success': False})
def form_valid(self, form):
form_data = form.save(commit=False)
form_data.uploader = self.request.user
form_data.save()
return JsonResponse({'success': True, 'imgURL': form_data.image.url})
def form_valid(self, form):
instance = form.save()
ctx = {'success': True, 'msg': "Comment was created successfully."}
ctx['data'] = self.comment_dict(instance)
return JsonResponse(ctx)
def form_invalid(self, form):
ctx = {'success': False, 'msg': "Failed to post a comment",
'errors': json.loads(form.errors.as_json())}
return JsonResponse(ctx)
def form_valid(self, form):
shop = self.request.shop
customer = self.request.customer
product_id = self.request.POST.get('product_id')
response = {}
wishlist, created = Wishlist.objects.get_or_create(shop=shop, customer=customer, **form.cleaned_data)
response['id'] = wishlist.id
response['name'] = wishlist.name
if created and product_id:
wishlist.products.add(product_id)
response["product_id"] = product_id
response['product_name'] = wishlist.products.get(pk=product_id).name
response['created'] = created
return JsonResponse(response)
def form_invalid(self, form):
return JsonResponse(form.errors, status=400)
def done(self, form_list, **kwargs):
"""
Wizard finish handler.
The idea is to persist your data here
"""
return JsonResponse({'next_url': '/next-page/'})
def _get_response_data(self, response):
assert isinstance(response, JsonResponse)
return json.loads(response.content.decode('utf-8'))
def render_state(self, step=None, form=None, form_data=None, form_files=None, done=False, status_code=200):
valid = self.is_valid()
current_step = self.get_current_step(step=step)
data = {
'current_step': current_step if not done else None,
'done': done,
'valid': valid,
'structure': self.get_structure(),
'steps': {}
}
for step in self.steps.all:
current_form = None
current_form_data = None
current_form_files = None
if form is not None and step == current_step:
current_form = form
current_form_data = form_data
current_form_files = form_files
data['steps'][step] = self.get_step_data(
step=step, form=current_form, form_data=current_form_data, form_files=current_form_files)
# Allow for manipulating state data before returning
data = self.clean_state_data(data)
return JsonResponse(data, status=status_code, encoder=self.json_encoder_class)
def render_response(self, data=None, status_code=200):
data = data or {}
return JsonResponse(data, status=status_code, encoder=self.json_encoder_class)