def validate(self, data):
    """
    Enforce the edit grace period and the ordering of the parking times.

    Once an existing instance is older than
    ``settings.PARKKIHUBI_TIME_PARKINGS_EDITABLE`` the payload may contain
    nothing but ``time_end``. Time fields missing from a partial update are
    taken from the stored instance before start and end are compared.

    Raises:
        ParkingException: the grace period is over and ``data`` holds
            anything other than exactly ``time_end``.
        serializers.ValidationError: ``time_start`` is after ``time_end``.
    """
    existing = self.instance
    if existing:
        age = now() - existing.created_at
        if age > settings.PARKKIHUBI_TIME_PARKINGS_EDITABLE and set(data) != {'time_end'}:
            raise ParkingException(
                _('Grace period has passed. Only "time_end" can be updated via PATCH.'),
                code='grace_period_over',
            )
        # a partial update may omit either time field; use the stored value then
        start = data.get('time_start', existing.time_start)
        end = data.get('time_end', existing.time_end)
    else:
        start = data['time_start']
        end = data['time_end']

    # an end time of None is accepted without comparison
    if end is not None and start > end:
        raise serializers.ValidationError(_('"time_start" cannot be after "time_end".'))
    return data
python类ValidationError()的实例源码
def post(self, request, *args, **kwargs):
    """Log a user in from an email/password combination

    Args:
        request (rest_framework.request.Request)

    Returns:
        Response: the serialized user on success, otherwise a 401 response
        carrying a ``detail`` message.

    Raises:
        serializers.ValidationError: ``email`` or ``password`` is missing
            or empty in the request data.
    """
    email = request.data.get('email', None)
    password = request.data.get('password', None)
    if not (email and password):
        raise serializers.ValidationError({'error': 'email and/or password not provided'})

    user = authenticate(email=email, password=password)
    if user is None:
        return Response({
            'detail': 'Unable to login with provided username/password'
        }, status=status.HTTP_401_UNAUTHORIZED)

    login(request, user)
    return Response(PFBUserSerializer(user).data)
def set_password(self, request, pk=None):
    """Detail ``POST`` endpoint for changing a user's password

    The request must carry the current password as ``oldPassword`` and a
    non-empty ``newPassword``. Both failure cases produce the same message.

    Args:
        request (rest_framework.request.Request)
        pk (str): primary key for user to retrieve user from database
    Returns:
        Response
    Raises:
        ValidationError: the old password does not authenticate the user,
            or no new password was supplied.
    """
    old_password = request.data.get('oldPassword')
    account = PFBUser.objects.get(uuid=pk)
    user = authenticate(email=account.email, password=old_password)
    if not user:
        raise ValidationError({'detail': 'Unable to complete password change'})

    new_password = request.data.get('newPassword')
    if not new_password:
        raise ValidationError({'detail': 'Unable to complete password change'})

    user.set_password(new_password)
    user.save()
    return Response(
        {'detail': 'Successfully changed password'},
        status=status.HTTP_200_OK,
    )
def validate(self, attrs):
    """
    Authenticate the submitted ``username``/``password`` pair.

    On success the authenticated user is stored under ``attrs['user']`` and
    ``attrs`` is returned.

    Raises:
        serializers.ValidationError: a credential is missing, the pair does
            not authenticate, or the user is not active.
    """
    username = attrs.get('username')
    password = attrs.get('password')
    if not (username and password):
        raise serializers.ValidationError(_('Must include "username" and "password".'))

    user = authenticate(username=username, password=password)
    if not user:
        raise serializers.ValidationError(_('Unable to log in with provided credentials.'))
    if not user.is_active:
        raise serializers.ValidationError(_('User account is disabled.'))

    attrs['user'] = user
    return attrs
def _check_current_version(self, instance):
    """
    Reject an edit that was made against a stale version of ``instance``.

    The ``edited_version`` value from the request data is compared with the
    id of the version that ``reversion.get_for_date`` returns for the
    current time. A mismatch is reported as a probable edit conflict.

    Raises:
        serializers.ValidationError: the two version ids differ; the detail
            carries the current version and the submitted data.
    """
    submitted = self.context['request'].data
    edited_version = submitted['edited_version']
    current_version = reversion.get_for_date(instance, timezone.now()).id
    if current_version == edited_version:
        return

    raise serializers.ValidationError(detail={
        'detail':
            'Edit conflict error! The current version for this object '
            'does not match the reported version being edited.',
        'current_version': current_version,
        'submitted_data': submitted,
    })
def _check_tag_data(self, instance, validated_data):
    """
    Verify that an update leaves the instance's special tags intact.

    For a full update (``self.partial`` is false) every special tag already
    present on the instance must be in the submitted tags. For any update,
    a special tag that is submitted must keep its current value.

    Returns:
        The submitted tag mapping, ``validated_data['tags']``.

    Raises:
        serializers.ValidationError: a special tag is missing from a full
            update, or an update tries to change a special tag's value.
    """
    tag_data = validated_data['tags']

    if not self.partial:
        # First pass (PUT only): special tags on the instance must be resent.
        for tag in instance.special_tags:
            if tag not in instance.tags or tag in tag_data:
                continue
            raise serializers.ValidationError(detail={
                'detail': 'PUT requests must retain all special tags. '
                'Your request is missing the tag: {}'.format(tag)})

    # Second pass (PUT and PATCH): resent special tags must be unchanged.
    for tag in instance.special_tags:
        if tag not in instance.tags or tag not in tag_data:
            continue
        if tag_data[tag] != instance.tags[tag]:
            raise serializers.ValidationError(detail={
                'detail': 'Updates (PUT or PATCH) must not attempt '
                'to change the values for special tags. Your request '
                'attempts to change the value for tag '
                "'{}' from '{}' to '{}'".format(
                    tag, instance.tags[tag], tag_data[tag])})
    return tag_data
def create(self, validated_data):
    """
    Check the submitted fields and required tags, then create the object.

    The payload must consist of exactly the ``tags`` and ``variant``
    fields, and ``tags`` must contain every entry of
    ``Relation.required_tags``. Creation is delegated to the parent class.

    Raises:
        serializers.ValidationError: the field set is wrong or a required
            tag is missing.
    """
    field_names = validated_data.keys()
    if sorted(field_names) != ['tags', 'variant']:
        raise serializers.ValidationError(detail={
            'detail': "Create (POST) should include the 'tags' and "
            "'variant' fields. Your request contains the following "
            'fields: {}'.format(str(field_names))})

    # 'tags' is guaranteed to be present by the field check above.
    supplied_tags = validated_data['tags']
    if any(tag not in supplied_tags for tag in Relation.required_tags):
        raise serializers.ValidationError(detail={
            'detail': 'Create (POST) tag data must include all '
            'required tags: {}'.format(Relation.required_tags)})

    return super(RelationSerializer, self).create(validated_data)
def validate_tags(self, tags_dict_list):
    """
    Validate a list of tag payloads and return deferred save callables.

    Each dict is validated with ``TagSerializer``. An entry with a truthy
    ``id`` is validated against the existing ``Tag`` with that id; any
    other entry is validated as new data.

    Returns:
        list: one zero-argument callable per entry, each saving the
        serializer built for that entry.

    Raises:
        serializers.ValidationError: at least one entry is invalid. The
            detail is a list with one item per input entry, ``{}`` for the
            valid ones.
    """
    tags_creators = []
    errors = []
    has_errors = False
    for tag_dict in tags_dict_list:
        tag_id = tag_dict.get('id')
        if tag_id:
            instance = Tag.objects.get(id=tag_id)
            serializer = TagSerializer(instance, data=tag_dict)
        else:
            serializer = TagSerializer(data=tag_dict)

        if serializer.is_valid():
            # Store the bound method instead of ``lambda: serializer.save()``.
            # A lambda looks ``serializer`` up when it is called, so every
            # callable would save the serializer of the last iteration.
            tags_creators.append(serializer.save)
            errors.append({})
        else:
            has_errors = True
            errors.append(serializer.errors)

    if has_errors:
        raise serializers.ValidationError(errors)
    return tags_creators
def validate_molecules(self, value):
    """
    Validate the molecule list and number the molecules in order.

    Each molecule dict gets a 1-based ``priority`` following its position.
    All molecules must share one ``instrument_name`` and the ``fill_window``
    values may sum to at most 1.

    Raises:
        serializers.ValidationError: the list is empty, instrument names
            differ, or more than one molecule sets ``fill_window``.
    """
    if not value:
        raise serializers.ValidationError(_('You must specify at least 1 molecule'))

    # Set the relative priority of molecules in order
    for position, molecule in enumerate(value, start=1):
        molecule['priority'] = position

    # Make sure each molecule has the same instrument name
    instrument_names = {molecule['instrument_name'] for molecule in value}
    if len(instrument_names) > 1:
        raise serializers.ValidationError(_('Each Molecule must specify the same instrument name'))

    fill_window_total = sum(molecule.get('fill_window', False) for molecule in value)
    if fill_window_total > 1:
        raise serializers.ValidationError(_('Only one molecule can have `fill_window` set'))

    return value
def validate(self, attrs):
    """
    Authenticate the submitted ``username``/``password`` pair.

    On success the authenticated user is stored under ``attrs['user']`` and
    ``attrs`` is returned. Every failure is raised with the error code
    ``'authorization'``.

    Raises:
        serializers.ValidationError: a credential is missing, the pair does
            not authenticate, or the user is not active.
    """
    username = attrs.get('username')
    password = attrs.get('password')
    if not (username and password):
        raise serializers.ValidationError(
            _('Must include "username" and "password".'), code='authorization')

    user = authenticate(username=username, password=password)
    if not user:
        raise serializers.ValidationError(
            _('Unable to log in with provided credentials.'), code='authorization')

    # From Django 1.10 onwards the `authenticate` call simply
    # returns `None` for is_active=False users.
    # (Assuming the default `ModelBackend` authentication backend.)
    if not user.is_active:
        raise serializers.ValidationError(
            _('User account is disabled.'), code='authorization')

    attrs['user'] = user
    return attrs
def validate(self, data):
    """
    Check that the cycle start, deadline and end dates are in order.

    Dates are read from ``self.initial_data`` and parsed as ``YYYY-MM-DD``.
    A date key missing from the request falls back to the attribute of the
    same name on ``self.instance`` when an instance exists. Neither
    ``self.initial_data`` nor the instance is modified. A comparison is
    skipped when either of its two dates is unavailable.

    Returns:
        ``data``, unchanged.

    Raises:
        serializers.ValidationError: end is before start, end is before the
            deadline, or the deadline is before start.
    """
    incoming = self.initial_data
    dates = {}
    for date_key in ("cycle_start_date", "cycle_deadline_date", "cycle_end_date"):
        raw = incoming.get(date_key, None)
        if raw:
            # Parse into a local value only; the request data stays untouched.
            raw = datetime.strptime(raw, '%Y-%m-%d').date()
        elif date_key not in incoming and self.instance:
            # Read the stored value without writing request data onto the model.
            raw = getattr(self.instance, date_key, None)
        dates[date_key] = raw

    start = dates["cycle_start_date"]
    deadline = dates["cycle_deadline_date"]
    end = dates["cycle_end_date"]

    # Validate our dates are in a chronologically sound order
    if start and end and end < start:
        raise serializers.ValidationError("Cycle start date must be before cycle end date")
    if deadline and end and end < deadline:
        raise serializers.ValidationError("Cycle deadline date must be on or before the cycle end date")
    if start and deadline and deadline < start:
        raise serializers.ValidationError("Cycle deadline date must be after cycle start date")
    return data
def validate(self, data):
    """
    Object level validation, all descendants should call this via super()
    if it is overridden.

    Rejects empty data and any field that is not listed by
    ``self.get_writable_fields()``.

    Raises:
        serializers.ValidationError: ``data`` has no keys, or it contains
            fields that are not writable.
    """
    submitted = data.keys()
    # Data may be stripped of invalid fields before it gets here, check
    if not submitted:
        raise serializers.ValidationError("Invalid data")

    allowed = self.get_writable_fields()
    rejected = [name for name in submitted if name not in allowed]
    if rejected:
        raise serializers.ValidationError(f"The following fields are not writable: {', '.join(rejected)}")
    return data
def validate_cpu(self, value):
    """
    Validate a mapping of process type to CPU shares.

    Keys must match ``PROCTYPE_MATCH``. Each non-``None`` value, converted
    to a string, must match ``CPUSHARE_MATCH``, and every named group of
    that match must be an integer between 0 and 1024. ``None`` values are
    skipped because they are used to unset an entry.

    Uses ``items()``/``values()``, which work on Python 2 and 3; the
    original ``viewitems()``/``viewvalues()`` exist only on Python 2 dicts.

    Raises:
        serializers.ValidationError: a key or value fails the checks above.
    """
    for proc_type, shares in value.items():
        if shares is None:  # use NoneType to unset a value
            continue
        if not re.match(PROCTYPE_MATCH, proc_type):
            raise serializers.ValidationError("Process types can only contain [a-z]")
        matched = re.match(CPUSHARE_MATCH, str(shares))
        if not matched:
            raise serializers.ValidationError("CPU shares must be an integer")
        # Separate loop variable so the outer value is not overwritten.
        for captured in matched.groupdict().values():
            try:
                number = int(captured)
            except ValueError:
                raise serializers.ValidationError("CPU shares must be an integer")
            if number > 1024 or number < 0:
                raise serializers.ValidationError("CPU shares must be between 0 and 1024")
    return value
def validate_domain(self, value):
    """
    Check that the hostname is valid

    A single trailing dot is removed before checking. Any value containing
    ``xip.io`` is returned without further checks. Otherwise every label
    must match the label pattern, must not contain ``--`` and must not be
    all digits; a single-label hostname must not contain digits. The
    domain must not already exist in ``models.Domain``.

    Returns:
        The hostname without its trailing dot.

    Raises:
        serializers.ValidationError: the hostname is too long, is a
            wildcard, looks invalid, or is already in use.
    """
    if len(value) > 255:
        raise serializers.ValidationError('Hostname must be 255 characters or less.')

    if value.endswith('.'):
        value = value[:-1]  # strip exactly one dot from the right, if present

    if 'xip.io' in value:
        return value

    labels = value.split('.')
    if labels[0] == '*':
        raise serializers.ValidationError(
            'Adding a wildcard subdomain is currently not supported.')

    label_pattern = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
    single_label = len(labels) == 1
    for label in labels:
        if not label_pattern.match(label):
            invalid = True
        elif '--' in label or label.isdigit():
            invalid = True
        else:
            # digits are only rejected when the hostname is one bare label
            invalid = single_label and any(char.isdigit() for char in label)
        if invalid:
            raise serializers.ValidationError('Hostname does not look valid.')

    if models.Domain.objects.filter(domain=value).exists():
        raise serializers.ValidationError(
            "The domain {} is already in use by another app".format(value))
    return value
def validate_cpu(self, value):
    """
    Validate a mapping of process type to CPU shares.

    Keys must match ``PROCTYPE_MATCH``. Each non-``None`` value, converted
    to a string, must match ``CPUSHARE_MATCH``, and every named group of
    that match must be an integer between 0 and 1024. ``None`` values are
    skipped because they are used to unset an entry.

    Uses ``items()``/``values()``, which work on Python 2 and 3; the
    original ``viewitems()``/``viewvalues()`` exist only on Python 2 dicts.

    Raises:
        serializers.ValidationError: a key or value fails the checks above.
    """
    for proc_type, shares in value.items():
        if shares is None:  # use NoneType to unset a value
            continue
        if not re.match(PROCTYPE_MATCH, proc_type):
            raise serializers.ValidationError("Process types can only contain [a-z]")
        matched = re.match(CPUSHARE_MATCH, str(shares))
        if not matched:
            raise serializers.ValidationError("CPU shares must be an integer")
        # Separate loop variable so the outer value is not overwritten.
        for captured in matched.groupdict().values():
            try:
                number = int(captured)
            except ValueError:
                raise serializers.ValidationError("CPU shares must be an integer")
            if number > 1024 or number < 0:
                raise serializers.ValidationError("CPU shares must be between 0 and 1024")
    return value
def validate_domain(self, value):
    """
    Check that the hostname is valid

    A single trailing dot is removed before checking. Any value containing
    ``xip.io`` is returned without further checks. Otherwise every label
    must match the label pattern, must not contain ``--`` and must not be
    all digits; a single-label hostname must not contain digits. The
    domain must not already exist in ``models.Domain``.

    Returns:
        The hostname without its trailing dot.

    Raises:
        serializers.ValidationError: the hostname is too long, is a
            wildcard, looks invalid, or is already in use.
    """
    if len(value) > 255:
        raise serializers.ValidationError('Hostname must be 255 characters or less.')

    if value.endswith('.'):
        value = value[:-1]  # strip exactly one dot from the right, if present

    if 'xip.io' in value:
        return value

    labels = value.split('.')
    if labels[0] == '*':
        raise serializers.ValidationError(
            'Adding a wildcard subdomain is currently not supported.')

    label_pattern = re.compile("^(?!-)[a-z0-9-]{1,63}(?<!-)$", re.IGNORECASE)
    single_label = len(labels) == 1
    for label in labels:
        if not label_pattern.match(label):
            invalid = True
        elif '--' in label or label.isdigit():
            invalid = True
        else:
            # digits are only rejected when the hostname is one bare label
            invalid = single_label and any(char.isdigit() for char in label)
        if invalid:
            raise serializers.ValidationError('Hostname does not look valid.')

    if models.Domain.objects.filter(domain=value).exists():
        raise serializers.ValidationError(
            "The domain {} is already in use by another app".format(value))
    return value
def validate(self, validated_data):
    """
    Resolve the Rehive user and company behind ``token`` for activation.

    The user must belong to the ``admin`` or ``service`` permission group
    and the company must not yet exist locally. On success the fetched
    user and company are stored in ``validated_data``.

    Raises:
        serializers.ValidationError: keyed on ``token`` when the user or
            company lookup fails, the user lacks the required group, or
            the company is already activated.
    """
    rehive = Rehive(validated_data.get('token'))

    try:
        user = rehive.user.get()
        group_names = [group['name'] for group in user['permission_groups']]
        if {"admin", "service"}.isdisjoint(group_names):
            raise serializers.ValidationError(
                {"token": ["Invalid admin user."]})
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid user."]})

    try:
        company = rehive.admin.company.get()
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid company."]})

    already_activated = Company.objects.filter(
        identifier=company['identifier']).exists()
    if already_activated:
        raise serializers.ValidationError(
            {"token": ["Company already activated."]})

    validated_data['user'] = user
    validated_data['company'] = company
    return validated_data
def validate(self, validated_data):
    """
    Resolve the locally activated company for the Rehive user of ``token``.

    The user must belong to the ``admin`` or ``service`` permission group.
    The ``Company`` whose identifier equals the user's ``company`` value is
    stored under ``validated_data['company']``.

    Raises:
        serializers.ValidationError: keyed on ``token`` when the user
            lookup fails, the user lacks the required group, or no
            matching company exists.
    """
    rehive = Rehive(validated_data.get('token'))

    try:
        user = rehive.user.get()
        group_names = [group['name'] for group in user['permission_groups']]
        if {"admin", "service"}.isdisjoint(group_names):
            raise serializers.ValidationError(
                {"token": ["Invalid admin user."]})
    except APIException:
        raise serializers.ValidationError({"token": ["Invalid user."]})

    try:
        company = Company.objects.get(identifier=user['company'])
    except Company.DoesNotExist:
        raise serializers.ValidationError(
            {"token": ["Company has not been activated yet."]})

    validated_data['company'] = company
    return validated_data
def create(self, validated_data):
    """
    Trigger an existing notification log for the given recipient.

    The log is looked up by the ``log_id`` view kwarg, restricted to
    notifications of the requesting user's company.

    Returns:
        The ``NotificationLog`` that was triggered.

    Raises:
        exceptions.NotFound: no matching log exists.
        serializers.ValidationError: ``log.trigger`` raised any exception.
    """
    company = self.context['request'].user.company
    view = self.context.get('view')
    log_id = view.kwargs.get('log_id')
    recipient = validated_data.get('recipient')

    try:
        log = NotificationLog.objects.get(
            notification__company=company, id=log_id)
    except NotificationLog.DoesNotExist:
        raise exceptions.NotFound()

    try:
        log.trigger(recipient=recipient)
    except Exception:
        # any trigger failure is reported to the client as a generic error
        raise serializers.ValidationError(
            {"non_field_errors": ["Internal server error."]})

    return log
user.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def validate(self, data):
    """
    Validate the password strength and the uniqueness of the email.

    A supplied password is run through ``validate_password``. A supplied
    email must not belong to an existing user whose ``channel__slug``
    matches the request's channel. All problems are collected and raised
    together; otherwise validation continues in the parent class.

    Raises:
        serializers.ValidationError: a dict with ``password`` and/or
            ``email`` entries.
    """
    errors = {}

    password = data.get('password')
    if password:
        try:
            validate_password(password=password)
        except ValidationError as exc:
            errors['password'] = list(exc.messages)

    email = data.get('email')
    if email:
        channel = self.context["request"].channel
        matches = models.User.objects.filter(email=email, channel__slug=channel)
        if matches.count():
            errors['email'] = "An user with this email is already registered."

    if errors:
        raise serializers.ValidationError(errors)
    return super(UserCreateSerializer, self).validate(data)
user.py 文件源码
项目:django-open-volunteering-platform
作者: OpenVolunteeringPlatform
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
def validate(self, data):
    """
    Validate a password change against the user's current password.

    The checks run when either ``password`` or ``current_password`` is
    supplied. ``current_password`` is removed from ``data``, the new
    password is run through ``validate_password``, and the current
    password must authenticate the requesting user on the request's
    channel.

    Raises:
        serializers.ValidationError: a dict with ``password`` and/or
            ``current_password`` entries.
    """
    errors = {}

    if data.get('password') or data.get('current_password'):
        current_password = data.pop('current_password', '')
        new_password = data.get('password', '')

        try:
            validate_password(password=new_password)
        except ValidationError as exc:
            errors['password'] = list(exc.messages)

        authenticated = authenticate(
            email=self.context['request'].user.email,
            password=current_password,
            channel=self.context["request"].channel,
        )
        if not authenticated:
            errors['current_password'] = ["Invalid password."]

    if errors:
        raise serializers.ValidationError(errors)
    return super(UserCreateSerializer, self).validate(data)
def startremotechannelimport(self, request):
    """
    Schedule an ``importchannel network`` job for the requested channel.

    ``channel_id`` is required in the request data. ``baseurl`` is optional
    and defaults to ``settings.CENTRAL_CONTENT_DOWNLOAD_BASE_URL``.

    Returns:
        Response: the status of the scheduled job as built by
        ``_job_to_response``.

    Raises:
        serializers.ValidationError: ``channel_id`` is missing.
    """
    payload = request.data
    try:
        channel_id = payload["channel_id"]
    except KeyError:
        raise serializers.ValidationError("The channel_id field is required.")

    baseurl = payload.get("baseurl", settings.CENTRAL_CONTENT_DOWNLOAD_BASE_URL)
    job_metadata = {"type": "REMOTECHANNELIMPORT", "started_by": request.user.pk}

    job_id = get_client().schedule(
        call_command,
        "importchannel",
        "network",
        channel_id,
        baseurl=baseurl,
        extra_metadata=job_metadata,
    )

    job = get_client().status(job_id)
    return Response(_job_to_response(job))
def validate(self, data):
    """
    Check that start_time/end_time are valid and free of overlaps.

    ``start_time`` must be strictly before ``end_time``. The period must
    not overlap (bounds inclusive) any existing object of the view's model
    that belongs to the requesting user.

    Both failures raise ``serializers.ValidationError``; the original used
    a bare ``ValidationError`` name for the overlap case only.

    Raises:
        serializers.ValidationError: the times are out of order or an
            overlapping period exists.
    """
    start_time = data['start_time']
    end_time = data['end_time']
    if start_time >= end_time:
        raise serializers.ValidationError('End time must occur after start')

    create_model = self.context['view'].model
    user = self.context['request'].user
    overlapping = create_model.objects.filter(user=user).filter(
        end_time__gte=start_time, start_time__lte=end_time)
    if overlapping.exists():
        raise serializers.ValidationError('Overlapping Periods Found')
    return data
def create(self, validated_data):
    """
    Create a reseller together with the user that owns it.

    A user named ``<application id>.<reseller name>`` is created and set as
    the reseller's owner. This operation is needed to create a token for
    the reseller.

    Raises:
        ValidationError: a user with that username already exists.
    """
    application = self.initial_data['application']
    username = '{application_id}.{reseller_name}'.format(
        application_id=application.id,
        reseller_name=validated_data['name'],
    )

    user_model = get_user_model()
    if user_model.objects.filter(username=username).exists():
        raise ValidationError('Reseller with such name is already created')

    owner = user_model.objects.create(username=username)
    return Reseller.objects.create(
        owner=owner, application=application, **validated_data)
def validate(self, data):
    """
    Check that the user is a member of the club.

    The user must pass ``project.has_club_member`` and must be a member of
    the specified club. Validation then continues in the parent class.

    Raises:
        serializers.ValidationError: either membership check fails.
    """
    club = data['club']
    user = data['user']
    project = data['project']

    if not project.has_club_member(user):
        raise serializers.ValidationError(
            "The specified user must be a member of at least one of the "
            "parent clubs!"
        )

    if club.has_member(user):
        return super(ProjectMembershipSerializer, self).validate(data)

    raise serializers.ValidationError(
        "The specified user must be a member of the specified club!"
    )
def filter_queryset(self, queryset):
    """
    Restrict the queryset to the site given by the ``site_id`` URL kwarg.

    NOTE(review): the original indentation was lost; this follows the
    reading in which the ``try``/``except`` belongs to the authenticated
    branch. There ``int(None)`` always raised, so a bare ``except:``
    produced the error and the ``else`` branch could never run. The
    outcomes of that reading are kept, without the bare ``except``.

    Returns:
        The queryset filtered by ``site__id``. When no site id is given and
        ``permission_denied`` returns for an anonymous user, the parent's
        filtered queryset.

    Raises:
        serializers.ValidationError: no ``site_id`` was given and the user
            is authenticated.
    """
    site_id = self.kwargs.get('site_id', None)
    if site_id is not None:
        return queryset.filter(site__id=int(site_id))

    # If no site id is specified, the request must be authenticated
    if self.request.user.is_anonymous():
        # raises a permission denied exception, forces authentication
        self.permission_denied(self.request)
        return super(SiteFormViewSet, self).filter_queryset(queryset)

    raise serializers.ValidationError({'site': "Site Id Not Given."})
def validate(self, attrs):
    """
    Check the submitted credentials and attach the authenticated user.

    Returns:
        ``attrs`` with the authenticated user added under ``'user'``.

    Raises:
        serializers.ValidationError: ``username`` or ``password`` is
            missing, authentication fails, or the user is not active.
    """
    username, password = attrs.get('username'), attrs.get('password')

    if not username or not password:
        msg = _('Must include "username" and "password".')
        raise serializers.ValidationError(msg)

    user = authenticate(username=username, password=password)
    if not user:
        msg = _('Unable to log in with provided credentials.')
        raise serializers.ValidationError(msg)

    if not user.is_active:
        msg = _('User account is disabled.')
        raise serializers.ValidationError(msg)

    attrs['user'] = user
    return attrs
def run_validation(self, data=[]):
    """
    Run the full validation and report failures per recipient.

    We override the default `run_validation` so that any error raised by
    `.to_internal_value()`, the validators or `.validate()` is re-raised
    as a dictionary with the keys 'to' (the submitted recipient) and
    'errors' (the output of `get_validation_error_detail`).

    Returns:
        The validated value, or the value from `validate_empty_values`
        when the input counts as empty.

    Raises:
        ValidationError: validation failed; see the structure above.
    """
    # NOTE(review): the mutable default ``[]`` is kept so the signature is
    # unchanged; the code visible here never mutates it.
    (is_empty_value, data) = self.validate_empty_values(data)
    if is_empty_value:
        return data
    try:
        value = self.to_internal_value(data)
        self.run_validators(value)
        value = self.validate(value)
        assert value is not None, '.validate() should return the validated data'  # noqa
    except (ValidationError, DjangoValidationError) as exc:
        # 'to' can be absent, or ``data`` may not be a mapping, in exactly
        # the cases that fail validation. Look it up defensively so the
        # validation error is not replaced by a KeyError/TypeError.
        try:
            recipient = data['to']
        except (KeyError, TypeError, IndexError):
            recipient = None
        # TODO: Must be 'recipient' instead of 'to' in v2
        raise ValidationError(
            detail={
                'to': recipient,
                'errors': get_validation_error_detail(exc)})
    return value
def validate(self, attrs):
    """
    Re-use the ``Message`` model validation for the incoming attributes.

    See http://www.django-rest-framework.org/topics/3.0-announcement/#differences-between-modelserializer-validation-and-modelform

    An unsaved ``Message`` is built from ``attrs`` with the request user as
    author. Its ``clean()`` is run, and ``validate_html()`` too when
    ``html`` is among the attributes. Django validation errors are
    re-raised as a ``ValidationError`` mapping each field to a list.

    Raises:
        ValidationError: model validation failed.
    """
    request = self.context.get('request', None)
    # Re-use model validation logic
    instance = Message(author=request.user, **attrs)
    try:
        instance.clean()
        if 'html' in attrs:
            instance.validate_html()
    except DjangoValidationError as err:
        if hasattr(err, 'error_dict'):
            field_errors = err.message_dict
        else:
            # ``message_dict`` raises AttributeError for an error that was
            # not built from a dict; report those as non-field errors.
            field_errors = {'non_field_errors': err.messages}
        message = {
            field: errors if isinstance(errors, list) else [errors]
            for field, errors in field_errors.items()
        }
        raise ValidationError(message)
    return attrs
def validate(self, attrs):
    """
    Reject duplicate file names and oversized attachment totals.

    The uploaded file's name must not match the end of the ``file`` path
    of any attachment already on the message. The sizes of the existing
    attachments plus the new file must not exceed
    ``settings.CAMPAIGNS['ATTACHMENT_TOTAL_MAX_SIZE']``.

    Raises:
        ValidationError: a same-named attachment exists, or the total size
            would exceed the maximum.
    """
    # this is defined as a pre_save because it can't be checked in clean(),
    # as there is not yet an instance.message at this step.
    message = attrs['message']
    upload = attrs['file']

    same_named = message.attachments.filter(
        file__endswith='/{}'.format(upload.name))
    if same_named.exists():
        raise ValidationError((
            'A file with the same name already exists '
            'for this message : {}').format(same_named.first()))

    # Now check whether we would fit within the total allowed
    # attachment size for the message
    total_size = sum(
        attachment.size for attachment in message.attachments.all())
    total_size += upload.size
    max_size = settings.CAMPAIGNS['ATTACHMENT_TOTAL_MAX_SIZE']
    if total_size > max_size:
        # Translate the template first and interpolate afterwards;
        # formatting before the ``_()`` lookup produces a string that can
        # never match a catalogue entry.
        raise ValidationError(_(
            'Can not accept this file. Total attachment size for this '
            'message ({} bytes) would exceed the maximum allowed size of '
            '{} bytes').format(total_size, max_size))
    return attrs