def clean(self):
    """Extended validation for GCE Subnets.

    Checks that `self.name` is 1-63 characters long, starts with a
    lowercase letter, continues with lowercase letters, digits or dashes
    and does not end with a dash, then delegates to the parent `clean()`.

    Raises:
        me.ValidationError: if the name is missing, is not a string, or
            does not match the pattern described above.
    """
    # `\Z` anchors at the very end of the string; `$` would also accept a
    # name followed by a trailing newline.
    pattern = r'^(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)\Z'
    try:
        is_valid = re.match(pattern, self.name) is not None
    except TypeError:
        # `self.name` is None or not a string: report it as a validation
        # failure instead of letting the TypeError escape.
        is_valid = False
    if not is_valid:
        raise me.ValidationError('A **lowercase** name must be specified')
    super(GoogleSubnet, self).clean()
python类ValidationError()的实例源码
def add(self, **kwargs):
    """Add an entry to the database.

    This is only to be called by the `Schedule.add` classmethod to create
    a schedule. Fields `owner` and `name` are already populated in
    `self.schedule`. The `self.schedule` is not yet saved.

    Keyword arguments checked here before delegating to `self.update`:
        script_id / action: at least one of them must be truthy.
        conditions: must be truthy.
        schedule_type: one of 'crontab', 'interval', 'one_off'.
        schedule_entry: must be truthy when schedule_type is 'one_off'.

    Raises:
        BadRequestError: if any of the checks above fails.
        me.ValidationError, me.NotUniqueError: logged and re-raised
            unchanged when `self.update` raises them.
    """
    # check if required variables exist.
    if not (kwargs.get('script_id', '') or kwargs.get('action', '')):
        raise BadRequestError("You must provide script_id "
                              "or machine's action")

    if not kwargs.get('conditions'):
        raise BadRequestError("You must provide a list of conditions, "
                              "at least machine ids or tags")

    if kwargs.get('schedule_type') not in ['crontab',
                                           'interval', 'one_off']:
        raise BadRequestError('schedule type must be one of these '
                              '(crontab, interval, one_off)')

    if kwargs.get('schedule_type') == 'one_off' and not kwargs.get(
            'schedule_entry', ''):
        raise BadRequestError('one_off schedule '
                              'requires date given in schedule_entry')

    try:
        self.update(**kwargs)
    except (me.ValidationError, me.NotUniqueError) as exc:
        # Only ValidationError provides `to_dict()`; calling it on a
        # NotUniqueError would raise AttributeError here and hide the
        # original error, so fall back to the exception itself.
        if isinstance(exc, me.ValidationError):
            details = exc.to_dict()
        else:
            details = exc
        log.error("Error adding %s: %s", self.schedule.name, details)
        # Propagate original error.
        raise

    log.info("Added schedule with name '%s'", self.schedule.name)
    self.schedule.owner.mapper.update(self.schedule)
def load_user(user_id):
    """Look up a user by id.

    Returns:
        None when `user_id` is None or the string 'None'; otherwise the
        result of `.first()` on the id-filtered User query; False (after
        logging an error) when the lookup raises DoesNotExist or
        ValidationError.
    """
    if user_id in ('None', None):
        return None
    try:
        return umodels.User.objects.filter(id=user_id).first()
    except (me.DoesNotExist, me.ValidationError):
        app.logger.error("User id \"{}\" does not exist".format(user_id))
        return False
def test_get_user(self):
    """Check that the get_user classmethod handles every kind of input."""
    get_user = umodels.User.get_user

    # invalid input is rejected
    self.assertRaises(ValidationError, get_user, None)

    # a UserMixin (from flask.login) object
    self.assertEqual(self.user, get_user(self.user))

    # a LocalProxy object; current_user is only checked after logging in
    login_user(self.user)
    self.assertEqual(self.user, get_user(current_user))

    # a user stored in the db, looked up by "username" and then by "id"
    self.assertEqual(self.user, get_user("username"))
    self.assertEqual(self.user, get_user(self.user.id))
def get_user(cls, user):
    """Resolve `user` to a user object.

    1. A UserMixin or AnonymousUserMixin instance is returned as is.
    2. A LocalProxy is unwrapped to the object it currently points to.
    3. Anything else is looked up by username; if that lookup raises
       DoesNotExist or ValidationError, it is looked up by id instead.
       An exception raised by the id lookup propagates to the caller.
    """
    if isinstance(user, (UserMixin, AnonymousUserMixin)):
        return user

    if isinstance(user, LocalProxy):
        return user._get_current_object()

    try:
        found = User.objects.get(username=user)
    except (me.DoesNotExist, me.ValidationError):
        found = User.objects.get(id=user)
    return found
def user_detail(user_id=None):
    """Render the admin detail page for the user with id `user_id`.

    Calls `abort(404)` when the lookup raises DoesNotExist or
    ValidationError.
    """
    try:
        user = umodels.User.objects.get(id=user_id)
    except (me.DoesNotExist, me.ValidationError):
        abort(404)
    return render_template('admin/user/detail.html', user=user)
def get_user(self, identifier):
    """Find a user by id, falling back to the identity attributes.

    The id query result (a user or None) is returned directly unless the
    query raises ValidationError. In that case each attribute returned by
    `get_identity_attributes()` is tried with a case-insensitive exact
    match, and the first user found is returned.

    Returns:
        The matching user, or None if nothing matched.
    """
    from mongoengine import ValidationError

    try:
        return self.user_model.objects(id=identifier).first()
    except ValidationError:
        # The id lookup was rejected; try the identity attributes instead.
        pass

    for attr in get_identity_attributes():
        lookup = {'%s__iexact' % attr: identifier}
        match = self.user_model.objects(**lookup).first()
        if match is not None:
            return match
    return None
def find_user(self, **kwargs):
    """Return the first user matching all the given field=value filters.

    Each keyword argument becomes one `Q` condition and the conditions
    are AND-ed together.

    Returns:
        The first matching user, or None when nothing matches or the
        query raises a mongoengine ValidationError.
    """
    # QCombination is importable from different modules depending on the
    # installed mongoengine.
    try:
        from mongoengine.queryset import Q, QCombination
    except ImportError:
        from mongoengine.queryset.visitor import Q, QCombination
    from mongoengine.errors import ValidationError

    conditions = [Q(**{field: value}) for field, value in kwargs.items()]
    combined = QCombination(QCombination.AND, conditions)
    try:
        return self.user_model.objects(combined).first()
    except ValidationError:  # pragma: no cover
        return None
def create_organization(request):
    """
    Create organization.
    The requesting user is added to the new organization's 'Owners' team.
    When the `super_org` request parameter is truthy, the new organization
    is created as a child of the organization in the current auth context.
    Returns the saved organization as a dict.
    ---
    name:
      description: The new org name (id)
      type: string
      required: true
    """
    auth_context = auth_context_from_request(request)
    creator = auth_context.user

    # SEC
    if not creator.can_create_org:
        raise OrganizationAuthorizationFailure(
            'Unauthorized to create organization')

    request_params = params_from_request(request)
    org_name = request_params.get('name')
    wants_sub_org = request_params.get('super_org')

    if not org_name:
        raise RequiredParameterMissingError()
    if Organization.objects(name=org_name):
        raise OrganizationNameExistsError()

    new_org = Organization()
    new_org.add_member_to_team('Owners', creator)
    new_org.name = org_name

    # mechanism for sub-org creation
    # the owner of super-org has the ability to create a sub-org
    if wants_sub_org:
        new_org.parent = auth_context.org

    try:
        new_org.save()
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message,
                               "errors": exc.to_dict()})
    except me.OperationError:
        raise OrganizationOperationError()

    trigger_session_update(auth_context.user, ['user'])
    return new_org.as_dict()
def add_team(request):
    """
    Create new team.
    The team is appended to the teams list of the organization in the
    current auth context and that organization is saved.
    Only available to organization owners.
    Returns the new team as a dict.
    ---
    name:
      description: The new team name
      type: string
      required: true
    description:
      description: The new team description
      type: string
    """
    log.info("Adding team")

    auth_context = auth_context_from_request(request)
    org_id = request.matchdict['org_id']

    request_params = params_from_request(request)
    team_name = request_params.get('name')
    team_description = request_params.get('description', '')
    team_visible = request_params.get('visible', True)

    if not team_name:
        raise RequiredParameterMissingError()

    # SEC check if owner
    is_org_owner = (auth_context.org and auth_context.is_owner()
                    and auth_context.org.id == org_id)
    if not is_org_owner:
        raise OrganizationAuthorizationFailure()

    new_team = Team()
    new_team.name = team_name
    new_team.description = team_description
    new_team.visible = team_visible
    auth_context.org.teams.append(new_team)

    try:
        auth_context.org.save()
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message,
                               "errors": exc.to_dict()})
    except me.OperationError:
        raise TeamOperationError()

    log.info("Adding team with name '%s'.", team_name)
    trigger_session_update(auth_context.owner, ['org'])

    return new_team.as_dict()
# SEC
def delete_team(request):
    """
    Delete a team entry in the db.
    The 'Owners' team cannot be deleted, and a team that still has
    members is rejected.
    Only available to organization owners.
    ---
    org_id:
      description: The team's org id
      type: string
      required: true
    team_id:
      description: The team's id
      type: string
      required: true
    """
    auth_context = auth_context_from_request(request)
    org_id = request.matchdict['org_id']
    team_id = request.matchdict['team_id']

    # SEC check if owner
    is_org_owner = (auth_context.org and auth_context.is_owner() and
                    auth_context.org.id == org_id)
    if not is_org_owner:
        raise OrganizationAuthorizationFailure()

    owners_team = auth_context.org.get_team('Owners')
    if owners_team.id == team_id:
        raise ForbiddenError()

    try:
        target_team = auth_context.org.get_team_by_id(team_id)
    except me.DoesNotExist:
        raise NotFoundError()

    if target_team.members:
        raise BadRequestError(
            'Team not empty. Remove all members and try again')

    try:
        target_team.drop_mappings()
        auth_context.org.update(pull__teams__id=team_id)
    except me.ValidationError as exc:
        raise BadRequestError({"msg": exc.message,
                               "errors": exc.to_dict()})
    except me.OperationError:
        raise TeamOperationError()

    trigger_session_update(auth_context.owner, ['org'])

    return OK
# SEC
def clean(self):
    """Extended validation for organizations.

    Performs the following, in order:
      * rejects duplicate team names,
      * removes from every team the members that are not org members,
      * requires an 'Owners' team that has at least one member,
      * when HAS_POLICY is set, requires the Owners policy operator to be
        'ALLOW' and its rule list to be empty,
      * rejects a name already used by another Organization,
      * refreshes `members_count` and `teams_count`,
    and finally delegates to the parent `clean()`.

    Raises:
        me.ValidationError: if any of the checks above fails.
    """
    # make sure that each team's name is unique
    seen_names = set()
    for team in self.teams:
        if team.name in seen_names:
            raise me.ValidationError("Team name exists.")
        seen_names.add(team.name)

    # make sure that all team members are also org members
    for team in self.teams:
        # Walk the indexes backwards: popping an entry then never shifts
        # the position of an entry that still has to be checked. Popping
        # by the indexes of a forward snapshot removes the wrong members
        # (or raises IndexError) once more than one member must go.
        for i in reversed(range(len(team.members))):
            if team.members[i] not in self.members:
                team.members.pop(i)

    # make sure that owners team is present
    try:
        owners = self.teams.get(name='Owners')
    except me.DoesNotExist:
        raise me.ValidationError("Owners team can't be removed.")

    # make sure that owners team is not empty
    if not owners.members:
        raise me.ValidationError("Owners team can't be empty.")

    if HAS_POLICY:
        # make sure owners policy allows all permissions
        if owners.policy.operator != 'ALLOW':
            raise me.ValidationError("Owners policy must be set to ALLOW.")

        # make sure owners policy doesn't contain specific rules
        if owners.policy.rules:
            raise me.ValidationError("Can't set policy rules for Owners.")

    # make sure org name is unique - we can't use the unique keyword on the
    # field definition because both User and Organization subclass Owner
    # but only Organization has a name
    if self.name and Organization.objects(name=self.name, id__ne=self.id):
        raise me.ValidationError("Organization with name '%s' "
                                 "already exists." % self.name)

    self.members_count = len(self.members)
    self.teams_count = len(self.teams)
    super(Organization, self).clean()