def publish(user, item=None, additional_data=None):
    """Render a payload to JSON and publish it to ``user`` through Redis.

    The payload starts as ``item.as_dict()`` (or an empty dict when ``item``
    is ``None``) and is then updated with ``additional_data``. The rendered
    payload is wrapped in a ``RedisMessage``; the message is not published
    when ``settings.TESTING`` is truthy.

    :param user: object whose ``email`` identifies the recipient.
    :param item: optional object exposing ``as_dict()``.
    :param additional_data: optional mapping merged over the item data.
    """
    extra = {} if additional_data is None else additional_data
    publisher = RedisPublisher(facility='all', users=[user.email])
    renderer = JSONRenderer()

    payload = {} if item is None else item.as_dict()
    payload.update(extra)
    message = RedisMessage(renderer.render(payload))

    # Do not send in tests
    if not getattr(settings, 'TESTING', False):
        publisher.publish_message(message)
python类JSONRenderer()的实例源码
def test_publish(self):
    """Check the payload handed to ``RedisMessage`` for each call shape of ``publish``."""
    target = "universal_notifications.backends.websockets.RedisMessage"
    with mock.patch(target) as message_cls:
        # No item, no extra data: an empty payload is rendered.
        publish(self.user)
        message_cls.assert_called_with(JSONRenderer().render({}))
        message_cls.reset_mock()

        # Item only: the payload is the item's dict.
        publish(self.user, self.item)
        message_cls.assert_called_with(JSONRenderer().render(self.item.as_dict()))
        message_cls.reset_mock()

        # Item plus extra data: the extra keys are merged over the item's dict.
        extra = {"additional": True}
        publish(self.user, self.item, extra)
        expected = self.item.as_dict()
        expected.update(extra)
        message_cls.assert_called_with(JSONRenderer().render(expected))
def __init__(self, data, **kwargs):
    """Build the response body from ``data``, choosing the format from ``Accept``.

    ``data`` must contain ``status`` and ``request`` entries; both are removed
    from ``data`` (it is modified in place) before rendering. When the
    request's ``HTTP_ACCEPT`` header equals ``application/json`` exactly, the
    ``detail`` entry is renamed to ``error`` and the payload is rendered with
    ``JSONRenderer``. Otherwise it is rendered with ``CollectionJsonRenderer``
    as ``application/vnd.collection+json`` and ``self.exception`` is set.

    :param data: mutable mapping with ``status``, ``request`` and the payload.
    :param kwargs: forwarded to the parent constructor, with ``status`` and
        ``content_type`` filled in here.
    """
    kwargs['status'] = data.pop('status')
    request = data.pop('request')

    if request.META.get('HTTP_ACCEPT') == 'application/json':
        kwargs['content_type'] = 'application/json'
        data['error'] = data.pop('detail')
        content = JSONRenderer().render(data)
    else:
        kwargs['content_type'] = 'application/vnd.collection+json'
        self.exception = True
        renderer_context = {'request': request, 'view': None, 'response': self}
        content = CollectionJsonRenderer().render(
            data, renderer_context=renderer_context)

    super(RenderedResponse, self).__init__(content, **kwargs)
def render(self, data, media_type=None, renderer_context=None):
    """Serialize ``data`` to JSON, wrapped under the renderer's labels.

    * ``None`` (no response body) renders as an empty string.
    * A mapping with a non-``None`` ``results`` entry is treated as a
      paginated listing: ``results`` goes under ``pagination_object_label``
      and ``count`` under ``pagination_count_label``.
    * A mapping with a non-``None`` ``errors`` entry is delegated to the
      parent renderer unchanged.
    * Anything else (including lists) is wrapped under ``object_label``.

    :param data: the payload to render; may be ``None``, a mapping or any
        JSON-serializable value.
    :param media_type: unused here.
    :param renderer_context: unused here.
    :return: the JSON text (or the parent renderer's output for errors).
    """
    # Nothing to wrap when the view produced no body.
    if data is None:
        return ''

    # Only mapping-like payloads can carry the ``results``/``errors`` keys;
    # lists and scalars fall straight through to the plain wrapper.
    if hasattr(data, 'get'):
        if data.get('results', None) is not None:
            return json.dumps({
                self.pagination_object_label: data['results'],
                self.pagination_count_label: data['count']
            })

        # If the view throws an error (such as the user can't be
        # authenticated), `data` will contain an `errors` key. The parent
        # renderer handles rendering errors, so delegate that case.
        if data.get('errors', None) is not None:
            return super(ConduitJSONRenderer, self).render(data)

    return json.dumps({
        self.object_label: data
    })
def react_documents(context, module, reload_on_success=False):
    """Build the template context for the document editor of ``module``.

    The incoming ``context`` is not read; a fresh dict is returned holding the
    JSON-rendered chapters of ``module``, the JSON-rendered CKEditor widget
    config, the module's primary key, an element id and the widget media.

    :param context: incoming template context (ignored).
    :param module: object whose chapters are listed; ``pk`` and ``id`` are read.
    :param reload_on_success: flag passed through as a JSON literal.
    :return: dict used as the new template context.
    """
    chapter_qs = Chapter.objects.filter(module=module)
    chapters_json = JSONRenderer().render(
        ChapterSerializer(chapter_qs, many=True).data)

    editor = CKEditorUploadingWidget(config_name='image-editor')
    editor._set_config()
    editor_config = JSONRenderer().render(editor.config)

    return {
        'chapters': chapters_json,
        'module': module.pk,
        'config': editor_config,
        'id': 'document-' + str(module.id),
        'reload_on_success': json.dumps(reload_on_success),
        'ckeditor_media': editor.media,
    }
def delete(self, request):
    """
    Delete the user attached to the request and report the outcome.

    Calls ``delete()`` on ``request.user`` and answers with a JSON-rendered
    ``{'account': <email>, 'deleted': 'success'}`` payload.

    :param request: request whose ``user`` is the account to remove
    :return: Response carrying the rendered payload
    :raises DatabaseFailureException: when the delete raises a database error
    """
    stdlogger.debug("Hitting HTTP DELETE account view")
    # Read the e-mail before deleting; it is reported back afterwards.
    email = request.user.email

    try:
        stdlogger.debug("Deleting this user object")
        request.user.delete()
    except (IntegrityError, InternalError, DataError, DatabaseError):
        # Not expected in practice; tell the caller the operation could not
        # be carried out.
        raise DatabaseFailureException

    # NOTE(review): the status is 201 Created although this handles a delete,
    # and the payload is rendered before being handed to Response -- confirm
    # what clients rely on before changing either.
    payload = JSONRenderer().render({'account': email, 'deleted': 'success'})
    return Response(data=payload, status=status.HTTP_201_CREATED)
def force_evaluate(val):
    """Round-trip ``val`` through ``JSONRenderer`` and ``json.loads`` and return the parsed result."""
    return json.loads(JSONRenderer().render(val))
def home(request, bd=None):
    """Home view.

    Renders ``home.html`` with ``bd`` set to the JSON-rendered block diagram
    whose id is ``bd``, or to the string ``"None"`` when no id is given.
    Lookup of a given id goes through ``get_object_or_404``.
    """
    if bd is None:
        bd_serial = "None"
    else:
        diagram = get_object_or_404(BlockDiagram, id=bd)
        bd_serial = JSONRenderer().render(BlockDiagramSerializer(diagram).data)
    return render(request, 'home.html', {'bd': bd_serial})
def to_json(data):
    """Render ``data`` with ``JSONRenderer``, decode it as UTF-8 and mark it safe."""
    rendered = JSONRenderer().render(data)
    return mark_safe(rendered.decode('utf-8'))
def test_is_json_serializable(self):
    """A serialized request renders to JSON containing every expected chunk."""
    user_agent_string = str(
        'Mozilla/5.0 (Linux; Android 6.0.1; Z831 Build/MMB29M) '
        'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.83 '
        'Mobile Safari/537.36')
    response = self.client.get(
        reverse('intake-home'), HTTP_USER_AGENT=user_agent_string)
    serialized = RequestSerializer(response.wsgi_request).data
    rendered = JSONRenderer().render(
        serialized, renderer_context={'indent': 2})
    for chunk in self.json_chunks:
        self.assertIn(chunk, rendered)
def get_response(self, data):
    """Return an ``HttpResponse`` whose body is ``data`` rendered by ``JSONRenderer``."""
    body = JSONRenderer().render(data)
    return HttpResponse(body, content_type="application/json")
def kolibri_bootstrap_model(context, base_name, api_resource, **kwargs):
    """Return a ``<script>`` tag that creates a client-side model marked as synced.

    The data comes from ``_kolibri_bootstrap_helper`` called in ``'detail'``
    mode. The response data is rendered to JSON and then dumped again, so it
    is embedded as a string literal that the script passes to ``JSON.parse``.

    :return: the script tag, marked safe for template output.
    """
    response, kwargs, url_params = _kolibri_bootstrap_helper(
        context, base_name, api_resource, 'detail', **kwargs)

    model_json = json.dumps(JSONRenderer().render(response.data).decode('utf-8'))
    params_json = json.dumps(url_params)

    # NOTE(review): the output is marked safe but the embedded JSON is not
    # escaped for a literal "</script>" -- confirm the response data cannot
    # contain one.
    template = ("<script type='text/javascript'>"
                "var model = {0}.resources.{1}.createModel(JSON.parse({2}), {3});"
                "model.synced = true;"
                "</script>")
    html = template.format(
        settings.KOLIBRI_CORE_JS_NAME, api_resource, model_json, params_json)
    return mark_safe(html)
def kolibri_bootstrap_collection(context, base_name, api_resource, **kwargs):
    """Return a ``<script>`` tag that creates a client-side collection marked as synced.

    The data comes from ``_kolibri_bootstrap_helper`` called in ``'list'``
    mode. The response data is rendered to JSON and then dumped again, so it
    is embedded as a string literal that the script passes to ``JSON.parse``.

    :return: the script tag, marked safe for template output.
    """
    response, kwargs, url_params = _kolibri_bootstrap_helper(
        context, base_name, api_resource, 'list', **kwargs)

    params_json = json.dumps(url_params)
    kwargs_json = json.dumps(kwargs)
    collection_json = json.dumps(
        JSONRenderer().render(response.data).decode('utf-8'))

    # NOTE(review): the output is marked safe but the embedded JSON is not
    # escaped for a literal "</script>" -- confirm the response data cannot
    # contain one.
    template = ("<script type='text/javascript'>"
                "var collection = {0}.resources.{1}.createCollection({2}, {3}, JSON.parse({4}));"
                "collection.synced = true;"
                "</script>")
    html = template.format(
        settings.KOLIBRI_CORE_JS_NAME,
        api_resource,
        params_json,
        kwargs_json,
        collection_json,
    )
    return mark_safe(html)
def post(self, request):
    '''
    Look up foods by name and return the serialized matches.

    For every name under the ``foods`` key of the request data, up to five
    ``Food`` rows are selected whose name matches the upper-cased input via
    ``icontains`` or ``fuzzy``. Raises ``Http404`` when ``foods`` is missing.
    '''
    if 'foods' not in request.data:
        raise Http404

    matches = [
        Food.objects.filter(
            Q(name__icontains=food.upper()) | Q(name__fuzzy=food.upper())
        )[:5]
        for food in request.data.get('foods')
    ]
    serializer = FoodListSerializer({"foods": matches})
    return Response(JSONRenderer().render(serializer.data))
def __init__(self, data, **kwargs):
    """Render ``data`` to JSON and initialise the parent response with it.

    ``content_type`` is always set to ``application/json``, replacing any
    value supplied by the caller.
    """
    body = JSONRenderer().render(data)
    kwargs.update(content_type='application/json')
    super(JSONResponse, self).__init__(body, **kwargs)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``VersionedOpenAPICodec`` for 200 responses.

    Any other response status falls back to plain ``JSONRenderer`` output.
    ``renderer_context`` must contain the ``response`` being rendered.
    """
    response = renderer_context["response"]
    if response.status_code == status.HTTP_200_OK:
        customizations = self.get_customizations()
        return VersionedOpenAPICodec().encode(data, extra=customizations)
    return JSONRenderer().render(data)
def test_dictionary_to_json(self):
    """The serializer output renders to the same JSON as the expected dict."""
    serializer = StatusReportSerializer(self.new_status)
    content = JSONRenderer().render(serializer.data)
    expected_json = JSONRenderer().render(self.expected_dict)
    # assertEquals is a deprecated alias (removed in Python 3.12); use the
    # canonical name, as the neighbouring tests already do.
    self.assertEqual(expected_json, content)
def test_json_to_StatusReport(self):
    """Parsing the expected JSON and saving over ``new_status`` keeps its fields."""
    payload = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(payload))

    serializer = StatusReportSerializer(self.new_status, data=parsed)
    self.assertTrue(serializer.is_valid())
    saved = serializer.save()

    self.assertEqual(self.new_status, saved)
    for field in ('id', 'status', 'when', 'user'):
        self.assertEqual(getattr(self.new_status, field), getattr(saved, field))
def test_json_to_new_StatusReport(self):
    """Parsing the expected JSON and saving without an instance yields matching fields."""
    payload = JSONRenderer().render(self.expected_dict)
    parsed = JSONParser().parse(BytesIO(payload))

    serializer = StatusReportSerializer(data=parsed)
    self.assertTrue(serializer.is_valid())
    saved = serializer.save()

    self.assertEqual(self.new_status.status, saved.status)
    self.assertIsNotNone(saved.when)
    self.assertEqual(self.new_status.user, saved.user)
def __init__(self, data, **kwargs):
    """Initialise the response with ``data`` rendered as JSON.

    The ``content_type`` keyword is forced to ``application/json``.
    """
    rendered = JSONRenderer().render(data)
    kwargs['content_type'] = 'application/json'
    super(JSONResponse, self).__init__(rendered, **kwargs)
def render(self, data):
    """Render ``data`` with ``rest_framework``'s ``JSONRenderer``, imported at call time."""
    from rest_framework.renderers import JSONRenderer
    return JSONRenderer().render(data)
def __init__(self, data, **kwargs):
    """Pass the JSON rendering of ``data`` to the parent constructor.

    Whatever ``content_type`` the caller gave is replaced by
    ``application/json``.
    """
    json_body = JSONRenderer().render(data)
    kwargs.update(content_type='application/json')
    super(JSONResponse, self).__init__(json_body, **kwargs)
# helper functions
def __init__(self, data, **kwargs):
    """Create the response from ``data`` serialized by ``JSONRenderer``.

    The parent constructor always receives ``content_type='application/json'``.
    """
    payload = JSONRenderer().render(data)
    kwargs['content_type'] = 'application/json'
    super(JSONResponse, self).__init__(payload, **kwargs)
def __init__(self, data, errorcode=200, errormsg="success", **kwargs):
    """Wrap ``data`` in an ``errorcode``/``errormsg`` envelope and render it as JSON.

    A list payload is nested as ``{"result": data}`` under the ``data`` key;
    any other value is placed under ``data`` directly.

    :param data: the payload to return.
    :param errorcode: value of the envelope's ``errorcode`` field.
    :param errormsg: value of the envelope's ``errormsg`` field.
    :param kwargs: forwarded to the parent constructor; ``content_type`` is
        forced to ``application/json``.
    """
    # The stray debug print of kwargs and the dead initial assignment of the
    # envelope have been dropped; the rendered output is unchanged.
    payload = {"result": data} if isinstance(data, list) else data
    composite = {"errorcode": errorcode, "errormsg": errormsg, "data": payload}
    content = JSONRenderer().render(composite)
    kwargs['content_type'] = 'application/json'
    super(JSONResponse, self).__init__(content, **kwargs)
def parse_json_response(json):
    """Render ``json`` with ``JSONRenderer`` and parse the bytes back with ``JSONParser``."""
    stream = BytesIO(JSONRenderer().render(json))
    return JSONParser().parse(stream)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Return the OpenAPI document for ``data`` after ``append_schemas`` has processed it.

    Responses whose status is not 200 are rendered with plain ``JSONRenderer``
    instead. ``renderer_context`` must contain the ``response`` being rendered.
    """
    response = renderer_context['response']
    if response.status_code != status.HTTP_200_OK:
        return JSONRenderer().render(data)

    customizations = self.get_customizations()
    encoded = OpenAPICodec().encode(data, extra=customizations)
    # Append more information to documentation
    return append_schemas(json.loads(encoded))
def add_to_queue(self, obj):
# Queue or send the message attached to ``obj``.
#
# When settings.UNIVERSAL_NOTIFICATIONS_TWILIO_ENABLE_PROXY is truthy, the
# sender number (obj.from_phone) is rendered as JSON and published on the
# dispatcher channel through Redis; otherwise obj.message is handed to
# self.send().
#
# NOTE(review): indentation was lost in this listing, so it is unclear whether
# obj.message.save() belongs to the else branch only or runs after both
# branches -- confirm against the upstream file before re-indenting.
if getattr(settings, 'UNIVERSAL_NOTIFICATIONS_TWILIO_ENABLE_PROXY', False):
connection = StrictRedis(**private_settings.WS4REDIS_CONNECTION)
r = JSONRenderer()
json_data = r.render({'number': obj.from_phone})
# The channel name is configurable; the default is '__un_twilio_dispatcher'.
channel = getattr(settings, 'UNIVERSAL_NOTIFICATIONS_TWILIO_DISPATCHER_CHANNEL', '__un_twilio_dispatcher')
connection.publish(channel, RedisMessage(json_data))
else:
self.send(obj.message)
obj.message.save()
check_twilio_proxy.py 文件源码
项目:universal_notifications
作者: ArabellaTech
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
    """Publish one dispatcher message per distinct pending sender number.

    Every distinct ``from_phone`` among ``PhonePendingMessages`` rows is
    rendered as ``{"number": <from_phone>}`` and published on the dispatcher
    channel (configurable through settings, default ``__un_twilio_dispatcher``).
    """
    connection = StrictRedis(**private_settings.WS4REDIS_CONNECTION)
    pending_numbers = (
        PhonePendingMessages.objects.all()
        .values_list("from_phone", flat=True)
        .distinct()
    )
    # Neither the renderer nor the channel name depends on the number.
    renderer = JSONRenderer()
    channel = getattr(settings, "UNIVERSAL_NOTIFICATIONS_TWILIO_DISPATCHER_CHANNEL", "__un_twilio_dispatcher")
    for number in pending_numbers:
        payload = renderer.render({"number": number})
        connection.publish(channel, RedisMessage(payload))
rest_framework_swagger_renderers.py 文件源码
项目:drf-swagger-missing
作者: tovmeod
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``BetterOpenAPICodec``, adding definitions and a base path.

    Responses whose status is not 200 are rendered with plain ``JSONRenderer``.
    Otherwise the customizations from ``get_customizations()`` are extended
    with ``definitions`` (from ``data._definitions``) and ``basePath``.
    """
    response = renderer_context['response']
    if response.status_code != status.HTTP_200_OK:
        return JSONRenderer().render(data)

    customizations = self.get_customizations()
    customizations['definitions'] = encode_schemas(data._definitions)
    # settings.API_BASE_PATH overrides the auto-detected base path when set
    customizations['basePath'] = getattr(settings, 'API_BASE_PATH', data._base_path)
    codec = BetterOpenAPICodec()
    return codec.encode(data, extra=customizations)
def render(self, data, accepted_media_type=None, renderer_context=None):
    """Encode ``data`` with ``OpenAPICodec`` for 200 responses.

    Any other response status falls back to plain ``JSONRenderer`` output.
    ``renderer_context`` must contain the ``response`` being rendered.
    """
    response = renderer_context['response']
    if response.status_code == status.HTTP_200_OK:
        customizations = self.get_customizations()
        return OpenAPICodec().encode(data, extra=customizations)
    return JSONRenderer().render(data)