python类DjangoJSONEncoder()的实例源码

consumers.py 文件源码 项目:Play.Excel 作者: abhijithasokan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def portfolioDataPush():
    """Push every online user's portfolio to that user's channel as JSON.

    Walks the keys of the ``online-users`` Redis hash, builds the portfolio
    for each user id and sends it (encoded with ``DjangoJSONEncoder``) to the
    channel whose name is stored under that key.

    Delivery is best effort: a failure for one user is reported and the loop
    moves on to the next user.
    """
    for userid in redis_conn.hkeys("online-users"):
        userid = userid.decode("utf-8")
        try:
            portfolioData = portfolio(userid)
            Channel(redis_conn.hget("online-users", userid)).send(
                {
                    'text': json.dumps(portfolioData, cls=DjangoJSONEncoder)
                })
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still stop the process.
            print("Error in portfolioPush")




#======================= SELL CHANNEL ==========================#
views.py 文件源码 项目:SPBU-DBMS-Project 作者: Betekhtin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def history(request):
    """Render ``history.html`` for a logged-in user, otherwise redirect to ``/login``.

    The template context contains:

    * ``city`` -- JSON array built from all ``city`` rows,
    * ``country`` -- JSON array built from all ``country`` rows,
    * ``max_date`` -- for each city row, the ``date`` of the latest
      ``temperature`` row with that ``city_id`` (same order as the city rows).
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    # The original local name was a mis-encoded character ("?t"), which is not
    # a valid identifier; ``ct`` restores a name the parser accepts.
    ct = city.objects.all().values()
    co = country.objects.all().values()

    args = {}
    args['city'] = json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['country'] = json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False)
    # NOTE(review): ``latest('date')`` is expected to raise when a city has no
    # temperature rows -- confirm that every city always has data.
    args['max_date'] = [
        temperature.objects.filter(city_id__exact=i['city_id']).latest('date').date
        for i in ct
    ]
    return render_to_response("history.html", args)
views.py 文件源码 项目:SPBU-DBMS-Project 作者: Betekhtin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def forecast(request):
    """Render ``forecast.html`` for a logged-in user, otherwise redirect to ``/login``.

    The template context contains:

    * ``city`` -- JSON array built from all ``city`` rows,
    * ``country`` -- JSON array built from all ``country`` rows,
    * ``max_date`` -- for each city row, the ``date`` of the latest
      ``temperature`` row with that ``city_id`` (same order as the city rows).
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    # The original local name was a mis-encoded character ("?t"), which is not
    # a valid identifier; ``ct`` restores a name the parser accepts.
    ct = city.objects.all().values()
    co = country.objects.all().values()

    args = {}
    args['city'] = json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False)
    args['country'] = json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False)
    # NOTE(review): ``latest('date')`` is expected to raise when a city has no
    # temperature rows -- confirm that every city always has data.
    args['max_date'] = [
        temperature.objects.filter(city_id__exact=i['city_id']).latest('date').date
        for i in ct
    ]

    #args['max_date'] = (temperature.objects.filter(city_id__exact=city).latest('date').date)
    return render_to_response("forecast.html", args)
views.py 文件源码 项目:django-eventstream 作者: fanout 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def messages(request, room_id):
    """Store a chat message posted to a room and echo it back as JSON.

    Only POST is accepted; any other method gets ``HttpResponseNotAllowed``.
    The room identified by ``room_id`` is created on first use. The message
    is saved and published with ``send_event`` inside one transaction.
    """
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    try:
        room = ChatRoom.objects.get(eid=room_id)
    except ChatRoom.DoesNotExist:
        room = ChatRoom(eid=room_id)
        try:
            room.save()
        except IntegrityError:
            # A concurrent request created the room first; use that one.
            room = ChatRoom.objects.get(eid=room_id)

    sender = request.POST['from']
    message_text = request.POST['text']
    with transaction.atomic():
        msg = ChatMessage(room=room, user=sender, text=message_text)
        msg.save()
        send_event('room-%s' % room_id, 'message', msg.to_data())

    return HttpResponse(
        json.dumps(msg.to_data(), cls=DjangoJSONEncoder),
        content_type='application/json')
storage.py 文件源码 项目:django-eventstream 作者: fanout 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def append_event(self, channel, event_type, data):
        """Persist an event, trim the event log and return the in-memory event.

        The payload is stored as JSON (encoded with ``DjangoJSONEncoder``);
        the returned ``Event`` carries the original ``data`` object and the
        ``eid`` of the saved row as its id.
        """
        from . import models

        stored = models.Event(
            channel=channel,
            type=event_type,
            data=json.dumps(data, cls=DjangoJSONEncoder))
        stored.save()

        self.trim_event_log()

        return Event(stored.channel, stored.type, data, id=stored.eid)
test_jsonrpc_errors.py 文件源码 项目:django-modern-rpc 作者: alorence 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_jsonrpc_invalid_request_1(live_server):
    """A JSON-RPC payload without "method" is rejected as an invalid request."""
    # "method" is deliberately left out of the payload
    request_body = json.dumps(
        {
            "params": [5, 6],
            "jsonrpc": "2.0",
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    http_response = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    )
    error = http_response.json()['error']

    assert 'Missing parameter "method"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
test_jsonrpc_errors.py 文件源码 项目:django-modern-rpc 作者: alorence 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_jsonrpc_invalid_request_2(live_server):
    """A JSON-RPC payload without "jsonrpc" is rejected as an invalid request."""
    # "jsonrpc" is deliberately left out of the payload
    request_body = json.dumps(
        {
            "method": 'add',
            "params": [5, 6],
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    http_response = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    )
    error = http_response.json()['error']

    assert 'Missing parameter "jsonrpc"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
test_jsonrpc_errors.py 文件源码 项目:django-modern-rpc 作者: alorence 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_jsonrpc_invalid_request_3(live_server):
    """A payload whose "jsonrpc" member is not "2.0" is rejected as invalid."""
    # "jsonrpc" carries an unsupported version on purpose
    request_body = json.dumps(
        {
            "method": 'add',
            "params": [5, 6],
            "jsonrpc": "1.0",
            "id": random.randint(1, 1000),
        },
        cls=DjangoJSONEncoder,
    )
    http_response = requests.post(
        live_server.url + '/all-rpc/',
        data=request_body,
        headers={'content-type': 'application/json'},
    )
    error = http_response.json()['error']

    assert 'The attribute "jsonrpc" must contain "2.0"' in error['message']
    assert RPC_INVALID_REQUEST == error['code']
utils.py 文件源码 项目:django-twilio-tfa 作者: rtindru 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def serialize_instance(instance):
    """
    Turn a (possibly incomplete) model instance into JSON-compatible data.

    Session contents are JSON encoded, and the models stored there (user,
    account, token, ...) may not be complete yet, so standard Django
    serialization would complain about missing relations. Instead the
    instance's ``__dict__`` is copied, skipping underscore-prefixed and
    callable entries, base64-encoding ``BinaryField`` values, and the result
    is round-tripped through ``DjangoJSONEncoder``.
    """
    serializable = {}
    for name, value in instance.__dict__.items():
        if not (name.startswith('_') or callable(value)):
            try:
                field = instance._meta.get_field(name)
            except FieldDoesNotExist:
                # Plain attribute that is not a model field: store as is.
                field = None
            if isinstance(field, BinaryField):
                value = force_text(base64.b64encode(value))
            serializable[name] = value
    encoded = json.dumps(serializable, cls=DjangoJSONEncoder)
    return json.loads(encoded)
views.py 文件源码 项目:gwells 作者: bcgov 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def common_well_search(request):
    """
        Run a well search from the request's GET parameters.

        Returns a ``(form, well_results, well_results_json)`` tuple. The JSON
        string stays ``'[]'`` when there are no results or when the number of
        results exceeds ``SearchForm.WELL_RESULTS_LIMIT``.
    """
    form = SearchForm(request.GET)
    well_results = form.process() if form.is_valid() else None

    well_results_json = '[]'
    if well_results and len(well_results) <= SearchForm.WELL_RESULTS_LIMIT:
        as_dicts = [well.as_dict() for well in well_results]
        well_results_json = json.dumps(as_dicts, cls=DjangoJSONEncoder)

    return form, well_results, well_results_json
fields.py 文件源码 项目:django-chemtrails 作者: inonit 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwargs):
        """
        Set up JSON dump/load options and a default value for the field.

        :param dump_kwargs: Keyword arguments which will be passed to `json.dumps()`
        :param load_kwargs: Keyword arguments which will be passed to `json.loads()`
        """
        fallback_dump_kwargs = {
            'cls': DjangoJSONEncoder,
            'ensure_ascii': False,
            'sort_keys': False,
            'separators': (',', ':')
        }
        fallback_load_kwargs = {
            'object_pairs_hook': OrderedDict
        }
        self.dump_kwargs = kwargs.pop('dump_kwargs', fallback_dump_kwargs)
        self.load_kwargs = kwargs.pop('load_kwargs', fallback_load_kwargs)

        # Fields that are not nullable get a default unless the caller
        # supplied one explicitly.
        if not kwargs.get('null', False):
            kwargs.setdefault('default', defaultdict(OrderedDict))
        super(JSONField, self).__init__(*args, **kwargs)
json.py 文件源码 项目:c3nav 作者: c3nav 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _preencode(data, magic_marker, in_coords=False, in_groups=False):
    if isinstance(data, dict):
        data = data.copy()
        for name, value in tuple(data.items()):
            if name in ('bounds', 'point', 'locations') and isinstance(value, (tuple, list)):
                data[name] = magic_marker+json.dumps(value, cls=DjangoJSONEncoder)+magic_marker
            else:
                data[name] = _preencode(value, magic_marker,
                                        in_coords=(name == 'coordinates'), in_groups=(name == 'groups'))
        return data
    elif isinstance(data, (tuple, list)):
        if (in_coords and data and isinstance(data[0], (int, float))) or in_groups:
            return magic_marker+json.dumps(data, cls=DjangoJSONEncoder)+magic_marker
        else:
            return tuple(_preencode(value, magic_marker, in_coords) for value in data)
    else:
        return data
admin.py 文件源码 项目:django-happymailer 作者: barbuza 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def export_action(self, request):
        """Return all ``TemplateModel`` rows, with their history, as a JSON download.

        The response is served as the attachment ``templatemodel_export.json``
        and written with ``DjangoJSONEncoder``, two-space indentation and
        sorted keys.
        """
        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename=templatemodel_export.json'

        def _history_entry(entry):
            # Snapshot of one archived version of a template.
            return {
                'layout': entry.layout,
                'subject': entry.subject,
                'body': entry.body,
                'version': entry.version,
                'archived_at': entry.archived_at,
            }

        items = []
        for template in TemplateModel.objects.all():
            items.append({
                'name': template.name,
                'layout': template.layout,
                'subject': template.subject,
                'body': template.body,
                'version': template.version,
                'enabled': template.enabled,
                'has_errors': template.has_errors,
                'created_at': template.created_at,
                'updated_at': template.updated_at,
                'history': [_history_entry(h) for h in template.history.all()],
            })

        json.dump(items, response, cls=DjangoJSONEncoder, indent=2, sort_keys=True)
        return response
mining.py 文件源码 项目:CSCE482-WordcloudPlus 作者: ggaytan00 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def calc_frequency(areaNum, word_counts, curr_timeslot_word_counts):
    """Return, per shared word, ``word_counts`` as a percentage of the timeslot count.

    Args:
        areaNum: Unused; kept so existing callers keep working.
        word_counts: Mapping of word -> count.
        curr_timeslot_word_counts: Mapping of word -> count for the current
            timeslot.

    Returns:
        A dict, in ``word_counts`` order, mapping every word present in both
        inputs to ``word_counts[word] / curr_timeslot_word_counts[word] * 100``
        rounded to two decimals.

    Raises:
        ZeroDivisionError: If a shared word has a timeslot count of zero.
    """
    timeslot_area_percentage = {}

    # One membership test per word replaces the former nested loop that
    # compared every key of one mapping with every key of the other.
    for word in word_counts:
        if word in curr_timeslot_word_counts:
            # Adding 0.00 forces float division on interpreters where ``/``
            # between two ints truncates.
            percent = (word_counts[word]/(curr_timeslot_word_counts[word] + 0.00)) * 100
            timeslot_area_percentage[word] = round(percent, 2)

    return timeslot_area_percentage
    # if areaNum == 1:
    #   return {"area1" : timeslot_area_percentage} # TODO: convert to django friendly using below??
    # elif areaNum == 2:
    #   return {"area2" : timeslot_area_percentage}
    # elif areaNum == 3:
    #   return {"area3" : timeslot_area_percentage}

    #convert site percents to django friendly JSON
    # site1_percentage_json = json.dumps(dict(site1_percentage), cls=DjangoJSONEncoder)
    # site2_percentage_json = json.dumps(dict(site2_percentage), cls=DjangoJSONEncoder)

# takes a url as a string and returns a SET of TUPLES of all of the words
# that are used on that webpage in order of frequency
common_nninfo_list.py 文件源码 项目:skp_edu_docker 作者: TensorMSA 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, request, nnid):
        """
        Common Network Info Get Method
        ---
        # Class Name : CommonNNInfoList

        # Description:
            Structure : <nninfo> - version - batch version
            Search defined list of neural networks
        """
        try:
            # 'all' is translated to '%' (presumably a wildcard for the
            # lookup -- confirm in NNCommonManager), 'seq' is normalised to
            # lower case, and any other id is passed through untouched.
            special_ids = {'all': '%', 'seq': 'seq'}
            condition = {'nn_id': special_ids.get(str(nnid).lower(), nnid)}
            return_data = NNCommonManager().get_nn_info(condition)
            logging.info(return_data)
            return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
        except Exception as e:
            # Failures are reported in the response body rather than raised.
            failure = {"status": "404", "result": str(e)}
            return Response(json.dumps(failure, cls=DjangoJSONEncoder))
test_unifiedtranscoder.py 文件源码 项目:djangoevents 作者: ApplauseOSS 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_serialize_and_deserialize_1():
    """Round-trip a ``Created`` event through ``UnifiedTranscoder``."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    aggregate_id = 'b089a0a6-e0b3-480d-9382-c47f99103b3d'

    # serialize
    created = SampleAggregate.Created(
        entity_id=aggregate_id,
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(created)
    expected_fields = (
        ('event_type', 'sample_aggregate_created'),
        ('event_version', 1),
        ('event_data', '{"attr1":"val1","attr2":"val2"}'),
        ('aggregate_id', 'b089a0a6-e0b3-480d-9382-c47f99103b3d'),
        ('aggregate_version', 0),
        ('aggregate_type', 'SampleAggregate'),
        ('module_name', 'djangoevents.tests.test_unifiedtranscoder'),
        ('class_name', 'SampleAggregate.Created'),
        ('metadata', '{"command_id":123}'),
    )
    for field_name, expected in expected_fields:
        assert getattr(stored, field_name) == expected

    # deserialize
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    # metadata is not part of the deserialized event, so drop it before comparing
    created.__dict__.pop('metadata')
    assert created.__dict__ == restored.__dict__
test_unifiedtranscoder.py 文件源码 项目:djangoevents 作者: ApplauseOSS 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_serialize_and_deserialize_2():
    """Round-trip an ``Updated`` event (overridden event type, version 10)."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    aggregate_id = 'b089a0a6-e0b3-480d-9382-c47f99103b3d'

    # serialize
    updated = SampleAggregate.Updated(
        entity_id=aggregate_id,
        entity_version=10,
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(updated)
    expected_fields = (
        ('event_type', 'overridden_event_type'),
        ('event_version', 1),
        ('event_data', '{"attr1":"val1","attr2":"val2"}'),
        ('aggregate_id', 'b089a0a6-e0b3-480d-9382-c47f99103b3d'),
        ('aggregate_version', 10),
        ('aggregate_type', 'SampleAggregate'),
        ('module_name', 'djangoevents.tests.test_unifiedtranscoder'),
        ('class_name', 'SampleAggregate.Updated'),
        ('metadata', '{"command_id":123}'),
    )
    for field_name, expected in expected_fields:
        assert getattr(stored, field_name) == expected

    # deserialize
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    # metadata is not part of the deserialized event, so drop it before comparing
    updated.__dict__.pop('metadata')
    assert updated.__dict__ == restored.__dict__
test_unifiedtranscoder.py 文件源码 项目:djangoevents 作者: ApplauseOSS 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_transcoder_passes_all_attributes_to_event_constructor():
    """Deserializing must hand every serialized attribute to the event's ``__init__`` as a keyword."""
    attributes = {
        'entity_id': 'b089a0a6-e0b3-480d-9382-c47f99103b3d',
        'foo': 0,
        'bar': 1,
    }

    event = SampleAggregate.Created(**attributes)
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    serialized_event = transcoder.serialize(event)

    # Records the arguments of the most recent constructor call.
    recorded = {'call': None}

    def recording_init(self, *args, **kwargs):
        recorded['call'] = (args, kwargs)

    with mock.patch.object(SampleAggregate.Created, '__init__', recording_init):
        transcoder.deserialize(serialized_event)

    args, kwargs = recorded['call']
    assert args == ()

    for key, value in attributes.items():
        assert key in kwargs
        assert kwargs[key] == value
utils.py 文件源码 项目:Provo-Housing-Database 作者: marcopete5 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def serialize_instance(instance):
    """
    Build a JSON-safe dict from a model instance that may be incomplete.

    Sessions are JSON encoded rather than pickled, and the partially
    complete models kept there (user, account, token, ...) cannot go through
    Django's regular serializers, which would complain about missing
    relations. The instance's public, non-callable attributes are collected
    instead, binary field values are base64 encoded, and everything is
    normalised by a dump/load cycle with ``DjangoJSONEncoder``.
    """
    def _is_binary(attr_name):
        # True only when the attribute maps to a BinaryField on the model.
        try:
            return isinstance(instance._meta.get_field(attr_name), BinaryField)
        except FieldDoesNotExist:
            return False

    collected = {}
    for attr_name, attr_value in instance.__dict__.items():
        if attr_name.startswith('_') or callable(attr_value):
            continue
        if _is_binary(attr_name):
            attr_value = force_text(base64.b64encode(attr_value))
        collected[attr_name] = attr_value
    return json.loads(json.dumps(collected, cls=DjangoJSONEncoder))
keychain.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_service(self, service_name, service_config):
        """Create or update the stored ``Service`` named ``service_name``.

        ``service_config.config`` is serialized once with
        ``DjangoJSONEncoder`` and written to the ``json`` attribute of the
        existing row, or of a new row when none exists, before saving.
        """
        # Previously only the create path used DjangoJSONEncoder, so updating
        # an existing service could fail on values that creating one accepted.
        service_json = json.dumps(service_config.config, cls=DjangoJSONEncoder)
        try:
            service = Service.objects.get(name=service_name)
            service.json = service_json
        except Service.DoesNotExist:
            service = Service(
                name=service_name,
                json=service_json,
            )
        service.save()
keychain.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _init_scratch_org(self, org_config):
        """Record a ``ScratchOrgInstance`` for a scratch org config.

        Does nothing (returns ``None``) for non-scratch orgs. Otherwise the
        org's Salesforce DX json file and the serialized org config are saved
        on a new ``ScratchOrgInstance``, which is attached to ``org_config``
        as ``org_instance``; the updated ``org_config`` is returned.
        """
        if not org_config.scratch:
            # Only scratch orgs are handled here
            return

        # Route logger output to the build.log field
        init_logger(self.build)

        # Reading scratch_info is what creates the scratch org (per the
        # original author's note) and yields its details
        info = org_config.scratch_info

        # Salesforce DX keeps one "<username>.json" per org, either in
        # ~/.sfdx or in ~/.local/.sfdx
        home_dir = os.path.expanduser('~')
        dx_filename = info['username'] + '.json'
        json_path = os.path.join(home_dir, '.sfdx', dx_filename)
        if not os.path.isfile(json_path):
            json_path = os.path.join(home_dir, '.local', '.sfdx', dx_filename)
        with open(json_path, 'r') as json_file:
            dx_json = json_file.read()

        org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder)

        # Persist the org details on a new ScratchOrgInstance
        instance = ScratchOrgInstance(
            org=org_config.org,
            build=self.build,
            sf_org_id=info['org_id'],
            username=info['username'],
            json_dx=dx_json,
            json=org_json,
        )
        instance.save()
        org_config.org_instance = instance

        return org_config
keychain.py 文件源码 项目:MetaCI 作者: SalesforceFoundation 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_org(self, org_config):
        """Create or update the ``Org`` row for ``org_config`` in the build's repo.

        The config is stored as JSON (``DjangoJSONEncoder``). The row is
        flagged as scratch when ``org_config`` is a ``ScratchOrgConfig``;
        scratch orgs are not saved.
        """
        serialized = json.dumps(org_config.config, cls=DjangoJSONEncoder)
        try:
            org = Org.objects.get(repo=self.build.repo, name=org_config.name)
        except Org.DoesNotExist:
            org = Org(
                name=org_config.name,
                json=serialized,
                repo=self.build.repo,
            )
        else:
            org.json = serialized

        org.scratch = isinstance(org_config, ScratchOrgConfig)
        if org.scratch:
            return
        org.save()
__init__.py 文件源码 项目:django-jchart 作者: matthisk 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, height=None, width=None,
                 html_id=None, json_encoder_class=DjangoJSONEncoder):
        """Store the chart's size, HTML id and JSON encoder class.

        Raises ``ImproperlyConfigured`` when a height or width is given while
        ``self.responsive`` is ``None`` or truthy.
        """
        self.html_id = html_id
        self.json_encoder_class = json_encoder_class
        self.width = width
        self.height = height

        size_given = not (height is None and width is None)
        # ``self.responsive`` is consulted only when a size was passed in.
        if size_given and (self.responsive is None or self.responsive):
            raise ImproperlyConfigured(
                'Using the height/width parameter will have no '
                'effect if the chart is in responsive mode. '
                'Disable responsive mode by setting chart.responsive '
                'to False')
__init__.py 文件源码 项目:django-jchart 作者: matthisk 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def __init__(self, height=None, width=None,
                 html_id=None, json_encoder_class=DjangoJSONEncoder):
        """Remember size, HTML id and JSON encoder class for the chart.

        A fixed height/width is rejected with ``ImproperlyConfigured`` while
        ``self.responsive`` is ``None`` or truthy.
        """
        self.height, self.width = height, width
        self.json_encoder_class = json_encoder_class
        self.html_id = html_id

        if height is None and width is None:
            # No explicit size: nothing to validate.
            return
        if self.responsive is None or self.responsive:
            raise ImproperlyConfigured(
                'Using the height/width parameter will have no '
                'effect if the chart is in responsive mode. '
                'Disable responsive mode by setting chart.responsive '
                'to False')
response.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True,
                 json_dumps_params=None, **kwargs):
        """Serialize ``data`` to JSON and use it as the response content.

        With ``safe`` left at ``True`` only ``dict`` data is accepted and
        anything else raises ``TypeError``. ``json_dumps_params`` is forwarded
        to ``json.dumps``; ``content_type`` defaults to ``application/json``
        unless the caller passes one.
        """
        if safe and not isinstance(data, dict):
            raise TypeError('In order to allow non-dict objects to be '
                'serialized set the safe parameter to False')
        dumps_options = {} if json_dumps_params is None else json_dumps_params
        kwargs.setdefault('content_type', 'application/json')
        serialized = json.dumps(data, cls=encoder, **dumps_options)
        super(JsonResponse, self).__init__(content=serialized, **kwargs)
sites.py 文件源码 项目:django-admino 作者: erdem 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def api_detail(self, request, *args, **kwargs):
        """Return the object addressed by the ``pk`` keyword argument as a JSON response."""
        target = self.get_object(request, object_id=kwargs.get("pk"))
        form_class = self.get_form(request, obj=target)
        bound_form = form_class(instance=target)
        payload = self.obj_as_dict(request, bound_form.instance)
        body = json.dumps(payload, cls=DjangoJSONEncoder)
        return HttpResponse(body, content_type='application/json')
sites.py 文件源码 项目:django-admino 作者: erdem 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def api_create(self, request, *args, **kwargs):
        """Create an object from the JSON request body.

        On success the saved object is returned as JSON; when the form is
        invalid the form errors are returned under ``"errors"`` with HTTP
        status 400.
        """
        payload = json.loads(request.body)

        form_class = self.get_form(request, obj=None)
        form = form_class(data=payload, files=request.FILES)
        if not form.is_valid():
            error_body = json.dumps({
                "errors": json.loads(form.errors.as_json())
            })
            return HttpResponse(error_body, status=400, content_type="application/json")

        created = form.save()
        body = json.dumps(self.obj_as_dict(request, created), cls=DjangoJSONEncoder)
        return HttpResponse(body, content_type="application/json")
consumers.py 文件源码 项目:Play.Excel 作者: abhijithasokan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def niftyChannelDataPush():
    """Send the current ``niftyData()`` result, JSON encoded, to the ``nifty-channel`` group."""
    message = {
        'text': json.dumps(niftyData(), cls=DjangoJSONEncoder)
    }
    Group('nifty-channel').send(message)





#================ Leaderboard channel =======================#

# @http_session_user
# @isLoggedInCh
# def connect_to_leaderboard_channel(message):
#   Group('leaderboard-channel').add(message.reply_channel)
#   message.reply_channel.send({
#       'text' : json.dumps({"accept": True}) #{ "close" : True }
#       })
#   print("New leaderboard listener added!")


# def disconnect_from_leaderboard_channel(message):
#   Group('leaderboard-channel').discard(message.reply_channel)

# def leaderboardChannelDataPush():

#   leaderboard_data = leaderboardData()
#   Group('leaderboard-channel').send( 
#       {
#           'text': json.dumps(leaderboard_data,cls=DjangoJSONEncoder)
#       })


#=============== Graph Channel =======================#
consumers.py 文件源码 项目:Play.Excel 作者: abhijithasokan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def graphDataPush():
    """Send the ``graph('NIFTY 50')`` result, JSON encoded, to the ``NIFTY-50`` group."""
    encoded_graph = json.dumps(graph('NIFTY 50'), cls=DjangoJSONEncoder)
    Group('NIFTY-50').send({'text': encoded_graph})






#==================== portfolio channel ========================#
consumers.py 文件源码 项目:Play.Excel 作者: abhijithasokan 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def sellDataPush():
    """Push every online seller's sell data to that seller's channel as JSON.

    Walks the keys of the ``online-sellers`` Redis hash, builds the sell data
    for each user id and sends it (encoded with ``DjangoJSONEncoder``) to the
    channel whose name is stored under that key.

    Delivery is best effort: a failure for one user is reported and the loop
    moves on to the next user.
    """
    for userid in redis_conn.hkeys("online-sellers"):
        userid = userid.decode("utf-8")
        try:
            sellData = sell_data(userid)
            Channel(redis_conn.hget("online-sellers", userid)).send(
                {
                    'text': json.dumps(sellData, cls=DjangoJSONEncoder)
                })
        except Exception:
            # Catch Exception rather than using a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still stop the process.
            print("sellDataPush failed!")



#================ Ticker Channel =======================#


问题


面经


文章

微信
公众号

扫码关注公众号