python类ValidationError()的实例源码

models.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def clean(self):
        """Extended validation for GCE Subnets.

        Checks that `self.name` is set and matches the lowercase naming
        pattern below, then delegates to the parent class' `clean`.

        Raises:
            me.ValidationError: if the name is missing or does not match
                the pattern.
        """
        # Test for a missing name first: re.match() raises TypeError when
        # handed None, which would escape as an unhandled error instead of
        # a validation failure.
        if not self.name or not re.match(
                '^(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)$', self.name):
            raise me.ValidationError('A **lowercase** name must be specified')
        super(GoogleSubnet, self).clean()
base.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def add(self, **kwargs):
        """Add an entry to the database

        This is only to be called by `Schedule.add` classmethod to create
        a schedule. Fields `owner` and `name` are already populated in
        `self.schedule`. The `self.schedule` is not yet saved.

        The keyword arguments are checked for the minimum required fields
        and are then passed unchanged to `self.update`.

        Raises:
            BadRequestError: if neither `script_id` nor `action` is given,
                if `conditions` is missing or empty, if `schedule_type` is
                not one of crontab/interval/one_off, or if a one_off
                schedule comes without a `schedule_entry`.
            me.ValidationError, me.NotUniqueError: re-raised unchanged when
                `self.update` fails.

        """

        # check if required variables exist.
        if not (kwargs.get('script_id', '') or kwargs.get('action', '')):
            raise BadRequestError("You must provide script_id "
                                  "or machine's action")

        if not kwargs.get('conditions'):
            raise BadRequestError("You must provide a list of conditions, "
                                  "at least machine ids or tags")

        schedule_type = kwargs.get('schedule_type')
        if schedule_type not in ['crontab', 'interval', 'one_off']:
            raise BadRequestError('schedule type must be one of these '
                                  '(crontab, interval, one_off)')

        if schedule_type == 'one_off' and not kwargs.get(
                'schedule_entry', ''):
            raise BadRequestError('one_off schedule '
                                  'requires date given in schedule_entry')

        try:
            self.update(**kwargs)
        except (me.ValidationError, me.NotUniqueError) as exc:
            # Propagate original error. Only ValidationError offers
            # to_dict(); calling it on a NotUniqueError would raise
            # AttributeError here and hide the real failure.
            if isinstance(exc, me.ValidationError):
                details = exc.to_dict()
            else:
                details = exc
            log.error("Error adding %s: %s", self.schedule.name, details)
            raise
        log.info("Added schedule with name '%s'", self.schedule.name)
        self.schedule.owner.mapper.update(self.schedule)
login_manager.py 文件源码 项目:flask-template-project 作者: andreffs18 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def load_user(user_id):
    """Return the user stored under ``user_id``, or ``False``.

    ``None`` and the literal string ``'None'`` are treated as "no user" and
    yield ``False`` without querying. Otherwise the result of the
    ``filter(...).first()`` query is returned as-is. If the query raises
    ``DoesNotExist`` or ``ValidationError`` the failure is logged and
    ``False`` is returned.
    """
    if user_id in ['None', None]:
        return False

    try:
        return umodels.User.objects.filter(id=user_id).first()
    except (me.DoesNotExist, me.ValidationError):
        app.logger.error("User id \"{}\" does not exist".format(user_id))
    return False
test_models.py 文件源码 项目:flask-template-project 作者: andreffs18 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_get_user(self):
        """Check each input form that ``User.get_user`` is expected to handle."""
        get_user = umodels.User.get_user

        # invalid input must raise
        self.assertRaises(ValidationError, get_user, None)

        # a UserMixin (from flask.login) object is handed back
        self.assertEqual(self.user, get_user(self.user))

        # a LocalProxy object: current_user once the user is logged in
        login_user(self.user)
        self.assertEqual(self.user, get_user(current_user))

        # a User object fetched from the db, by "username" and by "id"
        self.assertEqual(self.user, get_user("username"))
        self.assertEqual(self.user, get_user(self.user.id))
models.py 文件源码 项目:flask-template-project 作者: andreffs18 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_user(cls, user):
        """Resolve ``user`` to a user object.

        1. A ``UserMixin`` or ``AnonymousUserMixin`` instance is returned
           unchanged.
        2. A ``LocalProxy`` is unwrapped with ``_get_current_object()``.
        3. Anything else is looked up by username; if that lookup raises
           ``DoesNotExist`` or ``ValidationError`` it is looked up by id
           instead. Errors raised by the id lookup are not caught here.
        """
        if isinstance(user, (UserMixin, AnonymousUserMixin)):
            return user
        if isinstance(user, LocalProxy):
            return user._get_current_object()

        try:
            found = User.objects.get(username=user)
        except (me.DoesNotExist, me.ValidationError):
            found = User.objects.get(id=user)
        return found
views.py 文件源码 项目:flask-template-project 作者: andreffs18 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def user_detail(user_id=None):
    """Render 'admin/user/detail.html' for the user with id ``user_id``.

    Calls ``abort(404)`` when the lookup raises ``DoesNotExist`` or
    ``ValidationError``.
    """
    try:
        user = umodels.User.objects.get(id=user_id)
    except (me.DoesNotExist, me.ValidationError):
        abort(404)

    context = {'user': user}
    return render_template('admin/user/detail.html', **context)
datastore.py 文件源码 项目:python-flask-security 作者: weinbergdavid 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_user(self, identifier):
        """Look a user up by id, falling back to the identity attributes.

        The id query's ``first()`` result is returned directly. Only when
        that query raises ``ValidationError`` are the attributes from
        ``get_identity_attributes()`` tried in turn with a
        ``<attr>__iexact`` filter; the first non-``None`` hit is returned.
        Returns ``None`` when no attribute lookup finds anything.
        """
        from mongoengine import ValidationError

        try:
            return self.user_model.objects(id=identifier).first()
        except ValidationError:
            # Deliberate: fall through to the identity attribute lookups.
            pass

        for attr in get_identity_attributes():
            match = self.user_model.objects(
                **{'%s__iexact' % attr: identifier}).first()
            if match is not None:
                return match
        return None
datastore.py 文件源码 项目:python-flask-security 作者: weinbergdavid 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def find_user(self, **kwargs):
        """Return the first user matching every ``field=value`` in ``kwargs``.

        Each keyword becomes its own ``Q`` object and the objects are
        combined with ``QCombination.AND``. Returns ``None`` if running the
        query raises ``ValidationError``.
        """
        # Q and QCombination are importable from one of two module paths.
        try:
            from mongoengine.queryset import Q, QCombination
        except ImportError:
            from mongoengine.queryset.visitor import Q, QCombination
        from mongoengine.errors import ValidationError

        conditions = [Q(**{field: value}) for field, value in kwargs.items()]
        combined = QCombination(QCombination.AND, conditions)
        try:
            return self.user_model.objects(combined).first()
        except ValidationError:  # pragma: no cover
            return None
views.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_organization(request):
    """
    Create organization.
    The user creating it will be assigned to the
    owners team. For now owner has only org
    ---
    name:
      description: The new org  name (id)
      type: string
      required: true
    """
    auth_context = auth_context_from_request(request)
    creator = auth_context.user

    # SEC: the requesting user must be allowed to create organizations
    if not creator.can_create_org:
        raise OrganizationAuthorizationFailure(
            'Unauthorized to create organization')

    params = params_from_request(request)
    name = params.get('name')
    super_org = params.get('super_org')

    # a name is mandatory and must not already be taken
    if not name:
        raise RequiredParameterMissingError()
    if Organization.objects(name=name):
        raise OrganizationNameExistsError()

    new_org = Organization()
    new_org.add_member_to_team('Owners', creator)
    new_org.name = name

    # mechanism for sub-org creation
    # the owner of super-org has the ability to create a sub-org
    # NOTE(review): no ownership check on auth_context.org is visible in
    # this function - confirm it is enforced elsewhere.
    if super_org:
        new_org.parent = auth_context.org

    try:
        new_org.save()
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message, "errors": exc.to_dict()})
    except me.OperationError:
        raise OrganizationOperationError()

    trigger_session_update(auth_context.user, ['user'])
    return new_org.as_dict()
views.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def add_team(request):
    """
    Create new team.
    Append it at org's teams list.
    Only available to organization owners.
    ---
    name:
      description: The new team name
      type: string
      required: true
    description:
      description: The new team description
      type: string
    """

    log.info("Adding team")

    auth_context = auth_context_from_request(request)
    org_id = request.matchdict['org_id']

    params = params_from_request(request)
    name = params.get('name')
    description = params.get('description', '')
    visible = params.get('visible', True)

    if not name:
        raise RequiredParameterMissingError()

    # SEC check if owner: the caller must own the org named in the URL
    is_org_owner = (auth_context.org and auth_context.is_owner()
                    and auth_context.org.id == org_id)
    if not is_org_owner:
        raise OrganizationAuthorizationFailure()

    # build the team and attach it to the org's teams list
    new_team = Team()
    new_team.name = name
    new_team.description = description
    new_team.visible = visible
    auth_context.org.teams.append(new_team)

    try:
        auth_context.org.save()
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message, "errors": exc.to_dict()})
    except me.OperationError:
        raise TeamOperationError()

    log.info("Adding team with name '%s'.", name)
    trigger_session_update(auth_context.owner, ['org'])

    return new_team.as_dict()


# SEC
views.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_team(request):
    """
    Delete a team entry in the db.
    Only available to organization owners.
    ---
    org_id:
      description: The team's org id
      type: string
      required: true
    team_id:
      description: The team's id
      type: string
      required: true
    """
    auth_context = auth_context_from_request(request)
    org_id = request.matchdict['org_id']
    team_id = request.matchdict['team_id']

    # SEC check if owner: the caller must own the org named in the URL
    is_org_owner = (auth_context.org and auth_context.is_owner() and
                    auth_context.org.id == org_id)
    if not is_org_owner:
        raise OrganizationAuthorizationFailure()

    # the 'Owners' team itself may never be deleted
    owners_team = auth_context.org.get_team('Owners')
    if owners_team.id == team_id:
        raise ForbiddenError()

    try:
        doomed = auth_context.org.get_team_by_id(team_id)
    except me.DoesNotExist:
        raise NotFoundError()

    # only empty teams can be removed
    if doomed.members:
        raise BadRequestError(
            'Team not empty. Remove all members and try again')

    try:
        doomed.drop_mappings()
        auth_context.org.update(pull__teams__id=team_id)
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message, "errors": exc.to_dict()})
    except me.OperationError:
        raise TeamOperationError()

    trigger_session_update(auth_context.owner, ['org'])

    return OK


# SEC
models.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def clean(self):
        """Validate the organization and bring derived fields up to date.

        In order: team names must be unique; team members that are not
        members of the org are dropped from their team; an 'Owners' team
        must exist and be non-empty; when HAS_POLICY is set, the Owners
        policy must be ALLOW with no rules; the org name must not be used
        by another Organization. Finally `members_count` and `teams_count`
        are refreshed and the parent class' `clean` is called.

        Raises:
            me.ValidationError: if any of the checks above fails.
        """
        # make sure that each team's name is unique
        used = set()
        for team in self.teams:
            if team.name in used:
                raise me.ValidationError("Team name exists.")
            used.add(team.name)

        # make sure that all team members are also org members
        for team in self.teams:
            # Collect the offending positions first and pop them from the
            # end: popping while walking forward shifts the remaining
            # indexes, which removes the wrong members or runs off the end
            # of the list.
            stale = [i for i, member in enumerate(team.members)
                     if member not in self.members]
            for i in reversed(stale):
                team.members.pop(i)

        # make sure that owners team is present
        try:
            owners = self.teams.get(name='Owners')
        except me.DoesNotExist:
            raise me.ValidationError("Owners team can't be removed.")

        # make sure that owners team is not empty
        if not owners.members:
            raise me.ValidationError("Owners team can't be empty.")

        if HAS_POLICY:
            # make sure owners policy allows all permissions
            if owners.policy.operator != 'ALLOW':
                raise me.ValidationError("Owners policy must be set to ALLOW.")

            # make sure owners policy doesn't contain specific rules
            if owners.policy.rules:
                raise me.ValidationError("Can't set policy rules for Owners.")

        # make sure org name is unique - we can't use the unique keyword on the
        # field definition because both User and Organization subclass Owner
        # but only Organization has a name
        if self.name and Organization.objects(name=self.name, id__ne=self.id):
            raise me.ValidationError("Organization with name '%s' "
                                     "already exists." % self.name)

        self.members_count = len(self.members)
        self.teams_count = len(self.teams)
        super(Organization, self).clean()


问题


面经


文章

微信
公众号

扫码关注公众号