def portfolioDataPush():
    """Push every online user's portfolio data to that user's channel.

    Iterates over the keys of the ``online-users`` Redis hash. For each
    user id it builds the payload with ``portfolio(userid)`` and sends it,
    JSON-encoded with ``DjangoJSONEncoder``, through a ``Channel`` created
    from the hash value stored under that user id.

    Delivery is best-effort: a failure for one user is reported on stdout
    and the loop continues with the next user.
    """
    for userid in redis_conn.hkeys("online-users"):
        userid = userid.decode("utf-8")
        try:
            portfolioData = portfolio(userid)
            Channel(redis_conn.hget("online-users", userid)).send(
                {
                    'text': json.dumps(portfolioData, cls=DjangoJSONEncoder)
                })
        except Exception:
            # Broad on purpose so one failing user cannot stop the push, but
            # no longer a bare ``except`` so KeyboardInterrupt and SystemExit
            # still propagate.
            print("Error in portfolioPush")
#======================= SELL CHANNEL ==========================#
# Example source code using the Python class DjangoJSONEncoder()
def history(request):
    """Render ``history.html`` for a logged-in user, else redirect to login.

    The template context contains:

    * ``city`` / ``country`` -- JSON strings of all ``city`` / ``country``
      rows (as dicts), encoded with ``DjangoJSONEncoder`` and
      ``ensure_ascii=False``.
    * ``max_date`` -- for every city row, the ``date`` of the latest
      ``temperature`` row filtered by that city's ``city_id``.

    A request whose user has an empty username is redirected to ``/login``.

    NOTE(review): ``latest('date')`` presumably raises when a city has no
    temperature rows -- confirm that every city always has data.
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    # ``ct`` replaces an identifier that was corrupted by a character
    # encoding error in the original source and was not valid Python.
    ct = city.objects.all().values()
    co = country.objects.all().values()
    args = {}
    args['city'] = json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['country'] = json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['max_date'] = [
        temperature.objects.filter(city_id__exact=row['city_id']).latest('date').date
        for row in ct
    ]
    return render_to_response("history.html", args)
def forecast(request):
    """Render ``forecast.html`` for a logged-in user, else redirect to login.

    The template context contains:

    * ``city`` / ``country`` -- JSON strings of all ``city`` / ``country``
      rows (as dicts), encoded with ``DjangoJSONEncoder`` and
      ``ensure_ascii=False``.
    * ``max_date`` -- for every city row, the ``date`` of the latest
      ``temperature`` row filtered by that city's ``city_id``.

    A request whose user has an empty username is redirected to ``/login``.

    NOTE(review): ``latest('date')`` presumably raises when a city has no
    temperature rows -- confirm that every city always has data.
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    # ``ct`` replaces an identifier that was corrupted by a character
    # encoding error in the original source and was not valid Python.
    ct = city.objects.all().values()
    co = country.objects.all().values()
    args = {}
    args['city'] = json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['country'] = json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['max_date'] = [
        temperature.objects.filter(city_id__exact=row['city_id']).latest('date').date
        for row in ct
    ]
    return render_to_response("forecast.html", args)
def messages(request, room_id):
    """Store a chat message posted to a room and broadcast it.

    Only POST is accepted; any other method gets ``HttpResponseNotAllowed``.
    The room identified by ``room_id`` is looked up by ``eid`` and created
    when it does not exist yet. The message is built from the ``from`` and
    ``text`` POST fields, saved, and sent as a ``message`` event on the
    ``room-<room_id>`` channel inside one transaction. The response body is
    the message data encoded as JSON.
    """
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    try:
        room = ChatRoom.objects.get(eid=room_id)
    except ChatRoom.DoesNotExist:
        try:
            room = ChatRoom(eid=room_id)
            room.save()
        except IntegrityError:
            # Another request created the room in the meantime; use that one.
            room = ChatRoom.objects.get(eid=room_id)

    sender = request.POST['from']
    content = request.POST['text']
    with transaction.atomic():
        msg = ChatMessage(room=room, user=sender, text=content)
        msg.save()
        send_event('room-%s' % room_id, 'message', msg.to_data())

    return HttpResponse(
        json.dumps(msg.to_data(), cls=DjangoJSONEncoder),
        content_type='application/json',
    )
def append_event(self, channel, event_type, data):
    """Persist an event and return the matching ``Event`` object.

    ``data`` is stored JSON-encoded (via ``DjangoJSONEncoder``) in a new
    ``models.Event`` row. After saving, the event log is trimmed with
    ``self.trim_event_log()``. The returned ``Event`` carries the saved
    row's channel and type, the original (unencoded) ``data`` and the row's
    ``eid`` as its id.
    """
    from . import models

    encoded = json.dumps(data, cls=DjangoJSONEncoder)
    record = models.Event(channel=channel, type=event_type, data=encoded)
    record.save()
    self.trim_event_log()
    return Event(record.channel, record.type, data, id=record.eid)
def test_jsonrpc_invalid_request_1(live_server):
    """A JSON-RPC payload without the ``method`` member is rejected."""
    request_body = json.dumps(
        {
            # "method" is deliberately left out of this payload.
            "params": [5, 6],
            "jsonrpc": "2.0",
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    reply = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    ).json()

    error = reply['error']
    assert 'Missing parameter "method"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
def test_jsonrpc_invalid_request_2(live_server):
    """A JSON-RPC payload without the ``jsonrpc`` member is rejected."""
    request_body = json.dumps(
        {
            "method": 'add',
            "params": [5, 6],
            # "jsonrpc" is deliberately left out of this payload.
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    reply = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    ).json()

    error = reply['error']
    assert 'Missing parameter "jsonrpc"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
def test_jsonrpc_invalid_request_3(live_server):
    """A JSON-RPC payload whose ``jsonrpc`` member is not "2.0" is rejected."""
    request_body = json.dumps(
        {
            "method": 'add',
            "params": [5, 6],
            "jsonrpc": "1.0",  # bad protocol version on purpose
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    reply = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    ).json()

    error = reply['error']
    assert 'The attribute "jsonrpc" must contain "2.0"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
def serialize_instance(instance):
    """
    Return a JSON-compatible dict built from ``instance.__dict__``.

    Since Django 1.6 items added to the session are no longer pickled,
    but JSON encoded by default. We are storing partially complete models
    in the session (user, account, token, ...). We cannot use standard
    Django serialization, as these models are not "complete" yet.
    Serialization will start complaining about missing relations et al.

    Attributes whose name starts with ``_`` and callable values are
    skipped. Values of ``BinaryField`` fields are base64-encoded to text.
    The result is round-tripped through ``DjangoJSONEncoder`` so that it
    only contains plain JSON types.
    """
    data = {}
    for k, v in instance.__dict__.items():
        if k.startswith('_') or callable(v):
            continue
        try:
            # A BinaryField may hold None, which b64encode rejects with a
            # TypeError; leave it as None so it serializes to JSON null.
            if v is not None and isinstance(instance._meta.get_field(k), BinaryField):
                v = force_text(base64.b64encode(v))
        except FieldDoesNotExist:
            # Not a model field (plain attribute): store the value as-is.
            pass
        data[k] = v
    return json.loads(json.dumps(data, cls=DjangoJSONEncoder))
def common_well_search(request):
    """
    Run a well search from the request's GET parameters.

    Shared by the map search and the text search. Returns a tuple of
    ``(form, well_results, well_results_json)``. ``well_results`` is the
    result of ``form.process()`` or ``None`` when the form is invalid.
    ``well_results_json`` stays ``'[]'`` unless there are results and their
    number does not exceed ``SearchForm.WELL_RESULTS_LIMIT``.
    """
    form = SearchForm(request.GET)
    well_results = form.process() if form.is_valid() else None

    well_results_json = '[]'
    if well_results and len(well_results) <= SearchForm.WELL_RESULTS_LIMIT:
        serialized = [well.as_dict() for well in well_results]
        well_results_json = json.dumps(serialized, cls=DjangoJSONEncoder)

    return form, well_results, well_results_json
def __init__(self, *args, **kwargs):
    """
    Set up the JSON dump/load options and the field default.

    :param dump_kwargs: Keyword arguments which will be passed to `json.dumps()`
    :param load_kwargs: Keyword arguments which will be passed to `json.loads()`

    Both are removed from ``kwargs`` before the parent initializer runs.
    Unless ``null`` is truthy, ``default`` falls back to a
    ``defaultdict(OrderedDict)`` when the caller did not supply one.
    """
    fallback_dump_kwargs = {
        'cls': DjangoJSONEncoder,
        'ensure_ascii': False,
        'sort_keys': False,
        'separators': (',', ':'),
    }
    fallback_load_kwargs = {'object_pairs_hook': OrderedDict}
    self.dump_kwargs = kwargs.pop('dump_kwargs', fallback_dump_kwargs)
    self.load_kwargs = kwargs.pop('load_kwargs', fallback_load_kwargs)

    if not kwargs.get('null', False):
        # NOTE(review): the fallback is a defaultdict *instance*, not a
        # callable -- confirm that sharing one object as default is intended.
        kwargs.setdefault('default', defaultdict(OrderedDict))
    super(JSONField, self).__init__(*args, **kwargs)
def _preencode(data, magic_marker, in_coords=False, in_groups=False):
    """Recursively replace selected sequences with marker-wrapped JSON text.

    * In a dict, tuple/list values stored under ``bounds``, ``point`` or
      ``locations`` become ``magic_marker + <json> + magic_marker``. All
      other values are processed recursively, with ``in_coords`` /
      ``in_groups`` set when the key is ``coordinates`` / ``groups``.
      The input dict is copied, not modified.
    * A tuple/list is wrapped the same way when ``in_groups`` is set, or
      when ``in_coords`` is set and its first element is an int or float.
      Otherwise it becomes a tuple of recursively processed items, with
      only ``in_coords`` passed down.
    * Anything else is returned unchanged.
    """
    def wrap_as_json(value):
        return magic_marker + json.dumps(value, cls=DjangoJSONEncoder) + magic_marker

    if isinstance(data, dict):
        result = data.copy()
        for name, value in tuple(result.items()):
            if name in ('bounds', 'point', 'locations') and isinstance(value, (tuple, list)):
                result[name] = wrap_as_json(value)
                continue
            result[name] = _preencode(
                value,
                magic_marker,
                in_coords=(name == 'coordinates'),
                in_groups=(name == 'groups'),
            )
        return result

    if not isinstance(data, (tuple, list)):
        return data

    starts_with_number = in_coords and data and isinstance(data[0], (int, float))
    if starts_with_number or in_groups:
        return wrap_as_json(data)
    return tuple(_preencode(item, magic_marker, in_coords) for item in data)
def export_action(self, request):
    """Return all ``TemplateModel`` rows as a downloadable JSON file.

    The response has content type ``application/json`` and is marked as an
    attachment named ``templatemodel_export.json``. Every template is
    exported with its own fields plus a ``history`` list built from
    ``t.history.all()``. The JSON is written with ``DjangoJSONEncoder``,
    an indent of 2 and sorted keys.
    """
    response = HttpResponse(content_type='application/json')
    response['Content-Disposition'] = 'attachment; filename=templatemodel_export.json'

    items = []
    for template in TemplateModel.objects.all():
        entry = {
            'name': template.name,
            'layout': template.layout,
            'subject': template.subject,
            'body': template.body,
            'version': template.version,
            'enabled': template.enabled,
            'has_errors': template.has_errors,
            'created_at': template.created_at,
            'updated_at': template.updated_at,
        }
        past_versions = []
        for archived in template.history.all():
            past_versions.append({
                'layout': archived.layout,
                'subject': archived.subject,
                'body': archived.body,
                'version': archived.version,
                'archived_at': archived.archived_at,
            })
        entry['history'] = past_versions
        items.append(entry)

    json.dump(items, response, cls=DjangoJSONEncoder, indent=2, sort_keys=True)
    return response
def calc_frequency(areaNum, word_counts, curr_timeslot_word_counts):
    """Return each shared word's count as a percentage of its timeslot count.

    For every word present in both mappings the result holds
    ``word_counts[word] / curr_timeslot_word_counts[word] * 100`` rounded
    to two decimals. Words missing from either mapping are left out, and
    the result follows the key order of ``word_counts``.

    :param areaNum: Unused by the calculation; kept for caller compatibility.
    :param word_counts: Mapping of word -> count.
    :param curr_timeslot_word_counts: Mapping of word -> count that is used
        as the denominator.
    :returns: Dict of word -> percentage (float, two decimals).
    :raises ZeroDivisionError: If a shared word has a denominator of 0.
    """
    timeslot_area_percentage = {}
    for word, count in word_counts.items():
        # Direct membership test instead of scanning every key of the
        # second mapping for an equal one.
        if word in curr_timeslot_word_counts:
            # ``+ 0.00`` forces float division even for integer counts.
            percent = (count / (curr_timeslot_word_counts[word] + 0.00)) * 100
            timeslot_area_percentage[word] = round(percent, 2)
    return timeslot_area_percentage
# if areaNum == 1:
# return {"area1" : timeslot_area_percentage} # TODO: convert to django friendly using below??
# elif areaNum == 2:
# return {"area2" : timeslot_area_percentage}
# elif areaNum == 3:
# return {"area3" : timeslot_area_percentage}
#convert site percents to django friendly JSON
# site1_percentage_json = json.dumps(dict(site1_percentage), cls=DjangoJSONEncoder)
# site2_percentage_json = json.dumps(dict(site2_percentage), cls=DjangoJSONEncoder)
# takes a url as a string and returns a SET of TUPLES of all of the words
# that are used on that webpage in order of frequency
def get(self, request, nnid):
    """
    Common Network Info Get Method
    ---
    # Class Name : CommonNNInfoList
    # Description:
        Structure : <nninfo> - version - batch version
        Search defined list of neural networks
    """
    try:
        # 'all' (any letter case) is turned into the '%' wildcard and 'seq'
        # is normalised to lower case; any other id is used unchanged.
        special_ids = {'all': '%', 'seq': 'seq'}
        condition = {'nn_id': special_ids.get(str(nnid).lower(), nnid)}
        return_data = NNCommonManager().get_nn_info(condition)
        logging.info(return_data)
        return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
    except Exception as e:
        # Failures are reported in the response body instead of being raised.
        return_data = {"status": "404", "result": str(e)}
        return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
def test_serialize_and_deserialize_1():
    """Round-trip a ``SampleAggregate.Created`` event through the transcoder."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    aggregate_id = 'b089a0a6-e0b3-480d-9382-c47f99103b3d'

    # Serialize and compare every stored field with its expected value.
    created = SampleAggregate.Created(
        entity_id=aggregate_id,
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(created)
    expected_fields = {
        'event_type': 'sample_aggregate_created',
        'event_version': 1,
        'event_data': '{"attr1":"val1","attr2":"val2"}',
        'aggregate_id': aggregate_id,
        'aggregate_version': 0,
        'aggregate_type': 'SampleAggregate',
        'module_name': 'djangoevents.tests.test_unifiedtranscoder',
        'class_name': 'SampleAggregate.Created',
        'metadata': '{"command_id":123}',
    }
    for field_name, expected in expected_fields.items():
        assert getattr(stored, field_name) == expected

    # Deserialize; metadata is not part of the restored event.
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    del created.__dict__['metadata']
    assert created.__dict__ == restored.__dict__
def test_serialize_and_deserialize_2():
    """Round-trip a ``SampleAggregate.Updated`` event through the transcoder."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    aggregate_id = 'b089a0a6-e0b3-480d-9382-c47f99103b3d'

    # Serialize and compare every stored field with its expected value.
    updated = SampleAggregate.Updated(
        entity_id=aggregate_id,
        entity_version=10,
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(updated)
    expected_fields = {
        'event_type': 'overridden_event_type',
        'event_version': 1,
        'event_data': '{"attr1":"val1","attr2":"val2"}',
        'aggregate_id': aggregate_id,
        'aggregate_version': 10,
        'aggregate_type': 'SampleAggregate',
        'module_name': 'djangoevents.tests.test_unifiedtranscoder',
        'class_name': 'SampleAggregate.Updated',
        'metadata': '{"command_id":123}',
    }
    for field_name, expected in expected_fields.items():
        assert getattr(stored, field_name) == expected

    # Deserialize; metadata is not part of the restored event.
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    del updated.__dict__['metadata']
    assert updated.__dict__ == restored.__dict__
def test_transcoder_passes_all_attributes_to_event_constructor():
    """Deserializing calls the event constructor with every attribute as a keyword."""
    attributes = {
        'entity_id': 'b089a0a6-e0b3-480d-9382-c47f99103b3d',
        'foo': 0,
        'bar': 1,
    }
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    serialized_event = transcoder.serialize(SampleAggregate.Created(**attributes))

    # Record what the patched constructor receives; stays None if never called.
    recorded = {'call': None}

    def init(self, *args, **kwargs):
        recorded['call'] = (args, kwargs)

    with mock.patch.object(SampleAggregate.Created, '__init__', init):
        transcoder.deserialize(serialized_event)

    args, kwargs = recorded['call']
    assert args == ()
    for key, value in attributes.items():
        assert key in kwargs
        assert kwargs[key] == value
def serialize_instance(instance):
    """
    Return a JSON-compatible dict built from ``instance.__dict__``.

    Since Django 1.6 items added to the session are no longer pickled,
    but JSON encoded by default. We are storing partially complete models
    in the session (user, account, token, ...). We cannot use standard
    Django serialization, as these models are not "complete" yet.
    Serialization will start complaining about missing relations et al.

    Attributes whose name starts with ``_`` and callable values are
    skipped. Values of ``BinaryField`` fields are base64-encoded to text.
    The result is round-tripped through ``DjangoJSONEncoder`` so that it
    only contains plain JSON types.
    """
    data = {}
    for k, v in instance.__dict__.items():
        if k.startswith('_') or callable(v):
            continue
        try:
            # A BinaryField may hold None, which b64encode rejects with a
            # TypeError; leave it as None so it serializes to JSON null.
            if v is not None and isinstance(instance._meta.get_field(k), BinaryField):
                v = force_text(base64.b64encode(v))
        except FieldDoesNotExist:
            # Not a model field (plain attribute): store the value as-is.
            pass
        data[k] = v
    return json.loads(json.dumps(data, cls=DjangoJSONEncoder))
def set_service(self, service_name, service_config):
    """Create or update the ``Service`` row that stores a service's config.

    ``service_config.config`` is JSON-encoded and saved in the ``json``
    field of the ``Service`` named ``service_name``. An existing row is
    updated; otherwise a new one is created.
    """
    # Encode once with DjangoJSONEncoder for both paths. The update path
    # previously used the plain encoder, so it accepted fewer value types
    # than the create path.
    service_json = json.dumps(service_config.config, cls=DjangoJSONEncoder)
    try:
        service = Service.objects.get(name=service_name)
        service.json = service_json
    except Service.DoesNotExist:
        service = Service(
            name=service_name,
            json=service_json,
        )
    service.save()
def _init_scratch_org(self, org_config):
    """Record a scratch org's details in a ``ScratchOrgInstance``.

    Does nothing (returns ``None``) when ``org_config.scratch`` is falsy.
    Otherwise it points the logger at the build, reads
    ``org_config.scratch_info``, loads the org's ``<username>.json`` file
    from ``~/.sfdx`` (falling back to ``~/.local/.sfdx``), saves a
    ``ScratchOrgInstance`` with that data, attaches it to ``org_config`` as
    ``org_instance`` and returns ``org_config``.
    """
    if not org_config.scratch:
        # Only scratch orgs are handled here.
        return

    # Send log output to the build.log field.
    init_logger(self.build)

    # Reading scratch_info creates the scratch org and yields its details.
    info = org_config.scratch_info

    # Locate the org's json file written by Salesforce DX.
    home_dir = os.path.expanduser('~')
    file_name = info['username'] + '.json'
    json_path = os.path.join(home_dir, '.sfdx', file_name)
    if not os.path.isfile(json_path):
        json_path = os.path.join(home_dir, '.local', '.sfdx', file_name)
    with open(json_path, 'r') as json_file:
        dx_json = json_file.read()

    org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder)

    # Store the org info in a ScratchOrgInstance.
    instance = ScratchOrgInstance(
        org=org_config.org,
        build=self.build,
        sf_org_id=info['org_id'],
        username=info['username'],
        json_dx=dx_json,
        json=org_json,
    )
    instance.save()
    org_config.org_instance = instance
    return org_config
def set_org(self, org_config):
    """Create or update the ``Org`` row for ``org_config`` in the build's repo.

    The org's config is stored JSON-encoded in the ``json`` field. The
    ``scratch`` flag is set from whether ``org_config`` is a
    ``ScratchOrgConfig``; scratch orgs are not saved.
    """
    org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder)
    try:
        org = Org.objects.get(repo=self.build.repo, name=org_config.name)
    except Org.DoesNotExist:
        org = Org(
            name=org_config.name,
            json=org_json,
            repo=self.build.repo,
        )
    else:
        org.json = org_json

    org.scratch = isinstance(org_config, ScratchOrgConfig)
    if org.scratch:
        return
    org.save()
def __init__(self, height=None, width=None,
             html_id=None, json_encoder_class=DjangoJSONEncoder):
    """Store the chart's size, HTML id and JSON encoder class.

    :param height: Optional chart height.
    :param width: Optional chart width.
    :param html_id: Optional HTML id, stored as given.
    :param json_encoder_class: Encoder class, stored as given.
    :raises ImproperlyConfigured: If a height or width is given while
        ``self.responsive`` is ``None`` or truthy.
    """
    self.height = height
    self.width = width
    self.json_encoder_class = json_encoder_class
    self.html_id = html_id

    has_fixed_size = height is not None or width is not None
    if has_fixed_size and (self.responsive is None or self.responsive):
        message = (
            'Using the height/width parameter will have no '
            'effect if the chart is in responsive mode. '
            'Disable responsive mode by setting chart.responsive '
            'to False'
        )
        raise ImproperlyConfigured(message)
def __init__(self, height=None, width=None,
             html_id=None, json_encoder_class=DjangoJSONEncoder):
    """Store the chart's size, HTML id and JSON encoder class.

    :param height: Optional chart height.
    :param width: Optional chart width.
    :param html_id: Optional HTML id, stored as given.
    :param json_encoder_class: Encoder class, stored as given.
    :raises ImproperlyConfigured: If a height or width is given while
        ``self.responsive`` is ``None`` or truthy.
    """
    self.height = height
    self.width = width
    self.json_encoder_class = json_encoder_class
    self.html_id = html_id

    has_fixed_size = height is not None or width is not None
    if has_fixed_size and (self.responsive is None or self.responsive):
        message = (
            'Using the height/width parameter will have no '
            'effect if the chart is in responsive mode. '
            'Disable responsive mode by setting chart.responsive '
            'to False'
        )
        raise ImproperlyConfigured(message)
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True,
             json_dumps_params=None, **kwargs):
    """Build a response whose content is ``data`` serialized as JSON.

    :param data: Object to serialize; must be a dict while ``safe`` is true.
    :param encoder: JSON encoder class passed to ``json.dumps`` as ``cls``.
    :param safe: When true, non-dict ``data`` is refused.
    :param json_dumps_params: Optional extra keyword arguments for
        ``json.dumps``.
    :param kwargs: Passed to the parent initializer; ``content_type``
        defaults to ``application/json``.
    :raises TypeError: If ``safe`` is true and ``data`` is not a dict.
    """
    if safe and not isinstance(data, dict):
        message = ('In order to allow non-dict objects to be '
                   'serialized set the safe parameter to False')
        raise TypeError(message)

    dumps_options = {} if json_dumps_params is None else json_dumps_params
    kwargs.setdefault('content_type', 'application/json')
    payload = json.dumps(data, cls=encoder, **dumps_options)
    super(JsonResponse, self).__init__(content=payload, **kwargs)
def api_detail(self, request, *args, **kwargs):
    """Return the object selected by the ``pk`` keyword argument as JSON.

    The object is fetched with ``self.get_object``, bound to the form class
    from ``self.get_form`` and the form's instance is converted with
    ``self.obj_as_dict`` before being encoded with ``DjangoJSONEncoder``.
    """
    target = self.get_object(request, object_id=kwargs.get("pk"))
    form_class = self.get_form(request, obj=target)
    bound_form = form_class(instance=target)
    payload = self.obj_as_dict(request, bound_form.instance)
    body = json.dumps(payload, cls=DjangoJSONEncoder)
    return HttpResponse(body, content_type='application/json')
def api_create(self, request, *args, **kwargs):
    """Create an object from the JSON request body and return it as JSON.

    The body is parsed as JSON and validated with the form class from
    ``self.get_form``. On success the saved object is returned as JSON
    (via ``self.obj_as_dict`` and ``DjangoJSONEncoder``). On validation
    failure the form errors are returned under an ``errors`` key with
    HTTP status 400. A body that is not valid JSON is also answered with
    status 400 in the same ``errors`` envelope instead of raising.
    """
    try:
        data = json.loads(request.body)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. The payload follows the per-field error list layout,
        # filed under the "__all__" (non-field) key.
        errors = {
            "errors": {
                "__all__": [{"message": "Invalid JSON body.", "code": "invalid"}]
            }
        }
        return HttpResponse(json.dumps(errors), status=400, content_type="application/json")

    ModelForm = self.get_form(request, obj=None)
    form = ModelForm(data=data, files=request.FILES)
    if form.is_valid():
        obj = form.save()
        data = self.obj_as_dict(request, obj)
        return HttpResponse(json.dumps(data, cls=DjangoJSONEncoder), content_type="application/json")

    errors = {
        "errors": json.loads(form.errors.as_json())
    }
    return HttpResponse(json.dumps(errors), status=400, content_type="application/json")
def niftyChannelDataPush():
    """Send the result of ``niftyData()`` as JSON to the ``nifty-channel`` group."""
    snapshot = niftyData()
    nifty_group = Group('nifty-channel')
    message = {'text': json.dumps(snapshot, cls=DjangoJSONEncoder)}
    nifty_group.send(message)
#================ Leaderboard channel =======================#
# @http_session_user
# @isLoggedInCh
# def connect_to_leaderboard_channel(message):
# Group('leaderboard-channel').add(message.reply_channel)
# message.reply_channel.send({
# 'text' : json.dumps({"accept": True}) #{ "close" : True }
# })
# print("New leaderboard listener added!")
# def disconnect_from_leaderboard_channel(message):
# Group('leaderboard-channel').discard(message.reply_channel)
# def leaderboardChannelDataPush():
# leaderboard_data = leaderboardData()
# Group('leaderboard-channel').send(
# {
# 'text': json.dumps(leaderboard_data,cls=DjangoJSONEncoder)
# })
#=============== Graph Channel =======================#
def graphDataPush():
    """Send the ``graph('NIFTY 50')`` data as JSON to the ``NIFTY-50`` group."""
    series = graph('NIFTY 50')
    graph_group = Group('NIFTY-50')
    message = {'text': json.dumps(series, cls=DjangoJSONEncoder)}
    graph_group.send(message)
#==================== portfolio channel ========================#
def sellDataPush():
    """Push every online seller's sell data to that seller's channel.

    Iterates over the keys of the ``online-sellers`` Redis hash. For each
    user id it builds the payload with ``sell_data(userid)`` and sends it,
    JSON-encoded with ``DjangoJSONEncoder``, through a ``Channel`` created
    from the hash value stored under that user id.

    Delivery is best-effort: a failure for one user is reported on stdout
    and the loop continues with the next user.
    """
    for userid in redis_conn.hkeys("online-sellers"):
        userid = userid.decode("utf-8")
        try:
            sellData = sell_data(userid)
            Channel(redis_conn.hget("online-sellers", userid)).send(
                {
                    'text': json.dumps(sellData, cls=DjangoJSONEncoder)
                })
        except Exception:
            # Broad on purpose so one failing user cannot stop the push, but
            # no longer a bare ``except`` so KeyboardInterrupt and SystemExit
            # still propagate.
            print("sellDataPush failed!")
#================ Ticker Channel =======================#