def tickerDataPush():
tickerData = ticker_data()
Group('Ticker').send({
'text': json.dumps(tickerData,cls=DjangoJSONEncoder),
})
python类DjangoJSONEncoder()的实例源码
def kryptos_leader_channel_push(data):
Group('kryptos-leader-channel').send(
{
'text': json.dumps(data,cls=DjangoJSONEncoder)
})
def echo_leader_channel_push(data):
Group('echo-leader-channel').send(
{
'text': json.dumps(data,cls=DjangoJSONEncoder)
})
def userDataPush(userid,data):
Channel( redis_conn.hget("online-users",userid) ).send(
{
'text' : json.dumps(sellData,cls=DjangoJSONEncoder)
}
)
def hashinclude_channel_push(data):
Group('hashinclude-channel').send(
{
'text': json.dumps(data,cls=DjangoJSONEncoder)
})
def test_no_pond_blocks_no_state_changed(self, modify_mock):
pond_blocks = []
now = timezone.now()
mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
end=now + timedelta(days=1))
responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json')
one_week_ahead = timezone.now() + timedelta(weeks=1)
response = self.client.get(reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat()))
response_json = response.json()
self.assertFalse(response_json['isDirty'])
def test_pond_blocks_no_state_changed(self, modify_mock):
now = timezone.now()
mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
end=now + timedelta(days=1))
molecules1 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[0].id,
tracking_num=self.ur.id, event=[])
molecules2 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[1].id,
tracking_num=self.ur.id, event=[])
molecules3 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[2].id,
tracking_num=self.ur.id, event=[])
pond_blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in [molecules1, molecules2, molecules3]),
start=now + timedelta(minutes=30), end=now + timedelta(minutes=40))
pond_blocks = [pb._to_dict() for pb in pond_blocks]
responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json')
one_week_ahead = timezone.now() + timedelta(weeks=1)
response = self.client.get(reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat()))
response_json = response.json()
self.assertFalse(response_json['isDirty'])
for i, req in enumerate(self.requests):
req.refresh_from_db()
self.assertEqual(req.state, 'PENDING')
self.ur.refresh_from_db()
self.assertEqual(self.ur.state, 'PENDING')
def test_pond_blocks_state_change_completed(self, modify_mock):
now = timezone.now()
mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
end=now - timedelta(days=1))
molecules1 = basic_mixer.cycle(3).blend(PondMolecule, completed=True, failed=False, request_num=self.requests[0].id,
tracking_num=self.ur.id, event=[])
molecules2 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[1].id,
tracking_num=self.ur.id, event=[])
molecules3 = basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False, request_num=self.requests[2].id,
tracking_num=self.ur.id, event=[])
pond_blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in [molecules1, molecules2, molecules3]),
start=now - timedelta(minutes=30), end=now - timedelta(minutes=20))
pond_blocks = [pb._to_dict() for pb in pond_blocks]
responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
body=json.dumps(pond_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json')
response = self.client.get(reverse('api:isDirty'))
response_json = response.json()
self.assertTrue(response_json['isDirty'])
request_states = ['COMPLETED', 'WINDOW_EXPIRED', 'WINDOW_EXPIRED']
for i, req in enumerate(self.requests):
req.refresh_from_db()
self.assertEqual(req.state, request_states[i])
self.ur.refresh_from_db()
self.assertEqual(self.ur.state, 'COMPLETED')
def get_prep_value(self, value):
return json.dumps(value, cls=DjangoJSONEncoder)
def sse_encode_event(event_type, data, event_id=None, escape=False):
data_str = json.dumps(data, cls=DjangoJSONEncoder)
if escape:
event_type = build_id_escape(event_type)
data_str = build_id_escape(data_str)
out = 'event: %s\n' % event_type
if event_id:
out += 'id: %s\n' % event_id
out += 'data: %s\n\n' % data_str
return out
def get_filter_info(self, request, **kwargs):
self.setup_filters(**kwargs)
search = request.GET.get("search", None)
if search:
self.apply_search(search)
name = request.GET.get("name", None)
table_filter = self.filter_map.get_filter(name)
return json.dumps(table_filter.to_json(self.queryset),
indent=2,
cls=DjangoJSONEncoder)
def get(self, request, *args, **kwargs):
def response(data):
return HttpResponse(json.dumps(data,
indent=2,
cls=DjangoJSONEncoder),
content_type="application/json")
error = "ok"
search_term = request.GET.get("search", None)
if search_term is None:
# We got no search value so return empty reponse
return response({'error': error, 'results': []})
try:
prj = Project.objects.get(pk=kwargs['pid'])
except KeyError:
prj = None
results = self.apply_search(search_term,
prj,
request)[:ToasterTypeAhead.MAX_RESULTS]
if len(results) > 0:
try:
self.validate_fields(results[0])
except self.MissingFieldsException as e:
error = e
data = {'results': results,
'error': error}
return response(data)
def __init__(self, content, status=200, *args, **kwargs):
super(UploadResponse, self).__init__(
content=DjangoJSONEncoder().encode(content),
content_type='application/json',
status=status,
*args, **kwargs
)
def process_form_submission(self, form):
self.get_submission_class().objects.create(
form_data=json.dumps(form.cleaned_data, cls=DjangoJSONEncoder),
page=self, user=form.user
)
def process_form_submission(self, form):
"""
Accepts form instance with submitted data, user and page.
Creates submission instance.
You can override this method if you want to have custom creation logic.
For example, if you want to save reference to a user.
"""
self.get_submission_class().objects.create(
form_data=json.dumps(form.cleaned_data, cls=DjangoJSONEncoder),
page=self,
)
def serialized_facility_factory(identifier):
facility = Facility(name="Facility {}".format(identifier), id=identifier)
return DjangoJSONEncoder().encode(facility.serialize())
def __get__(self, instance, owner):
from restframeworkclient.models import Model
def callable(**kwargs):
def get_value(value):
if isinstance(value, dict):
return json.dumps(value, cls=DjangoJSONEncoder)
if value in (datetime.datetime.now, timezone.now):
return value()
if isinstance(value, Model):
return value.pk
return value
kwargs = {k: get_value(v) for k, v in kwargs.items()}
arg_name = 'data' if self.method == 'POST' else 'params'
if self.static:
url = owner._resources_url() + self.subresource + '/'
data = owner._rest_call(url, method=self.method, **{arg_name: kwargs})
else:
url = instance._resource_url(instance.pk) + self.subresource + '/'
data = instance._rest_call(url, method=self.method, **{arg_name: kwargs})
if self.unwrapping_key:
data = data[self.unwrapping_key]
if self.model:
obj = self.model(**data)
obj._persisted = True
return obj
return data
return callable() if self.as_property else callable
def kolibri_set_server_time():
html = ("<script type='text/javascript'>"
"{0}.utils.serverClock.setServerTime({1});"
"</script>".format(settings.KOLIBRI_CORE_JS_NAME,
json.dumps(now(), cls=DjangoJSONEncoder)))
return mark_safe(html)
def _store_form_kwargs(self, form):
session_data = self.request.session.setdefault(self.wizard_name, {})
# Adjust kwargs to avoid trying to save the instances
form_data = form.cleaned_data.copy()
for prop in ('range', 'benefit', 'condition', 'offer_group'):
obj = form_data.get(prop, None)
if obj is not None:
form_data[prop] = obj.id
form_kwargs = {'data': form_data}
json_data = json.dumps(form_kwargs, cls=DjangoJSONEncoder)
session_data[self._key()] = json_data
self.request.session.save()
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True, **kwargs):
if safe and not isinstance(data, dict):
raise TypeError('In order to allow non-dict objects to be '
'serialized set the safe parameter to False')
kwargs.setdefault('content_type', 'application/json')
data = json.dumps(data, cls=encoder)
super(JsonResponse, self).__init__(content=data, **kwargs)