python类dumps()的实例源码

__init__.py 文件源码 项目:PrivacyScore 作者: PrivacyScore 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def post(self):
        try:
            #Format von "options":
            # {
            #   name: 1,
            #   description: 1,
            #   tags: 1
            # }
            args = parser.parse_args()
            searchtext = args['searchtext']
            options = args['options'] # geht momentan nicht, weil der Request form-encoded reinkommt, nicht als JSON, daher ist options dann None

            List = dumps(connector.Search(searchtext, options))
            return List # hier stand vorher return Scan, das haette nicht geklappt
        except Exception as ex:
            return '{"type":"error", "message":"' + GetException() + '"}'
api-manager.py 文件源码 项目:api-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def restart_app(app_name):
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    # check app exists first
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # check if app already running:
    if app_json["running"] is False:
        rabbit_close(rabbit_channel)
        return "{\"running_before_restart\": \"False\"}", 403
    # post to rabbit to restart app
    app_json["command"] = "restart"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# rolling restart an app
api-manager.py 文件源码 项目:api-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def roll_app(app_name):
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    # check app exists first
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # check if app already running:
    if app_json["running"] is False:
        rabbit_close(rabbit_channel)
        return "{\"running_before_restart\": \"False\"}", 403
    # post to rabbit to restart app
    app_json["command"] = "roll"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# stop an app
api-manager.py 文件源码 项目:api-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def stop_app(app_name):
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # check app exists first
    app_exists = mongo_check_app_exists(mongo_collection, app_name)
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # post to db
    app_json = mongo_update_app_running_state(mongo_collection, app_name, False)
    # post to rabbit to stop app
    app_json["command"] = "stop"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# start an app
api-manager.py 文件源码 项目:api-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def start_app(app_name):
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # check app exists first
    app_exists = mongo_check_app_exists(mongo_collection, app_name)
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # post to db
    app_json = mongo_update_app_running_state(mongo_collection, app_name, True)
    # post to rabbit to stop app
    app_json["command"] = "start"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# POST update an app
api-manager.py 文件源码 项目:api-manager 作者: nebula-orchestrator 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def release_app(app_name):
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    # check app exists first
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # post to rabbit to restart app
    app_json["command"] = "release"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# list apps
smapp_dataset.py 文件源码 项目:pysmap 作者: SMAPPNYU 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dump_to_json(self, output_file, num_files=1):
        filehandles = [None]*num_files

        if num_files == 1:
            filehandles[0] = open(output_file, 'a')
        else:
            # open all filehandles
            filename, file_extension = output_file.split(os.extsep, 1)
            for i in range(num_files):
                filehandles[i] = open('{}_{}.{}'.format(filename, i, file_extension), 'a')

        # write the tweets as evenly 
        # as possible in each file
        tracker = 0 
        for tweet in self.get_collection_iterators():
            filehandles[tracker].write(json_util.dumps(tweet)+'\n')
            if tracker == num_files-1:
                tracker = 0
            else:
                tracker += 1

        # close all filehandles
        for fh in filehandles:
            fh.close()
View.py 文件源码 项目:xunfeng 作者: ysrc 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def Getplugin():
    type = request.form.get('type', '')
    risk = request.form.get('risk', '')
    search = request.form.get('search', '')
    query = {}
    if type:
        query['type'] = type
    if risk:
        query['level'] = risk
    if search:
        search = unquote(search)
        query['name'] = {"$regex": search, '$options': 'i'}
    cursor = Mongo.coll['Plugin'].find(query)
    rsp = []
    for i in cursor:
        result = {'name': i['name'], 'info': i['info']}
        rsp.append(result)
    return json.dumps(rsp)


# ??????
views.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def delete_substage(request, id):
    try:
        sub_stage = Stage.objects.get(pk=id)
        old_fsxf = sub_stage.stage_forms
        old_fsxf.is_deleted = True
        # old_fsxf.stage = None
        old_fsxf.save()
        # org = sub_stage.stage.project.organization if sub_stage.stage.project else sub_stage.stage.site.project.organization
        # desc = "deleted form of stage {} substage {} by {}".format(sub_stage.stage.name, sub_stage.name,
        #                                                            request.user.username)
        # noti = old_fsxf.logs.create(source=request.user, type=1, title="form Deleted",
        #         organization=org, description=desc)
        # result = {}
        # result['description'] = desc
        # result['url'] = noti.get_absolute_url()
        # ChannelGroup("notify-{}".format(org.id)).send({"text": json.dumps(result)})
        # ChannelGroup("notify-0").send({"text": json.dumps(result)})
        # sub_stage.delete()
        return Response({}, status=status.HTTP_200_OK)
    except Exception as e:
        return Response({'error':e.message}, status=status.HTTP_400_BAD_REQUEST)
views.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def public_api(request, username, id_string):
    """
    Returns public information about the form as JSON
    """

    xform = get_object_or_404(XForm,
                              user__username__iexact=username,
                              id_string__exact=id_string)

    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    exports = {'username': xform.user.username,
               'id_string': xform.id_string,
               'bamboo_dataset': xform.bamboo_dataset,
               'shared': xform.shared,
               'shared_data': xform.shared_data,
               'downloadable': xform.downloadable,
               'title': xform.title,
               'date_created': xform.date_created.strftime(_DATETIME_FORMAT),
               'date_modified': xform.date_modified.strftime(_DATETIME_FORMAT),
               'uuid': xform.uuid,
               }
    response_text = json.dumps(exports)

    return HttpResponse(response_text, content_type='application/json')
View.py 文件源码 项目:ysrc 作者: myDreamShadow 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def Getplugin():
    type = request.form.get('type', '')
    risk = request.form.get('risk', '')
    search = request.form.get('search', '')
    query = {}
    if type:
        query['type'] = type
    if risk:
        query['level'] = risk
    if search:
        search = unquote(search)
        query['name'] = {"$regex": search, '$options': 'i'}
    cursor = Mongo.coll['Plugin'].find(query)
    rsp = []
    for i in cursor:
        result = {'name': i['name'], 'info': i['info']}
        rsp.append(result)
    return json.dumps(rsp)


# ??????
module_bson.py 文件源码 项目:dataIO-project 作者: MacHu-GWU 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_all(self):
        data = {
            "int": 100,
            "float": 3.1415926535,
            "str": "string example ?????",
            "bytes": "bytes example ?????".encode("utf-8"),
            "boolean": True,
            "datetime": datetime.now()
        }
        js = json_util.dumps(data)

        data1 = json_util.loads(js)
        self.assertEqual(data["int"], data1["int"])
        self.assertAlmostEqual(data["float"], data1["float"], delta=0.0001)
        self.assertEqual(data["str"], data1["str"])
        self.assertEqual(data["boolean"], data1["boolean"])

        print(data1["bytes"])
        print(data1["datetime"])
        print(json_util.dumps(data, sort_keys=True, indent=4))


#--- Unittest ---
web.py 文件源码 项目:musicbot 作者: AdrienPensart 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_music(artist, album, title):
    '''Get a track tags or download it'''
    page_format = request.args.get('format', 'html')
    artist = unquote(artist)
    album = unquote(album)
    title = unquote(title)
    collection = app.config['COLLECTION']
    mf = MusicFilter(artists=[artist], albums=[album], titles=[title])
    musics = webfilter(partial(collection.filter, cursor_factory=RealDictCursor), mf)
    if len(musics) != 1:
        return ('Music not found', 404)
    music = musics[0]

    if page_format == 'html':
        return render_template("music.html", music=music)
    elif page_format == 'json':
        return dumps(music, sort_keys=True, indent=4, separators=(',', ': '))
    return ('Invalid format, available: json,html', 400)
api2.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_metadata():
    if request.query.file_hash == '':
        response.status = 400
        return jsonize({'message': 'file_hash parameter is missing'})
    file_hash = clean_hash(request.query.file_hash)
    if not valid_hash(file_hash):
        response.status = 400
        return jsonize({'message': 'Invalid hash format (use MD5, SHA1 or SHA2)'})
    file_hash = get_file_id(file_hash)
    if file_hash is None:
        response.status = 404
        return jsonize({'message': 'Metadata not found in the database'})

    mdc = MetaController()
    res = mdc.read(file_hash)
    if res is None:
        log_event("metadata", file_hash)
    return dumps(change_date_to_str(res))
export.py 文件源码 项目:codex-backend 作者: codexgigassys 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def export_metadata():
    mdc = MetaController()
    hashes = request.forms.dict.get("file_hash[]")
    dump_to_save = ""
    random_id = id_generator()
    tmp_path = "/tmp/meta_export"
    tmp_folder = os.path.join(tmp_path, random_id)
    call_with_output(["mkdir", "-p", tmp_folder])
    for hash in hashes:
        hash = clean_hash(hash.replace('\r', ''))
        res = mdc.read(hash)
        dump = dumps(res, indent=4)
        file_name = os.path.join(tmp_folder, str(hash) + '.txt')
        fd = open(file_name, "w")
        fd.write(dump)
        fd.close()
    zip_path = os.path.join(tmp_path, random_id + '.zip')
    call_with_output(["zip", "-jr", zip_path, tmp_folder])
    resp = static_file(str(random_id) + '.zip', root=tmp_path, download=True)
    resp.set_cookie('fileDownload', 'true')
    shutil.rmtree(tmp_folder)
    os.remove(zip_path)
    return resp
api-manager.py 文件源码 项目:nebula 作者: nebula-orchestrator 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def restart_app(app_name):
    rabbit_channel = rabbit_login()
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    # check app exists first
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # check if app already running:
    if app_json["running"] is False:
        rabbit_close(rabbit_channel)
        return "{\"running_before_restart\": \"False\"}", 403
    # post to rabbit to restart app
    app_json["command"] = "restart"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# rolling restart an app
api-manager.py 文件源码 项目:nebula 作者: nebula-orchestrator 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def roll_app(app_name):
    rabbit_channel = rabbit_login()
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    # check app exists first
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # check if app already running:
    if app_json["running"] is False:
        rabbit_close(rabbit_channel)
        return "{\"running_before_restart\": \"False\"}", 403
    # post to rabbit to restart app
    app_json["command"] = "roll"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# stop an app
api-manager.py 文件源码 项目:nebula 作者: nebula-orchestrator 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def stop_app(app_name):
    rabbit_channel = rabbit_login()
    # check app exists first
    app_exists = mongo_check_app_exists(mongo_collection, app_name)
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # post to db
    app_json = mongo_update_app_running_state(mongo_collection, app_name, False)
    # post to rabbit to stop app
    app_json["command"] = "stop"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# start an app
api-manager.py 文件源码 项目:nebula 作者: nebula-orchestrator 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start_app(app_name):
    rabbit_channel = rabbit_login()
    # check app exists first
    app_exists = mongo_check_app_exists(mongo_collection, app_name)
    if app_exists is False:
        rabbit_close(rabbit_channel)
        return "{\"app_exists\": \"False\"}", 403
    # post to db
    app_json = mongo_update_app_running_state(mongo_collection, app_name, True)
    # post to rabbit to stop app
    app_json["command"] = "start"
    rabbit_send(rabbit_channel, app_name + "_fanout", dumps(app_json))
    rabbit_close(rabbit_channel)
    return dumps(app_json), 202


# update an app
data_service.py 文件源码 项目:ShadowCat 作者: NerdCats 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_history(asset_id, document_limit=10):
    try:
        cursor = app.config['DB_COLL_HISTORY'].find(
            {'asset_id': asset_id}
        ).sort(
            "timestamp", pymongo.DESCENDING
        ).limit(int(document_limit))
    except pymongo.errors.AutoReconnect as e:
        logger.error(e.message)
    except Exception as e:
        logger.error(e.message)

    history = []
    for document in cursor:
        doc = utilities.to_isoformat_datetime(document)
        history.append(doc)
    return dumps(history)
utils.py 文件源码 项目:dbs-back 作者: Beit-Hatfutsot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def collection_to_csv(coll, f):
    '''
        dumps a collection to a csv files.
        the file includes the URL of the hebrew page, its title,
        the url of the english page and its title.
    '''
    from bhs_api.item import SHOW_FILTER
    for i in coll.find(SHOW_FILTER):
        url = slugs_to_urls(i['Slug'])
        try:
            line = ','.join([url['He'], i['Header']['He'].replace(',','|')])
        except KeyError:
            line = ','
        try:
            line = ','.join([line, url['En'], i['Header']['En'].replace(',','|')])
        except KeyError:
            line = ','.join([line, '', ''])
        line += '\n'
        f.write(line.encode('utf8'))
operations.py 文件源码 项目:smart-realestate 作者: stevensshi 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def searchArea(text):
    zpids = []
    if text.isdigit():
        zpids = searchAreaByZip(text)
    else:
        city = text.split(',')[0].strip()
        state = text.split(', ')[1].strip()
        zpids = searchAreaByCityState(city, state)
    print zpids
    res = []
    update_list = []
    db = mongodb_client.getDB()
    for zpid in zpids:
        record = db[PROPERTY_TABLE_NAME].find_one({'zpid': zpid})
        if record != None:
            res.append(record)
        else:
            property_detail = getDetailsByZpid(zpid, False)
            res.append(property_detail)
            update_list.append(property_detail)

    storeUpdates(update_list)

    # Trick: use bson.dumps then re-construct json because of ObjectId.
    return json.loads(dumps(res))
operations.py 文件源码 项目:smart-realestate 作者: stevensshi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getDetailsByZpid(zpid, get_prediction=False):
    db = mongodb_client.getDB()
    prop = json.loads(dumps(db[PROPERTY_TABLE_NAME].find_one({'zpid': zpid})))
    if prop == None:
        prop = zillow_web_scraper_client.get_property_by_zpid(zpid)

    # Get prediction
    if get_prediction:
        predicted_value = ml_prediction_client.predict(
            prop['zipcode'],
            prop['property_type'],
            prop['bedroom'],
            prop['bathroom'],
            prop['size'])
        prop['predicted_value'] = int(predicted_value)
    return prop
helpers.py 文件源码 项目:VTU-Result-Analyser 作者: abhijith0505 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getOneStudentJson(college_code='1mv',year='14',branch='is',regno=45):
    client = MongoClient()

    db = client.results

    students = db.students

    student = students.find_one({"usn" : (college_code + year
        +branch + str(regno).zfill(3)).upper()})

    if student:
        return dumps(student) #dumps is used to convert bson format of mongodb to json

    else :
        student = student_results(college_code,year,branch,regno)
        db.students.insert_one(student)
        return dumps(student)
views.py 文件源码 项目:FormShare 作者: qlands 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def public_api(request, username, id_string):
    """
    Returns public information about the form as JSON
    """

    xform = get_object_or_404(XForm,
                              user__username__iexact=username,
                              id_string__iexact=id_string)

    _DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    exports = {'username': xform.user.username,
               'id_string': xform.id_string,
               'bamboo_dataset': xform.bamboo_dataset,
               'shared': xform.shared,
               'shared_data': xform.shared_data,
               'downloadable': xform.downloadable,
               'title': xform.title,
               'date_created': xform.date_created.strftime(_DATETIME_FORMAT),
               'date_modified': xform.date_modified.strftime(_DATETIME_FORMAT),
               'uuid': xform.uuid,
               }
    response_text = json.dumps(exports)

    return HttpResponse(response_text, content_type='application/json')
client.py 文件源码 项目:pybot 作者: aster1sk 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def start_socket() :
    readbuffer = ''
    sock.connect(("riboflav.in", 6667))
    while True :
        readbuffer = readbuffer + sock.recv(1024)
        temp = string.split(readbuffer, "\n")
        readbuffer = temp.pop()
        for line in temp :
            body = {
                'uuid' : str(uuid4()),
                'raw'  : line,
                'args' : line.split(),
                'host' : host,
                'port' : port,
                'time' : time(),
                'event': "socket"
            }
            channel.basic_publish(exchange='stream', routing_key='', body=dumps(body))
base_collection.py 文件源码 项目:smappdragon 作者: SMAPPNYU 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def dump_to_json(self, output_json, compression=None, mode='a'):
        if compression == 'bz2':
            mode = binary_mode(mode)
            filehandle = bz2.open(output_json, mode)
        elif compression == 'gzip':
            mode = binary_mode(mode)
            filehandle = gzip.open(output_json, mode)
        else:
            filehandle = open(output_json, mode)

        for tweet in self.get_iterator():
            filehandle.write(json_util.dumps(tweet)+'\n')
        filehandle.close()
autogen.py 文件源码 项目:xgovctf 作者: alphagov 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_metadata(pid, n, data):
    """
    Write an autogenerated problem's instance metadata.
    This includes any fields to be overwritten from
    the original problem object.

    Args:
        pid: the problem id
        n: the instance number
        data: the metadata object
    """

    metadata_path = get_metadata_path(pid, n)
    with open(metadata_path, "w") as f:
        f.write(json_util.dumps(data))
__init__.py 文件源码 项目:PrivacyScore 作者: PrivacyScore 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def post(self):
        try:
            args = parser.parse_args()
            listname = args['listname']
            description = args['description']
            tags = request.json['tags']
            columns = request.json['columns']
            isprivate = request.json['isprivate']
            obj = dumps(connector.SaveList(listname, description, tags, columns, True, isprivate, False))
            return obj, 201
        except Exception as ex:
            return '{"type":"error", "message":"' + GetException() + '"}'
__init__.py 文件源码 项目:PrivacyScore 作者: PrivacyScore 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self, token):
        try:
            List = dumps(connector.ShowList(token))
            return List
        except bson.errors.InvalidId as invalid:
            return '{"type":"error", "message":"invalid object id"}'
        except Exception as ex:
            return '{"type":"error", "message":"' + GetException() + '"}'


问题


面经


文章

微信
公众号

扫码关注公众号