python类marshal()的实例源码

models_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def delete(self, model_id):
        """
        Remove the model identified by ``model_id``.

        The actual removal is delegated to ``lda_utils.delete_model``, which
        returns a ``(status_code, deleted_model)`` pair (per the original
        comment it deletes the model and its files on disk).

        :param model_id: identifier of the model to delete
        :return: ``(response_body, http_status)``; 404 when the helper reports
                 the model as missing, 200 with the marshalled model otherwise
        """
        outcome, removed = lda_utils.delete_model(model_id)

        # Anything other than an explicit 404 from the helper is a success.
        if outcome != 404:
            details = marshal(removed, api_utils.model_fields)
            return api_utils.prepare_success_response(200, 'Model deleted.',
                                                      more_info=details), 200

        return api_utils.prepare_error_response(404, 'Model not found.',
                                                more_info={'model_id': model_id}), 404
topics_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def patch(self, model_id, topic_id):
        """
        Update the editable metadata (label and/or description) of a topic.

        :param model_id: identifier of the model owning the topic
        :param topic_id: identifier of the topic; converted with ``int()``
        :return: an error response (code 500) when neither field is supplied
                 or when ``db_utils.update_topic`` returns ``None``; otherwise
                 a success response carrying the marshalled topic
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument('label', default=None, required=False, type=str,
                            help='The human readable label of the topic.')
        parser.add_argument('description', default=None, required=False, type=str,
                            help='The human readable description of the topic.')
        args = parser.parse_args()
        new_label = args['label']
        new_description = args['description']

        # Nothing to change: reject before touching the database.
        if new_label is None and new_description is None:
            return api_utils.prepare_error_response(500, 'Provide the label or the description to set.')

        updated = db_utils.update_topic(model_id, int(topic_id), new_label, new_description)
        if updated is None:
            return api_utils.prepare_error_response(500, 'Error during the update, check the provided topic id.')

        return api_utils.prepare_success_response(200, 'Topic updated.', data=marshal(updated, api_utils.topic_fields))
helpers.py 文件源码 项目:vote4code 作者: welovecoding 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_response(data, marshal_table, cursors=None):
  """Build the JSONP success envelope for a single object or a collection.

  Args:
    data: a single object, or a collection for which ``util.is_iterable``
      is true (``len()`` is taken of it).
    marshal_table: field mapping handed to ``flask_restful.marshal``.
    cursors: optional paging info for collections. Either a dict with
      ``'next'`` and/or ``'prev'`` entries, or a single value that is
      treated as the next cursor.

  Returns:
    Whatever ``util.jsonpify`` returns for the assembled response dict.
  """
  if not util.is_iterable(data):
    return util.jsonpify({
      'status': 'success',
      'now': datetime.utcnow().isoformat(),
      'result': flask_restful.marshal(data, marshal_table),
    })

  response = {
    'status': 'success',
    'count': len(data),
    'now': datetime.utcnow().isoformat(),
    # Build a real list: on Python 3 ``map`` returns a lazy iterator that
    # JSON encoders cannot serialize. On Python 2 the result is unchanged.
    'result': [flask_restful.marshal(item, marshal_table) for item in data],
  }
  if cursors:
    if isinstance(cursors, dict):
      if cursors.get('next'):
        response['next_cursor'] = cursors['next']
        response['next_url'] = util.generate_next_url(cursors['next'])
      if cursors.get('prev'):
        response['prev_cursor'] = cursors['prev']
        # The same URL helper is used for both directions; only the cursor differs.
        response['prev_url'] = util.generate_next_url(cursors['prev'])
    else:
      response['next_cursor'] = cursors
      response['next_url'] = util.generate_next_url(cursors)
  return util.jsonpify(response)
rest.py 文件源码 项目:aide 作者: Lambda-3 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle_result(result, rsc):
    """Marshal ``result`` for the resource ``rsc``, with a dict-conversion fallback.

    ``rsc.marshal_with()`` supplies the field mapping and ``rsc.envelope()``
    the envelope key passed to ``marshal``.

    Returns:
        The marshalled result; the raw ``result`` when it is already a dict
        and marshalling still fails; ``{envelope: result}`` as a last resort;
        ``None`` when ``result`` is falsy and not an empty list.

    Raises:
        AttributeError: any AttributeError from the first marshal attempt
            other than the known "'NoneType' ... 'items'" failure.
    """
    logdebug(result)
    # Empty lists are valid results; every other falsy value yields None.
    if not (result or result == []):
        return None

    try:
        return marshal(result, rsc.marshal_with(), rsc.envelope())
    except AttributeError as e:
        if "'NoneType' object has no attribute 'items'" not in str(e):
            # Unrelated failure: re-raise the original exception so its
            # traceback is preserved instead of wrapping it in a new one.
            raise

        loginfo(e)
        try:
            # Retry after converting the result into a plain dict.
            result = to_dict(result)
            loginfo(result)
            return marshal(result, rsc.marshal_with(), rsc.envelope())
        except AttributeError as retry_error:
            if isinstance(result, dict):
                logdebug("result is already dict")
                return result
            loginfo(str(retry_error))
            return {rsc.envelope(): result}
azure_iot_https.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def marshal(self):
        """Serialize this interface for the REST API.

        The class name is exposed as ``type`` and ``appinterface.id`` as
        ``id``; the remaining entries are read from same-named attributes.

        Returns:
            The result of the module-level ``marshal`` call for this object
            (an OrderedDict according to the original documentation).
        """
        # NOTE(review): the call below targets the module-level ``marshal``
        # (presumably flask_restful's), not this method - confirm the import.
        field_spec = {
            'type': fields.String(attribute='__class__.__name__'),
            'id': fields.Integer(attribute='appinterface.id'),
            'name': fields.String,
            'iothost': fields.String,
            'keyname': fields.String,
            'keyvalue': fields.String,
            'poll_interval': fields.Integer,
            'started': fields.Boolean,
        }
        return marshal(self, field_spec)
appproperty.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self, appeui):
        """Return every property of the application ``appeui``.

        Generator-style handler: results are delivered through
        ``returnValue`` (presumably driven by Twisted's ``inlineCallbacks``;
        the decorator is not visible here).

        Args:
            appeui: application EUI used for the lookup.

        Returns (via ``returnValue``):
            ``{}`` when the properties query yields ``None``, otherwise a dict
            mapping the enumeration index to each marshalled property.
            A ``TimeoutError`` is logged and produces no value.
        """
        try:
            # A missing application is answered with a 404.
            app = yield Application.find(where=['appeui = ?', appeui], limit=1)
            if app is None:
                detail = "Application {} doesn't exist.".format(euiString(appeui))
                abort(404, message={'error': detail})

            properties = yield app.properties.get()
            if properties is None:
                returnValue({})

            returnValue({index: marshal(prop, self.fields)
                         for index, prop in enumerate(properties)})

        except TimeoutError:
            log.error("REST API timeout retrieving application {appeui} "
                      "properties", appeui=euiString(appeui))
appinterface.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):
        """Return a summary (type, id, name) of every application interface.

        Returns (via ``returnValue``):
            ``{}`` when the interface manager returns ``None``, otherwise a
            dict mapping the enumeration index to each marshalled interface.
            A ``TimeoutError`` is logged and produces no value.
        """
        summary_fields = {
            'type': fields.String(attribute='__class__.__name__'),
            'id': fields.Integer(attribute='appinterface.id'),
            'name': fields.String
        }
        try:
            interfaces = interfaceManager.getAllInterfaces()
            if interfaces is None:
                returnValue({})

            returnValue({index: marshal(iface, summary_fields)
                         for index, iface in enumerate(interfaces)})
            # Never reached, but the bare ``yield`` is what makes this
            # function a generator, so it must stay.
            yield

        except TimeoutError:
            log.error("REST API timeout retrieving application interfaces")
application.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get(self, appeui):
        """Handle a GET request for a single application.

        Args:
            appeui (int): Application EUI

        Returns (via ``returnValue``):
            The marshalled application with an added ``properties`` entry
            obtained from ``self.getProperties``. A missing application
            triggers ``abort(404)``; a ``TimeoutError`` is only logged.
        """
        try:
            app = yield Application.find(where=['appeui = ?', appeui], limit=1)
            if app is None:
                detail = "Application {} doesn't exist.".format(euiString(appeui))
                abort(404, message={'error': detail})

            payload = marshal(app, self.fields)
            payload['properties'] = yield self.getProperties(app)
            returnValue(payload)

        except TimeoutError:
            log.error("REST API timeout retrieving application {appeui}",
                      appeui=euiString(appeui))
device.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self, deveui):
        """Handle a GET request for a single device.

        Args:
            deveui (int): Device deveui

        Returns (via ``returnValue``):
            The device marshalled with ``self.fields``. A missing device
            triggers ``abort(404)``; a ``TimeoutError`` is only logged.
        """
        try:
            device = yield Device.find(where=['deveui = ?', deveui], limit=1)
            if device is None:
                detail = "Device {} doesn't exist".format(euiString(deveui))
                abort(404, message={'error': detail})

            returnValue(marshal(device, self.fields))

        except TimeoutError:
            log.error("REST API timeout for device GET request")
users.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, shift_id):
        """List the users eligible for a shift, flagged by cap status.

        Sudo users and org admins / location managers may include shifts in
        the past (``allow_past``).

        Returns:
            ``{API_ENVELOPE: [...]}`` where each marshalled user carries a
            boolean ``within_caps`` entry; users within caps come first.
        """
        # NOTE(review): ``query.get`` returns None for an unknown shift_id,
        # which would fail on the attribute access below - confirm an outer
        # decorator validates the shift.
        shift = Shift2.query.get(shift_id)
        allow_past = g.current_user.is_sudo(
        ) or g.current_user.is_org_admin_or_location_manager(org_id,
                                                             location_id)

        within_caps, exceeds_caps = shift.get_all_eligible_users(
            allow_past=allow_past)

        def tag(users, is_within):
            # Build concrete lists: on Python 3 ``map`` objects can be
            # neither concatenated with ``+`` nor iterated twice.
            tagged = []
            for user in users:
                entry = marshal(user, user_fields)
                entry["within_caps"] = is_within
                tagged.append(entry)
            return tagged

        return {API_ENVELOPE: tag(within_caps, True) + tag(exceeds_caps, False)}
user.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, user_id):
        """Return a user merged with their membership record for a role.

        Responds with 404 when the user does not exist or has no
        ``RoleToUser`` row for ``role_id``.

        Returns:
            A dict holding the merged marshalled data under ``API_ENVELOPE``
            and the list of sub-resources under ``"resources"``.
        """
        # ``get_or_404`` aborts on a missing user, so no separate None check
        # is needed for ``user``.
        user = User.query.get_or_404(user_id)
        rtu = RoleToUser.query.filter_by(
            user_id=user_id, role_id=role_id).first()

        if rtu is None:
            abort(404)

        # Role-membership fields are applied second, so they win on key clashes.
        data = {}
        data.update(marshal(user, user_fields))
        data.update(marshal(rtu, role_to_user_fields))

        return {
            API_ENVELOPE: data,
            "resources": ["timeclocks", "timeoffrequests"],
        }
organizations.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self):
        """Return one page of organizations.

        Query arguments:
            offset (int, default 0): number of rows to skip.
            limit (int, default 25): maximum number of rows to return.

        Returns:
            A dict echoing ``offset`` and ``limit`` with the marshalled
            organizations listed under ``API_ENVELOPE``.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("offset", type=int, default=0)
        parser.add_argument("limit", type=int, default=25)

        args = parser.parse_args()

        offset = args["offset"]
        limit = args["limit"]

        page = Organization.query.limit(limit).offset(offset).all()

        return {
            "offset": offset,
            "limit": limit,
            # A real list rather than ``map``: the latter is a lazy iterator
            # on Python 3 and cannot be JSON-serialized.
            API_ENVELOPE: [
                marshal(organization, organization_fields)
                for organization in page
            ],
        }
organizations.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self):
        """Create an organization from the posted parameters.

        Accepted arguments: ``name``, ``day_week_starts`` (default
        "monday"), ``plan`` (default "flex-v1") and ``trial_days``
        (default 14). Unknown arguments are rejected (``strict=True``).

        Returns:
            ``(marshalled organization, 201)`` on success, or
            ``({"message": ...}, 400)`` when validation fails. A failed
            commit is rolled back and answered with ``abort(500)``.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("name", type=str)
        parser.add_argument("day_week_starts", type=str, default="monday")
        # NOTE(review): ``plan`` is parsed but never passed to Organization -
        # confirm whether that is intentional.
        parser.add_argument("plan", type=str, default="flex-v1")
        parser.add_argument("trial_days", type=int, default=14)
        parameters = parser.parse_args(strict=True)

        if parameters.get("day_week_starts") not in DAYS_OF_WEEK:
            return {"message": "day_week_starts is not valid"}, 400

        if parameters.get("trial_days") < 0:
            # Previously this was returned with HTTP 200 under the key
            # "messages"; it is now aligned with the validation error above.
            return {"message": "trial_days cannot be less than 0"}, 400

        o = Organization(
            name=parameters.get("name"),
            day_week_starts=parameters.get("day_week_starts"),
            trial_days=parameters.get("trial_days"))
        db.session.add(o)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable after the failed commit.
            db.session.rollback()
            abort(500)

        return marshal(o, organization_fields), 201
chomp_tasks.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        """Return all schedules in the chomp queue or being processed.

        Only schedules belonging to active organizations are included.

        Returns:
            ``{API_ENVELOPE: {state: [marshalled schedule, ...]}}`` with one
            entry for each of 'chomp-queue' and 'chomp-processing'.
        """
        by_state = {}

        for state in ['chomp-queue', 'chomp-processing']:
            # ``== True`` is deliberate: it builds a SQL expression, so it
            # must not be rewritten as a Python truthiness test.
            schedules = Schedule2.query.join(Role)\
                .join(Location)\
                .join(Organization)\
                .filter(Schedule2.state == state, Organization.active == True)\
                .all()
            # A real list rather than ``map``: the latter is a lazy iterator
            # on Python 3 and cannot be JSON-serialized.
            by_state[state] = [
                marshal(schedule, tasking_schedule_fields)
                for schedule in schedules
            ]

        return {API_ENVELOPE: by_state}
app.py 文件源码 项目:metaseek 作者: ahoarfrost 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def post(self):
        """Return full metadata for the datasets matching posted filter rules.

        Expects a ``filter_params`` argument containing a JSON object with a
        ``rules`` entry.

        Returns:
            A dict with the match count, the echoed filter parameters and the
            marshalled datasets; or ``{'error': ...}`` when more than
            ``user_n_threshold`` datasets match or any exception occurs.
        """
        try:
            parser = reqparse.RequestParser()
            parser.add_argument('filter_params', type=str)
            args = parser.parse_args()
            filter_params = json.loads(args['filter_params'])
            rules = filter_params['rules']

            queryObject = filterDatasetQueryObjectWithRules(Dataset.query,rules)

            # Check the size limit before loading and marshalling any rows, so
            # an oversized request is rejected without fetching the result set.
            if queryObject.count()>user_n_threshold:
                return {'error':'you are trying to get metadata for too many datasets at once. Please query the database for '+str(user_n_threshold)+' or fewer datasets at a time'}

            result = marshal(queryObject.all(), fullDatasetFields)
            return {"count_matching_datasets":len(result), "filter_params":filter_params, "datasets":result}

        except Exception as e:
            # Every failure is reported to the client as an error payload.
            return {'error': str(e)}
app.py 文件源码 项目:metaseek 作者: ahoarfrost 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
        """Return full metadata for an explicit list of dataset ids.

        Expects a ``metaseek_ids`` argument containing a JSON-encoded
        collection of ids.

        Returns:
            A dict with the match count and the marshalled datasets; or
            ``{'error': ...}`` when more than ``user_n_threshold`` ids are
            requested or any exception occurs.
        """
        try:
            parser = reqparse.RequestParser()
            parser.add_argument('metaseek_ids', type=str)
            requested_ids = json.loads(parser.parse_args()['metaseek_ids'])

            # Refuse oversized lookups before touching the database.
            if len(requested_ids)>user_n_threshold:
                return {'error':'you are trying to get metadata for too many datasets at once. Please query the database for '+str(user_n_threshold)+' or fewer datasets at a time'}

            # NOTE(review): 8 is an opaque rule code understood by
            # filterQueryByRule - its meaning is not visible here.
            matches = filterQueryByRule(Dataset,Dataset.query,'id',8,requested_ids)
            datasets = marshal(matches.all(), fullDatasetFields)
            return {"count_matching_datasets":len(datasets), "datasets":datasets}

        except Exception as e:
            return {'error': str(e)}


# End route functions

# Declare routing
account.py 文件源码 项目:backend 作者: gitplaylist 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def post(self):
        """Register a new user from the posted form and log them in.

        Reads ``email`` and ``password`` from the request form (both default
        to the empty string).

        Returns:
            ``({'errors': ...}, 400)`` when constructing the user raises
            ``ValidationError``; otherwise ``(marshalled user, 201)`` after
            the user has been committed and logged in.
        """
        try:
            email = request.form.get('email', '')
            password = request.form.get('password', '')
            user = User(email=email, password=password)
        except ValidationError as error:
            return {'errors': error.message}, 400

        session = db.session
        session.add(user)
        session.commit()

        login_user(user)

        return marshal(user, user_fields), 201
app.py 文件源码 项目:Turkish-Language-NLP-API 作者: WoodProgrammer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self,person_id):
        """Find the word pair in ``str_read`` most similar to ``person_id``.

        Every 2-gram of the whitespace-split corpus is compared with the
        query through ``NGram.compare``; the first pair with the highest
        score wins.

        Returns:
            Marshalled dict with ``occurs`` (best score times 1000, as a
            string) and ``word`` (the string form of the best pair).
        """
        gram_size=2
        scores=[]
        candidates=[]
        for gram in ngrams(str_read.split(), gram_size):
            gram_text=str(gram)
            scores.append(NGram.compare('{}'.format(person_id),gram_text))
            candidates.append(gram_text)

        best_score=max(scores)
        best_gram=candidates[scores.index(best_score)]

        main_fields={'occurs':fields.String,"word":fields.String}
        datas={'occurs':"{}".format(best_score*1000),'word':"{}".format(best_gram)}
        return marshal(datas,main_fields)
server.py 文件源码 项目:Turkish-Language-NLP-API 作者: WoodProgrammer 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get(self,person_id):
        """Find the word pair in ``str_read`` most similar to ``person_id``.

        ``person_id`` is decoded as latin-1 before each comparison. Every
        2-gram of the whitespace-split corpus is scored through
        ``NGram.compare``; the first pair with the highest score wins.

        Returns:
            Marshalled dict with ``occurs`` (best score times 1000, as a
            string) and ``word`` (the string form of the best pair).
        """
        gram_size=2
        scores=[]
        candidates=[]
        for gram in ngrams(str_read.split(), gram_size):
            gram_text=str(gram)
            scores.append(NGram.compare('{}'.format(person_id.decode('latin-1')),gram_text))
            candidates.append(gram_text)

        best_score=max(scores)
        best_gram=candidates[scores.index(best_score)]

        main_fields={'occurs':fields.String,"word":fields.String}
        datas={'occurs':"{}".format(best_score*1000),'word':"{}".format(best_gram)}
        return marshal(datas,main_fields)
server.py 文件源码 项目:Turkish-Language-NLP-API 作者: WoodProgrammer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self,paragraph):
        """Check which words of ``paragraph`` exist as keys in ``r_server``.

        Returns:
            Marshalled dict with ``true_words`` (the matched words, in
            paragraph order), ``all_words`` (the original paragraph) and
            ``accuracy`` (100 minus 10 per distinct unmatched word).
        """
        paragraph_arr=paragraph.split()

        # Scan the key space once and test membership in a set, instead of
        # re-running the full scan for every word of the paragraph.
        known_words=set(r_server.scan_iter()) if paragraph_arr else set()
        word_true=[word for word in paragraph_arr if word in known_words]

        main_fields={'true_words':fields.String,"all_words":fields.String,"accuracy":fields.Integer}
        diff=set(paragraph_arr)-set(word_true)
        # NOTE(review): this goes negative with more than ten distinct
        # unknown words - confirm whether it should be clamped.
        accuracy=100-len(diff)*10
        datas={'true_words':word_true,"all_words":paragraph,"accuracy":accuracy}
        return marshal(datas,main_fields)
test_app.py 文件源码 项目:tuning-box 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _assert_db_effect(self, model, key, fields, expected):
        """Assert that the stored row ``key`` of ``model`` marshals to ``expected``.

        The row is loaded and marshalled inside an application context; the
        final comparison runs after the context has been left.
        """
        with self.app.app_context():
            record = model.query.get(key)
            self.assertIsNotNone(record)
            actual = flask_restful.marshal(record, fields)
        self.assertEqual(expected, actual)
linkero.py 文件源码 项目:linkero 作者: ingran 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getResourceURL(endpoint, selector_name = None, selector_value = None, absolute = False):
    """Build the URL of ``endpoint`` by marshalling a ``fields.Url`` field.

    ``selector_name`` / ``selector_value`` form the single-entry dict handed
    to ``marshal``. When both are omitted a dummy placeholder pair is used.
    With ``absolute=True`` the result is prefixed with
    ``getDomain(request)``.
    """
    if selector_name is None and selector_value is None:
        selector_name = selector_value = "dummy_RDCH106"

    # The domain is resolved first (and only when asked for), matching the
    # evaluation order of a direct ``getDomain(...) + url`` expression.
    prefix = getDomain(request) if absolute else ""
    url_spec = {'url': fields.Url(endpoint)}
    return prefix + marshal({selector_name: selector_value}, url_spec)["url"]
models_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self):
        """
        List every model returned by ``db_utils.get_all_models``.

        :return: ``(success response, 200)`` whose payload is the models
                 marshalled with ``api_utils.models_fields``
        """
        payload = marshal({'models': db_utils.get_all_models()}, api_utils.models_fields)
        return api_utils.prepare_success_response(200, 'Models retrieved.', payload), 200
models_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, model_id):
        """
        Get all info related to model model_id

        :param model_id: identifier of the model to look up
        :return: a JSON response with HTTP status 404 when the model is
                 unknown, otherwise ``(success response, 200)`` carrying the
                 marshalled model
        """
        model = db_utils.get_model(model_id)
        if model is None:
            not_found = jsonify(api_utils.prepare_error_response(404,
                                                    'Model id not found.',
                                                    more_info={'model_id': model_id}))
            # The payload announces a 404, so the HTTP status must match;
            # ``jsonify`` alone would answer with 200.
            not_found.status_code = 404
            return not_found

        return api_utils.prepare_success_response(200, 'Model retrieved.',
                                                  marshal(model, api_utils.model_fields)), 200
documents_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, model_id, topic_id=None):
        """
        List the documents that have a topic assignment in model ``model_id``.

        When ``topic_id`` is given it is converted with ``int()`` and the
        optional ``threshold`` request argument (default 0.0) is passed on as
        the topic weight; without a topic the weight is 0.0.

        :param model_id: identifier of the model
        :param topic_id: optional topic identifier
        :return: success response whose payload is the marshalled document list
        """
        if topic_id is None:
            topic_weight = 0.0
        else:
            topic_id = int(topic_id)

            parser = reqparse.RequestParser(bundle_errors=True)
            parser.add_argument('threshold', default=0.0, required=False,
                                type=float, help='The minimum probability that the specified topic should '
                                     'have to consider that document as related.')
            topic_weight = parser.parse_args()['threshold']

        documents = db_utils.get_all_documents(model_id, topic_id, topic_weight)
        marshalled = marshal({'documents': documents}, api_utils.documents_fields)

        return api_utils.prepare_success_response(200, 'Documents retrieved.', marshalled['documents'])
documents_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, model_id, document_id):
        """
        Return a document of model ``model_id`` together with its topics.

        The optional ``threshold`` request argument (defaulting to the
        configured minimum threshold) is forwarded to
        ``db_utils.get_document`` as ``topics_threshold``.

        :param model_id: identifier of the model
        :param document_id: the document id; passed on as a string
        :return: error response (code 400) when the document is not found,
                 otherwise a success response with the marshalled document,
                 extended with the ``threshold`` and ``model_id`` used
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument('threshold', default=config.defaults['minimum_threshold']['value'], required=False,
                            type=float,
                            help='The minimum probability that a topic should '
                                 'have to be returned as related to the document.')
        threshold = parser.parse_args()['threshold']

        document = db_utils.get_document(model_id, str(document_id), topics_threshold=threshold)
        if document is None:
            return api_utils.prepare_error_response(400, 'Document not found', more_info={'model_id': model_id, 'document_id': document_id})

        # Echo the request parameters alongside the stored data.
        document['threshold'] = threshold
        document['model_id'] = model_id

        return api_utils.prepare_success_response(200, 'Document retrieved.', marshal(document, api_utils.document_fields))
topics_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def search(self, model_id):
        """
        Extract the topics of model ``model_id`` that match a textual query.

        Request arguments: ``text`` (required) and ``threshold`` (optional
        float, default 0.0).

        :param model_id: identifier of the model to query
        :return: error response (code 500) when the text is blank or when
                 topic extraction returns ``None``; otherwise a success
                 response whose assigned topics are sorted by descending
                 ``topic_weight``
        """
        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument('threshold', default=0.0, required=False, type=float,
                            help='The minimum probability that a topic should have to be returned as related to the query string.')
        parser.add_argument('text', default=None, required=True, type=str,
                            help='The query to assign topics to.')

        args = parser.parse_args()

        # ``strip()`` always returns a string, so the former
        # ``strip() is not None`` test could never detect blank input.
        if not args['text'].strip():
            return api_utils.prepare_error_response(500, "The text field is empty.", None)

        data = {'model_id': model_id, 'threshold': args['threshold'], 'textual_query': args['text']}

        topics_assignment = lda_utils.assign_topics_for_query(model_id, args['text'], args['threshold'])
        if topics_assignment is None:
            # The request data is still echoed back with the error.
            return api_utils.prepare_error_response(500, "Error during topic extraction. Check logs.",
                                                    marshal(data, api_utils.textual_query_fields))

        list_of_da = lda_utils.convert_topic_assignment_to_dictionary(topics_assignment)
        data['assigned_topics'] = sorted(list_of_da[0]['assigned_topics'], reverse=True, key=lambda d:d['topic_weight'])

        return api_utils.prepare_success_response(200, "Topics extracted.",
                                                  marshal(data, api_utils.textual_query_fields))
topics_api.py 文件源码 项目:LDA-REST 作者: valentinarho 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, model_id, topic_id):
        """
        Return the stored information of topic ``topic_id`` in model ``model_id``.

        :param model_id: identifier of the model
        :param topic_id: identifier of the topic; converted with ``int()``
        :return: success response carrying the marshalled topic
        """
        topic = db_utils.get_topic(model_id, int(topic_id))
        return api_utils.prepare_success_response(200, "Topic retrieved.",
                                                  marshal(topic, api_utils.topic_fields))
domains.py 文件源码 项目:oniongate-web 作者: DonnchaC 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get(self, domain_name=None):
        """
        Return one domain or the listing of all public domains.

        Without ``domain_name`` every domain that is public and not deleted
        is returned. With it, the single domain is returned including its
        DNS records (404 via ``Domain.get_or_404`` when unknown).
        """
        if not domain_name:
            listing = Domain.query.filter_by(public=True, deleted=False).all()
            return marshal(listing, domain_fields), 200

        # The record-bearing field set is used for individual lookups only.
        return marshal(Domain.get_or_404(domain_name), domain_fields_with_records), 200
socketioService.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def on_join(data):
    """Authenticate a joining socket client and subscribe it to its printers.

    With authentication disabled (``Config.NONE``) a superadmin placeholder
    user is used. Otherwise the ``jwt`` entry of ``data`` is parsed; a
    missing, undecodable or expired token, or a token naming an unknown
    user, disconnects the client.

    On success the client joins one room per accessible printer (named by
    the printer id) and receives a ``"printers"`` event with the marshalled
    printer list.
    """
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return

        try:
            payload = LoginService.parse_api_token_direct(token)
        except (DecodeError, ExpiredSignature):
            disconnect()
            return

        user = User.query.filter_by(username=payload["username"]).scalar()
        if user is None:
            # ``scalar()`` yields None when no row matches, e.g. a token
            # issued for an account that no longer exists.
            disconnect()
            return

    printers = user.get_accessible_printers()

    for printer in printers:
        join_room(str(printer.id))

    datatype = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(
            fields.Nested({
                'name': fields.String
            })
        )
    }
    emit("printers", marshal(printers, datatype))


问题


面经


文章

微信
公众号

扫码关注公众号