python类marshal()的实例源码

reflector.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def marshal(self):
        """Get REST API marshalled fields as an orderedDict

        Returns:
            OrderedDict of fields defined by marshal_fields
        """
        marshal_fields = {
            'type': fields.String(attribute='__class__.__name__'),
            'id': fields.Integer(attribute='appinterface.id'),
            'name': fields.String,
            'started': fields.Boolean
        }
        return marshal(self, marshal_fields)
system.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get(self):
        """Method to handle system configuration GET requests"""
        try:
            config = yield Config.find(limit=1)
            # Return a 404 if not found.
            if config is None:
                abort(404, message={'error': "Could not get the system configuration"})
            returnValue(marshal(config, self.fields))

        except TimeoutError:
            log.error("REST API timeout retrieving application {appeui}",
                      appeui=euiString(appeui))
appproperty.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, appeui):
        """Method to handle application property GET requests

        Args:
            appeui (int): Application EUI
            port (int): Application property port
        """
        try:
            app = yield Application.find(where=['appeui = ?', appeui], limit=1)
            # Return a 404 if not found.
            if app is None:
                abort(404, message={'error': "Application {} doesn't exist."
                                    .format(euiString(appeui))})

            port = self.args['port']
            p = yield AppProperty.find(where=['application_id = ? AND port = ?',
                                             app.id, port])
            if p is None:
                 abort(404, message={'error': "Application property doesn't exist."})

            data = marshal(p, self.fields)
            returnValue(data)

        except TimeoutError:
            log.error("REST API timeout get request for application {appeui} "
                      "property {port}", appeui=euiString(appeui), port=port)
application.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getProperties(self, app):
        """Get and marshal the application properties"""

        # Get the properties
        props = yield app.properties.get()

        # Marshal
        data = {}
        for i,p in enumerate(props):
            data[i] = marshal(p, self.pfields)

        returnValue(data)
application.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self):
        """Method to get all applications"""
        try:
            apps = yield Application.all()
            if apps is None:
                returnValue({})
            data = {}
            for i,a in enumerate(apps):
                data[i] = marshal(a, self.fields)
                data[i]['properties'] = yield self.getProperties(a)
            returnValue(data)

        except TimeoutError:
            log.error("REST API timeout retrieving application {appeui}",
                      appeui=euiString(appeui))
gateway.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get(self, host):
        """Method to handle gateway GET requests"""  
        try:
            g = yield Gateway.find(where=['host = ?', host], limit=1)
            # Return a 404 if not found.
            if g is None:
                abort(404, message={'error': "Gateway {} doesn't exist.".format(host)})
            returnValue(marshal(g, self.fields))

        except TimeoutError:
            log.error("REST API timeout retrieving gateway {host}",
                      host=host)
gateway.py 文件源码 项目:floranet 作者: Fluent-networks 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self):
        """Method to get all gateways"""
        try:
            gateways = yield Gateway.all()
            if gateways is None:
                returnValue({})
            data = {}
            for i,g in enumerate(gateways):
                data[i] = marshal(g, self.fields)
            returnValue(data)

        except TimeoutError:
            # Exception returns 500 to client
            log.error("REST API timeout retrieving all gateways")
managers.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id):
        """ List all managers in this location"""
        response = {
            API_ENVELOPE: [],
        }

        location = Location.query.get_or_404(location_id)
        response[API_ENVELOPE] = map(
            lambda manager: marshal(manager, user_fields),
            location.managers.all())

        return response
manager.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, user_id):
        user = User.query.get_or_404(user_id)

        if not user.is_location_manager(location_id):
            return {"message": "User does not exist or not a manager"}, 404

        response = {
            API_ENVELOPE: marshal(user, user_fields),
            "resources": [],
        }

        return response
location.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id):
        response = {
            API_ENVELOPE: {},
            "resources": [
                "attendance", "managers", "shifts", "timeclocks",
                "timeoffrequests", "roles"
            ],
        }

        parser = reqparse.RequestParser()
        parser.add_argument("recurse", type=inputs.boolean, default=False)
        parser.add_argument("archived", type=inputs.boolean)
        args = parser.parse_args()
        args = dict((k, v) for k, v in args.iteritems() if v is not None)

        location = Location.query.get_or_404(location_id)
        response[API_ENVELOPE] = marshal(location, location_fields)

        if args["recurse"]:
            roles_query = Role.query.filter_by(location_id=location_id)

            if "archived" in args:
                roles_query = roles_query.filter_by(archived=args["archived"])

            roles = roles_query.all()
            response[API_ENVELOPE].update({
                "roles":
                map(lambda role: marshal(role, role_fields), roles)
            })

            # also include managers for the location
            managers = User.query.join(User.manager_of).filter(
                Location.id == location_id).all()
            response[API_ENVELOPE].update({
                "managers":
                map(lambda manager: marshal(manager, user_fields), managers)
            })

        return response
role.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id):
        response = {
            API_ENVELOPE: {},
            "resources": ["recurringshifts", "schedules", "shifts", "users"],
        }

        parser = reqparse.RequestParser()
        parser.add_argument("recurse", type=inputs.boolean, default=False)
        parser.add_argument("archived", type=inputs.boolean)
        args = parser.parse_args()
        args = dict((k, v) for k, v in args.iteritems() if v is not None)

        role = Role.query.get_or_404(role_id)
        response[API_ENVELOPE] = marshal(role, role_fields)

        if args["recurse"]:
            rtu_query = RoleToUser.query.filter_by(role_id=role_id)

            if "archived" in args:
                rtu_query = rtu_query.filter_by(archived=args["archived"])

            members = rtu_query.all()
            memberships = []

            for member in members:
                rtu = marshal(member, role_to_user_fields)
                rtu.update(marshal(member.user, user_fields))
                memberships.append(rtu)

            response[API_ENVELOPE].update({"users": memberships})

        return response
recurring_shifts.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id):
        """
        get recurring shifts for a role. can optionally filter by user_id
        """

        parser = reqparse.RequestParser()
        parser.add_argument("user_id", type=int)
        parameters = parser.parse_args()

        # Filter out null values
        parameters = dict((k, v) for k, v in parameters.iteritems()
                          if v is not None)

        recurring_shifts_query = RecurringShift.query.filter_by(
            role_id=role_id)

        if "user_id" in parameters:
            user_id = None if parameters[
                "user_id"] == constants.UNASSIGNED_USER_ID else parameters[
                    "user_id"]

            recurring_shifts_query = recurring_shifts_query.filter_by(
                user_id=user_id)

        return {
            constants.API_ENVELOPE:
            map(lambda recurring_shift: marshal(recurring_shift, recurring_shift_fields),
                recurring_shifts_query.all())
        }
timeoffrequest.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, user_id, time_off_request_id):
        """
        returns a specific time off request record
        """

        time_off_request = TimeOffRequest.query.get_or_404(time_off_request_id)

        return {
            API_ENVELOPE: marshal(time_off_request, time_off_request_fields),
            "resources": [],
        }
users.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id):
        """ List all members in this role """

        parser = reqparse.RequestParser()
        parser.add_argument("archived", type=inputs.boolean)
        parameters = parser.parse_args(strict=True)
        parameters = dict((k, v) for k, v in parameters.iteritems()
                          if v is not None)

        response = {
            API_ENVELOPE: [],
        }
        users_query = RoleToUser.query.filter_by(role_id=role_id)

        # by default, only include active users
        if "archived" in parameters:
            users_query = users_query.filter_by(
                archived=parameters["archived"])

        members = users_query.all()

        for member in members:
            datum = {}
            datum.update(marshal(member.user, user_fields))
            datum.update(marshal(member, role_to_user_fields))
            response[API_ENVELOPE].append(datum)

        return response
schedule.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, schedule_id):

        response = {
            API_ENVELOPE: {},
            "resources":
            ["preferences", "shifts", "timeclocks", "timeoffrequests"],
        }

        schedule = Schedule2.query.get_or_404(schedule_id)
        schedule = marshal(schedule, schedule_fields)

        response[API_ENVELOPE] = schedule

        return response
timeoffrequests.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, schedule_id):
        """
        returns all time off request data that correlates to the timespan of a given schedule
        """

        parser = reqparse.RequestParser()
        parser.add_argument("user_id", type=int)
        parameters = parser.parse_args()

        # Filter out null values
        parameters = dict((k, v) for k, v in parameters.iteritems()
                          if v is not None)

        # get schedule object
        schedule = Schedule2.query.get_or_404(schedule_id)

        # prepare query
        time_off_requests = TimeOffRequest.query \
            .join(RoleToUser) \
            .filter(
                RoleToUser.role_id == role_id,
                TimeOffRequest.start >= schedule.start,
                TimeOffRequest.start < schedule.stop
            )

        # add user id if optionally added
        if "user_id" in parameters:
            time_off_requests = time_off_requests\
                .filter(
                   RoleToUser.user_id == parameters.get("user_id")
                )

        return {
            API_ENVELOPE:
            map(lambda time_off_request: marshal(time_off_request, time_off_request_fields),
                time_off_requests.all())
        }
timeclocks.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, schedule_id):
        """
        returns all timeclock data that correlates to the timespan of a given schedule
        """

        parser = reqparse.RequestParser()
        parser.add_argument("user_id", type=int)
        parameters = parser.parse_args()

        # Filter out null values
        parameters = dict((k, v) for k, v in parameters.iteritems()
                          if v is not None)

        # get schedule object
        schedule = Schedule2.query.get_or_404(schedule_id)

        # prepare query
        timeclocks = Timeclock.query \
            .filter_by(role_id=role_id) \
            .filter(Timeclock.start >= schedule.start) \
            .filter(Timeclock.start < schedule.stop)

        # add user id if optionally added
        if "user_id" in parameters:
            timeclocks = timeclocks.filter_by(
                user_id=parameters.get("user_id"))

        return {
            API_ENVELOPE:
            map(lambda timeclock: marshal(timeclock, timeclock_fields),
                timeclocks.all())
        }
preferences.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get(self, org_id, location_id, role_id, schedule_id):
        response = {
            API_ENVELOPE: [],
        }

        preferences = Preference.query.filter_by(schedule_id=schedule_id).all()
        response[API_ENVELOPE] = map(
            lambda preference: marshal(preference, preference_fields),
            preferences)

        return response
locations.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self, org_id):
        parser = reqparse.RequestParser()
        parser.add_argument("name", type=str)
        parser.add_argument(
            "timezone",
            type=str,
            default=current_app.config.get("DEFAULT_TIMEZONE"))
        parameters = parser.parse_args(strict=True)

        timezone = parameters.get("timezone")

        try:
            pytz.timezone(timezone)
        except pytz.exceptions.UnknownTimeZoneError as zone:
            return {"message": "Unknown timezone specified: %s" % zone}, 400

        l = Location(
            name=parameters.get("name"),
            organization_id=org_id,
            timezone=timezone)
        db.session.add(l)
        try:
            db.session.commit()
            return marshal(l, location_fields), 201
        except Exception as exception:
            db.session.rollback()
            current_app.logger.exception(str(exception))
            abort(400)
admins.py 文件源码 项目:suite 作者: Staffjoy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get(self, org_id):
        """ List all admins in this organization """
        response = {
            API_ENVELOPE: [],
        }

        org = Organization.query.get_or_404(org_id)
        response[API_ENVELOPE] = map(lambda admin: marshal(admin, user_fields),
                                     org.admins.all())

        return response


问题


面经


文章

微信
公众号

扫码关注公众号