def save_model_action(model_action_data, access, process):
from audit_tools.audit.models import Access
from audit_tools.audit.models.models_factory import create_model_action
try:
logger.debug("Pre save ModelAction")
try:
a = Access.objects.get(id=access.id)
m = create_model_action(model_action_data, a, process)
except AttributeError:
m = create_model_action(model_action_data, None, process)
except ValidationError:
access.save()
a = Access.objects.get(id=access.id)
m = create_model_action(model_action_data, a, process)
m.save()
logger.debug("Post save ModelAction: %s", m.id)
except:
logger.exception("Error saving ModelAction document")
return True
python类ValidationError()的实例源码
def create_zone(self, zone, **kwargs):
"""
This is the public method that is called to create a new DNS zone.
"""
self._create_zone__prepare_args(kwargs)
pr_zone = self._create_zone__for_cloud(**kwargs)
# Set fields to cloud model and perform early validation.
zone.zone_id = pr_zone.id
zone.domain = pr_zone.domain
zone.type = pr_zone.type
zone.ttl = pr_zone.ttl
zone.extra = pr_zone.extra
# Attempt to save.
try:
zone.save()
except me.ValidationError as exc:
log.error("Error updating %s: %s", zone, exc.to_dict())
raise BadRequestError({'msg': exc.message,
'errors': exc.to_dict()})
except me.NotUniqueError as exc:
log.error("Zone %s not unique error: %s", zone, exc)
raise ZoneExistsError()
self.cloud.owner.mapper.update(zone)
def init_mappings(self):
"""Initialize RBAC Mappings.
RBAC Mappings always refer to a (Organization, Team) combination.
In order to reference the Organization instance of this Team, we
are using `self._instance`, which is a proxy object to the upper
level Document.
"""
if not RBACMapping:
return
if self.name == 'Owners':
return
if RBACMapping.objects(org=self._instance.id, team=self.id).only('id'):
raise me.ValidationError(
'RBAC Mappings already initialized for Team %s' % self
)
for perm in ('read', 'read_logs'):
RBACMapping(
org=self._instance.id, team=self.id, permission=perm
).save()
def create_org_for_user(user, org_name='', promo_code=None, token=None,
selected_plan=None):
org = Organization(name=org_name, selected_plan=selected_plan)
org.add_member_to_team('Owners', user)
org.name = org_name
try:
org.save()
except ValidationError as e:
raise BadRequestError({"msg": e.message, "errors": e.to_dict()})
except OperationError:
raise OrganizationOperationError()
# assign promo if applicable
if promo_code or token:
assign_promo(org, promo_code, token)
return org
def update_whitelist_ips(auth_context, ips):
"""
This function takes a list of dicts in the form:
[{cidr:'cidr1', 'description:'desc1'},
{cidr:'cidr2', 'description:'desc2'}]
and saves them in the User.ips field.
"""
user = auth_context.user
user.ips = []
for ip_dict in ips:
wip = WhitelistIP()
wip.cidr = ip_dict['cidr']
wip.description = ip_dict['description']
user.ips.append(wip)
try:
user.save()
except ValidationError as e:
raise BadRequestError({"msg": e.message, "errors": e.to_dict()})
def clean(self):
"""Ensures that self is a valid RSA keypair."""
from Crypto import Random
Random.atfork()
if 'RSA' not in self.private:
raise me.ValidationError("Private key is not a valid RSA key.")
# Generate public key from private key file.
try:
key = RSA.importKey(self.private)
self.public = key.publickey().exportKey('OpenSSH')
except Exception:
log.exception("Error while constructing public key "
"from private.")
raise me.ValidationError("Private key is not a valid RSA key.")
def clean(self):
"""Custom validation for GCE Networks.
GCE enforces:
- Regex constrains on network names.
- CIDR assignment only if `legacy` mode has been selected.
"""
if self.mode == 'legacy':
super(GoogleNetwork, self).clean()
elif self.cidr is not None:
raise me.ValidationError('CIDR cannot be set for modes other than '
'"legacy" - Current mode: %s' % self.mode)
if not re.match('^(?:[a-z](?:[-a-z0-9]{0,61}[a-z0-9])?)$', self.name):
raise me.ValidationError('A **lowercase** name must be specified')
def user_toggle(user_id=None, action=None):
try:
user = umodels.User.get_user(user_id)
if action == "remove":
user.delete()
flash.success("User \"{}\" was successfully deleted!"
"".format(str(user)))
elif action == "is_admin":
user.is_admin = not user.is_admin
user.save()
flash.success("User \"{}\" field \"{}\" was successfully "
"updated to \"{}\"!".format(str(user), action,
user.is_admin))
except (me.DoesNotExist, me.ValidationError) as e:
flash.warning("User with id \"{}\" does not exist."
"".format(user_id))
return redirect(url_for("admin.home"))
def index_stock(stockid):
try:
page = int(request.args.get('page')) if request.args.get('page') else 1
stock = models.Stock.objects.filter(id=stockid, blacklisted=False).first()
if not stock:
return redirect(url_for('index'))
leaders = models.get_leaders()
return render_template('index.html',
view="market",
base_url=settings.SERVER_NAME,
leaders=leaders,
stock=stock,
stocks=api_views.get_paged_stocks(page),
page=page)
except DoesNotExist as e:
return redirect(url_for('index'))
except ValidationError as e:
return redirect(url_for('index'))
def test_save_with_access_not_saved(self, logger, access_klass, create_model_action):
model_action = MagicMock()
model_action.save.return_value = True
create_model_action.return_value = model_action
access = MagicMock()
access.save.return_value = True
access_klass.objects.get.side_effect = (ValidationError, access)
result = tasks.save_model_action(model_action, access, None)
self.assertEqual(access.save.call_count, 1)
self.assertTrue(result)
def get_or_404(cls, *args, **kwargs):
message = kwargs.pop('message', None)
try:
return cls.objects.get(*args, **kwargs)
except (MultipleObjectsReturned, DoesNotExist, ValidationError):
if message is not None:
abort(404, message=message)
abort(404)
def validates(model):
try:
model.validate(clean=True)
except ValidationError:
return False
return True
def validates(model):
try:
model.validate(clean=True)
except ValidationError:
return False
return True
def clean(self):
"""Overriding the default clean method to implement param checking"""
super(ARecord, self).clean()
try:
ip_addr = self.rdata[0].decode('utf-8')
ip.ip_address(ip_addr)
except ValueError:
raise me.ValidationError('IPv4 address provided is not valid')
if not len(self.rdata) == 1:
raise me.ValidationError('We cannot have more than one rdata'
'values for this type of record.')
def clean(self):
"""Overriding the default clean method to implement param checking"""
super(AAAARecord, self).clean()
try:
ip_addr = self.rdata[0].decode('utf-8')
ip.ip_address(ip_addr)
except ValueError:
raise me.ValidationError('IPv6 address provided is not valid')
if not len(self.rdata) == 1:
raise me.ValidationError('We cannot have more than one rdata'
'values for this type of record.')
def clean(self):
"""Overriding the default clean method to implement param checking"""
super(CNAMERecord, self).clean()
if not self.rdata[0].endswith('.'):
self.rdata[0] += '.'
if not len(self.rdata) == 1:
raise me.ValidationError('We cannot have more than one rdata'
'values for this type of record.')
def clean(self):
if not self.tags:
self.tags = {}
elif not isinstance(self.tags, dict):
raise me.ValidationError('Tags must be a dictionary')
def exception_handler_mist(exc, request):
"""
Here we catch exceptions and transform them to proper http responses
This is a special pyramid view that gets triggered whenever an exception
is raised from any other view. It catches all exceptions exc where
isinstance(exc, context) is True.
"""
# mongoengine ValidationError
if isinstance(exc, me.ValidationError):
trace = traceback.format_exc()
log.warning("Uncaught me.ValidationError!\n%s", trace)
return Response("Validation Error", 400)
# mongoengine NotUniqueError
if isinstance(exc, me.NotUniqueError):
trace = traceback.format_exc()
log.warning("Uncaught me.NotUniqueError!\n%s", trace)
return Response("NotUniqueError", 409)
# non-mist exceptions. that shouldn't happen! never!
if not isinstance(exc, MistError):
if not isinstance(exc, (me.ValidationError, me.NotUniqueError)):
trace = traceback.format_exc()
log.critical("Uncaught non-mist exception? WTF!\n%s", trace)
return Response("Internal Server Error", 500)
# mist exceptions are ok.
log.info("MistError: %r", exc)
# if it is a RedirectError, then send an HTTP Redirect
if isinstance(exc, RedirectError):
return HTTPFound(exc.url or '')
# else translate it to HTTP response based on http_code attribute
return Response(str(exc), exc.http_code)
def create_record(self, record, **kwargs):
"""
This is the public method that is called to create a new DNS record
under a specific zone.
"""
record.name = kwargs['name']
record.type = kwargs['type']
if isinstance(kwargs['data'], list):
record.rdata = kwargs['data']
else:
record.rdata = [kwargs['data']]
record.ttl = kwargs['ttl']
try:
record.clean()
except me.ValidationError as exc:
log.error("Error validating %s: %s", record, exc.to_dict())
raise BadRequestError({'msg': exc.message,
'errors': exc.to_dict()})
self._create_record__prepare_args(record.zone, kwargs)
pr_record = self._create_record__for_zone(record.zone, **kwargs)
record.record_id = pr_record.id
# This is not something that should be given by the user, e.g. we
# are only using this to store the ttl, so we should onl save this
# value if it's returned by the provider.
record.extra = pr_record.extra
try:
record.save()
except me.ValidationError as exc:
log.error("Error validating %s: %s", record, exc.to_dict())
raise BadRequestError({'msg': exc.message,
'errors': exc.to_dict()})
except me.NotUniqueError as exc:
log.error("Record %s not unique error: %s", record, exc)
raise RecordExistsError()
self.cloud.owner.mapper.update(record)
def add(self, fail_on_error=True, fail_on_invalid_params=True, **kwargs):
"""Add new Cloud to the database
This is the only cloud controller subclass that overrides the `add`
method of `BaseMainController`.
This is only expected to be called by `Cloud.add` classmethod to create
a cloud. Fields `owner` and `title` are already populated in
`self.cloud`. The `self.cloud` model is not yet saved.
If appropriate kwargs are passed, this can currently also act as a
shortcut to also add the first machine on this dummy cloud.
"""
# Attempt to save.
try:
self.cloud.save()
except me.ValidationError as exc:
raise BadRequestError({'msg': exc.message,
'errors': exc.to_dict()})
except me.NotUniqueError:
raise CloudExistsError("Cloud with name %s already exists"
% self.cloud.title)
# Add machine.
if kwargs:
try:
self.add_machine_wrapper(
self.cloud.title, fail_on_error=fail_on_error,
fail_on_invalid_params=fail_on_invalid_params, **kwargs
)
except Exception as exc:
if fail_on_error:
self.cloud.delete()
raise
def clean(self):
"""Checks the CIDR to determine if it maps to a valid IPv4 network."""
try:
self.cidr = str(netaddr.IPNetwork(self.cidr))
except (TypeError, netaddr.AddrFormatError) as err:
raise me.ValidationError(err)
def clean(self):
# make sure user.email is unique - we can't use the unique keyword on
# the field definition because both User and Organization subclass
# Owner but only user has an email field
if User.objects(email=self.email, id__ne=self.id):
raise me.ValidationError("User with email '%s' already exists."
% self.email)
super(User, self).clean()
def update(self, fail_on_error=True, **kwargs):
for key, value in kwargs.iteritems():
if key not in type(self)._fields:
if not fail_on_error:
continue
raise me.ValidationError('Field "%s" does not exist on %s',
key, type(self))
setattr(self, key, value)
def clean(self):
if self.timedelta.total_seconds() < 60:
raise me.ValidationError("%s's timedelta cannot be less than "
"a minute", self.__class__.__name__)
def clean(self):
# FIXME This is needed in order to ensure rule name convention remains
# backwards compatible with the old monitoring stack. However, it will
# have to change in the future due to uniqueness constrains.
if not self.title:
self.title = 'rule%d' % self.owner.rule_counter
# FIXME Ensure a single query condition is specified for backwards
# compatibility with the existing monitoring/alert stack.
if not len(self.queries) is 1:
raise me.ValidationError()
def update(self, fail_on_error=True, **kwargs):
for key, value in kwargs.iteritems():
if key not in self._fields:
if not fail_on_error:
continue
raise me.ValidationError('Field "%s" does not exist on %s',
key, type(self))
setattr(self, key, value)
def add(self, fail_on_error=True, **kwargs):
"""Add a new Rule.
This method is meant to be invoked only by the `Rule.add` classmethod.
"""
for field in ('queries', ): # TODO 'window', 'interval', ):
if field not in kwargs:
raise RequiredParameterMissingError(field)
try:
self.update(fail_on_error=fail_on_error, **kwargs)
except (me.ValidationError, BadRequestError) as err:
log.error('Error adding %s: %s', self.rule.name, err)
raise
def clean(self):
"""Extended validation for EC2 Networks to ensure CIDR assignment."""
if not self.cidr:
raise me.ValidationError('Missing IPv4 range in CIDR notation')
super(AmazonNetwork, self).clean()
def clean(self):
"""Extended validation for EC2 Networks to ensure CIDR assignment."""
if not self.cidr:
raise me.ValidationError('Missing IPv4 range in CIDR notation')
super(AzureArmNetwork, self).clean()
def clean(self):
"""Checks the CIDR to determine if it maps to a valid IPv4 network."""
try:
netaddr.cidr_to_glob(self.cidr)
except (TypeError, netaddr.AddrFormatError) as err:
raise me.ValidationError(err)