def post(self, request, *args, **kwargs):
"""Login a user given a username password combination
Args:
request (rest_framework.request.Request)
"""
email = request.data.get('email', None)
password = request.data.get('password', None)
if not all([email, password]):
raise serializers.ValidationError({'error': 'email and/or password not provided'})
user = authenticate(email=email, password=password)
if user is not None:
login(request, user)
return Response(PFBUserSerializer(user).data)
else:
return Response({
'detail': 'Unable to login with provided username/password'
}, status=status.HTTP_401_UNAUTHORIZED)
python类ValidationError()的实例源码
def set_password(self, request, pk=None):
"""Detail ``POST`` endpoint for changing a user's password
Args:
request (rest_framework.request.Request)
pk (str): primary key for user to retrieve user from database
Returns:
Response
"""
old_password = request.data.get('oldPassword')
user = authenticate(email=PFBUser.objects.get(uuid=pk).email,
password=old_password)
if not user:
raise ValidationError({'detail': 'Unable to complete password change'})
new_password = request.data.get('newPassword')
if not new_password:
raise ValidationError({'detail': 'Unable to complete password change'})
user.set_password(new_password)
user.save()
return Response({'detail': 'Successfully changed password'},
status=status.HTTP_200_OK)
def test_financial_aid_model_unique(self):
"""
Tests that FinancialAid objects are unique per User and Program
"""
financial_aid = FinancialAidFactory.create()
# Test creation of FinancialAid that isn't unique_together with "user" and "tier_program__program"
# financial aid with same user and different program (new program created by the factory)
FinancialAidFactory.create(user=financial_aid.user)
# financial aid with same program and different user (new user created by the factory)
FinancialAidFactory.create(tier_program=financial_aid.tier_program)
# Test updating the original FinancialAid doesn't raise ValidationError
financial_aid.income_usd = 100
financial_aid.save()
# Test creation should fail for FinancialAid already existing with the same "user" and "tier_program__program"
with self.assertRaises(ValidationError):
FinancialAidFactory.create(
user=financial_aid.user,
tier_program=financial_aid.tier_program
)
def test_financial_aid_model_duplicate_if_reset(self):
"""
Tests that FinancialAid objects can not be unique per User and Program
if the other are in reset status
"""
financial_aid = FinancialAidFactory.create()
# change the first one to any state that is not `reset` will fail to create a new financial aid
for status in FinancialAidStatus.ALL_STATUSES:
if status == FinancialAidStatus.RESET:
continue
financial_aid.status = status
financial_aid.save()
with self.assertRaises(ValidationError):
FinancialAidFactory.create(
user=financial_aid.user,
tier_program=financial_aid.tier_program
)
# reset status will allow a new financial aid
financial_aid.status = FinancialAidStatus.RESET
financial_aid.save()
FinancialAidFactory.create(
user=financial_aid.user,
tier_program=financial_aid.tier_program
)
def test_update_education_different_profile(self):
"""
Make sure we can't edit an education for a different profile
"""
with mute_signals(post_save):
education1 = EducationFactory.create()
education2 = EducationFactory.create()
education_object = EducationSerializer(education1).data
education_object['id'] = education2.id
serializer = ProfileSerializer(instance=education1.profile, data={
'education': [education_object], 'work_history': []
})
serializer.is_valid(raise_exception=True)
with self.assertRaises(ValidationError) as ex:
serializer.save()
assert ex.exception.detail == ["Education {} does not exist".format(education2.id)]
def update_education(education_list, profile_id):
"""
Update education for given profile id.
Args:
education_list (list): List of education dicts.
profile_id (int): User profile id.
"""
saved_education_ids = set()
for education in education_list:
education_id = education.get("id")
if education_id is not None:
try:
education_instance = Education.objects.get(profile_id=profile_id, id=education_id)
except Education.DoesNotExist:
raise ValidationError("Education {} does not exist".format(education_id))
else:
education_instance = None
education_serializer = EducationSerializer(instance=education_instance, data=education)
education_serializer.is_valid(raise_exception=True)
education_serializer.save(profile_id=profile_id)
saved_education_ids.add(education_serializer.instance.id)
Education.objects.filter(profile_id=profile_id).exclude(id__in=saved_education_ids).delete()
def validate(self, attrs):
"""
Assert that filled_out can't be turned off and that agreed_to_terms_of_service is true
"""
if 'filled_out' in attrs and not attrs['filled_out']:
raise ValidationError("filled_out cannot be set to false")
if 'agreed_to_terms_of_service' in attrs and not attrs['agreed_to_terms_of_service']:
raise ValidationError("agreed_to_terms_of_service cannot be set to false")
# Postal code is only required in United States and Canada
country = attrs.get("country", "")
postal_code = attrs.get("postal_code", "")
if country in ("US", "CA") and not postal_code:
raise ValidationError("postal_code may not be blank")
return super(ProfileFilledOutSerializer, self).validate(attrs)
def test_no_current_financial_aid(self):
"""
Purchasable course runs must have financial aid available
"""
course_run, user = create_purchasable_course_run()
program = course_run.course.program
tier_program = program.tier_programs.first()
tier_program.current = False
tier_program.save()
with self.assertRaises(ValidationError) as ex:
get_purchasable_course_run(course_run.edx_course_key, user)
assert ex.exception.args[0] == (
"Course run {} does not have a current attached financial aid application".format(
course_run.edx_course_key
)
)
def test_financial_aid_for_user(self):
"""
Purchasable course runs must have a financial aid attached for the given user
"""
course_run, user = create_purchasable_course_run()
program = course_run.course.program
tier_program = program.tier_programs.first()
financial_aid = tier_program.financialaid_set.first()
financial_aid.user = UserFactory.create()
financial_aid.save()
with self.assertRaises(ValidationError) as ex:
get_purchasable_course_run(course_run.edx_course_key, user)
assert ex.exception.args[0] == (
"Course run {} does not have a current attached financial aid application".format(
course_run.edx_course_key
)
)
def test_financial_aid_terminal_status(self):
"""
FinancialAid must have a status which allows purchase to happen
"""
course_run, user = create_purchasable_course_run()
program = course_run.course.program
tier_program = program.tier_programs.first()
financial_aid = tier_program.financialaid_set.first()
for status in set(FinancialAidStatus.ALL_STATUSES).difference(set(FinancialAidStatus.TERMINAL_STATUSES)):
financial_aid.status = status
financial_aid.save()
with self.assertRaises(ValidationError) as ex:
get_purchasable_course_run(course_run.edx_course_key, user)
assert ex.exception.args[0] == (
"Course run {} does not have a current attached financial aid application".format(
course_run.edx_course_key
)
)
def test_already_purchased(self, has_to_pay):
"""
Purchasable course runs must not be already purchased
"""
course_run, user = create_purchasable_course_run()
order = create_unfulfilled_order(course_run.edx_course_key, user)
# succeeds because order is unfulfilled
assert course_run == get_purchasable_course_run(course_run.edx_course_key, user)
order.status = Order.FULFILLED
order.save()
with self.assertRaises(ValidationError) as ex:
get_purchasable_course_run(course_run.edx_course_key, user)
assert ex.exception.args[0] == 'Course run {} is already purchased'.format(course_run.edx_course_key)
assert has_to_pay.call_count == 1
def validate(self, attrs):
# custom validation
host = self._get_host_by_fqdn(attrs)
if host is None:
raise ValidationError('Host \'{}\' not found.'.format(attrs['certname']))
if not host.pxe_installable:
raise ValidationError('Host \'{}\' is marked as not installable via PXE.'.format(attrs['certname']))
if not host.pxe_key:
raise ValidationError('Host \'{}\' has no pxe_key.'.format(attrs['certname']))
if host.pxe_key != attrs['pxe_key']:
raise ValidationError('Supplied pxe_key \'{}\' does not match host \'{}\'.'.format(
attrs['pxe_key'],
attrs['certname']))
self.host = host
return attrs
def validate(self, attrs):
attrs = super(BatchRequestSerializer, self).validate(attrs)
files_in_use = []
for batch in attrs['batch']:
if 'attached_files' not in batch:
continue
attached_files = batch['attached_files']
if isinstance(attached_files, dict):
files_in_use.extend(attached_files.values())
elif isinstance(attached_files, list):
files_in_use.extend(attached_files)
else:
raise ValidationError({'attached_files': 'Invalid format.'})
missing_files = set(files_in_use) - set(self.get_files().keys())
if missing_files:
raise ValidationError('Some of files are not provided: {}'.format(', '.join(missing_files)))
return attrs
def _process_attr(self, attr):
params = re.findall(
r'({result=(?P<name>\w+):\$\.(?P<value>[a-zA-Z0-9.*]+)})', attr
)
if not params:
return attr
for url_param in params:
if url_param[1] not in self.named_responses:
raise ValidationError('Named request {} is missing'.format(url_param[1]))
result = get_attribute(
self.named_responses[url_param[1]]['_data'],
url_param[2].split('.')
)
if isinstance(result, list):
result = ','.join(map(six.text_type, result))
if attr == url_param[0]:
attr = result
else:
attr = attr.replace(url_param[0], str(result))
return attr
def __iter__(self):
for request_data in self.request_serializer.data['batch']:
request_data['data'] = self.updated_obj(request_data['data'])
request_data['relative_url'] = self._process_attr(request_data['relative_url'])
if self.request.content_type.startswith('multipart/form-data'):
request_data['_body'] = self._prepare_formdata_body(request_data['data'], files=request_data.get('files', {}))
elif self.request.content_type.startswith('application/x-www-form-urlencoded'):
request_data['_body'] = self._prepare_urlencoded_body(request_data['data'])
elif self.request.content_type.startswith('application/json'):
request_data['_body'] = self._prepare_json_body(request_data['data'])
else:
raise ValidationError('Unsupported content type')
yield BatchRequest(self.request, request_data)
def run_validators(self, value):
"""
Test the given value against all the validators on the field,
and either raise a `ValidationError` or simply return.
"""
errors = []
for validator in self.validators:
if hasattr(validator, 'set_context'):
validator.set_context(self)
try:
validator(value)
except ValidationError as exc:
# If the validation error contains a mapping of fields to
# errors then simply raise it immediately rather than
# attempting to accumulate a list of errors.
if isinstance(exc.detail, dict):
raise
errors.extend(exc.detail)
except DjangoValidationError as exc:
errors.extend(get_error_detail(exc))
if errors:
raise ValidationError(errors)
def as_serializer_error(exc):
assert isinstance(exc, (ValidationError, DjangoValidationError))
if isinstance(exc, DjangoValidationError):
detail = get_error_detail(exc)
else:
detail = exc.detail
if isinstance(detail, Mapping):
# If errors may be a dict we use the standard {key: list of values}.
# Here we ensure that all the values are *lists* of errors.
return {
key: value if isinstance(value, (list, Mapping)) else [value]
for key, value in detail.items()
}
elif isinstance(detail, list):
# Errors raised as a list are non-field errors.
return {
api_settings.NON_FIELD_ERRORS_KEY: detail
}
# Errors raised as a string are non-field errors.
return {
api_settings.NON_FIELD_ERRORS_KEY: [detail]
}
def run_validation(self, data=empty):
"""
We override the default `run_validation`, because the validation
performed by validators and the `.validate()` method should
be coerced into an error dictionary with a 'non_fields_error' key.
"""
(is_empty_value, data) = self.validate_empty_values(data)
if is_empty_value:
return data
value = self.to_internal_value(data)
try:
self.run_validators(value)
value = self.validate(value)
assert value is not None, '.validate() should return the validated data'
except (ValidationError, DjangoValidationError) as exc:
raise ValidationError(detail=as_serializer_error(exc))
return value
def is_valid(self, raise_exception=False):
# This implementation is the same as the default,
# except that we use lists, rather than dicts, as the empty case.
assert hasattr(self, 'initial_data'), (
'Cannot call `.is_valid()` as no `data=` keyword argument was '
'passed when instantiating the serializer instance.'
)
if not hasattr(self, '_validated_data'):
try:
self._validated_data = self.run_validation(self.initial_data)
except ValidationError as exc:
self._validated_data = []
self._errors = exc.detail
else:
self._errors = []
if self._errors and raise_exception:
raise ValidationError(self.errors)
return not bool(self._errors)
def validate(self, validated_data):
ingredient_uuid = validated_data['ingredient']['uuid']
ingredient = Ingredient.objects.get(uuid=ingredient_uuid)
validated_data['ingredient'] = ingredient
if 'measurement' in validated_data:
measurement_details = validated_data.pop('measurement')
# measurement_details = validated_data['measurement']
measurement_uuid = measurement_details['uuid']
try:
measurement = Measurement.objects.get(uuid=measurement_uuid)
except Vendor.DoesNotExist:
raise ValidationError('Non-required Measurement UUID doesn\'t exist'.format(measurement_uuid))
validated_data['measurement'] = measurement
return validated_data
def validate(self, validated_data):
if 'ingredient_compositions' in validated_data:
ingredient_compositions = validated_data.pop('ingredient_compositions')
ingredient_compositions_uuids = [item['uuid'] for item in ingredient_compositions]
ingredient_compositions = IngredientComposition.objects.filter(uuid__in=ingredient_compositions_uuids)
if ingredient_compositions.count() != len(ingredient_compositions_uuids):
raise ValidationError('Not all ingredient composition UUIDs were found {}'.format(
ingredient_compositions_uuids))
else:
ingredient_compositions = []
validated_data['ingredient_compositions'] = ingredient_compositions
return validated_data
def create(self, validated_data):
"""
This method is overwritten in order to create User object and associate it with reseller.
This operation is needed to create token for reseller
"""
application_id = self.initial_data['application'].id
reseller_name = validated_data['name']
username = '{application_id}.{reseller_name}'.format(application_id=application_id,
reseller_name=reseller_name)
if get_user_model().objects.filter(username=username).exists():
raise ValidationError('Reseller with such name is already created')
user = get_user_model().objects.create(username=username)
return Reseller.objects.create(owner=user, application=self.initial_data['application'],
**validated_data)
def update(self, request, *args, **kwargs):
"""
Override update to make sure that only valid clubRole is assigned after
updation.
"""
club_membership = self.get_object()
serializer = self.get_serializer(club_membership,
data=request.data,
partial=True)
serializer.is_valid(raise_exception=True)
if club_membership.user != serializer.validated_data['user']:
raise rest_exceptions.ValidationError(
'You can not update the User!'
)
if not club_membership.club_role.club \
.has_role(serializer.validated_data['club_role']):
raise rest_exceptions.ValidationError(
'Invalid Club Role ID for this Club!')
return super(ClubMembershipViewSet, self).update(
request, *args, **kwargs)
def add_club(self, request, pk=None):
"""
Collaborate with another Club in this project.
"""
project = self.get_object()
club_id = int(request.query_params.get('club', -1))
if club_id == -1:
raise rest_exceptions.ValidationError(
'The request must contain a club!'
)
try:
club = models.Club.objects.get(id=club_id)
except models.Club.DoesNotExist:
raise rest_exceptions.ValidationError(
'The club does not exist!'
)
project.add_club(club)
serializer = serializers.ProjectSerializer(project)
return Response(serializer.data)
def validate(self, attrs):
su = super().validate(attrs)
try:
longitude = su.get("longitude", self.instance.longitude)
except AttributeError:
longitude = None
try:
latitude = su.get("latitude", self.instance.latitude)
except AttributeError:
latitude = None
if latitude is not None and (latitude > 90 or latitude < -90):
raise ValidationError("Latitude must be between -90 and 90")
if longitude is not None and (longitude > 180 or longitude < -180):
raise ValidationError("Longitude must be between -180 and 180")
if (longitude is None and latitude is not None) or (longitude is not None and latitude is None):
raise ValidationError(
"If longitude is provided then latitude is required and vice versa. Both can be null.")
return su
def run_validators(self, value):
"""
Test the given value against all the validators on the field,
and either raise a `ValidationError` or simply return.
"""
errors = []
for validator in self.validators:
if hasattr(validator, 'set_context'):
validator.set_context(self)
try:
validator(value)
except ValidationError as exc:
# If the validation error contains a mapping of fields to
# errors then simply raise it immediately rather than
# attempting to accumulate a list of errors.
if isinstance(exc.detail, dict):
raise
errors.extend(exc.detail)
except DjangoValidationError as exc:
errors.extend(exc.messages)
if errors:
raise ValidationError(errors)
def leave_close_claim(self, request, *args, **kwargs):
"""
:param request:
{
"target_account_id": account_id
}
"""
deposit = self.get_object()
if deposit.status in deposit.INOPERABLE_STATUSES:
raise ValidationError('???????? ? ????????? ??????????')
if deposit.status == deposit.STATUS_REQUESTED_CLOSING:
raise ValidationError('?????? ?? ???????? ???????? ??? ??????.')
target_account_id = request.data['target_account_id']
if not fin_models.Account.objects.filter(pk=target_account_id):
raise ValidationError('?????????? ????? ?? ??????????.')
account = fin_models.Account.objects.get(pk=target_account_id)
if account.status in fin_models.Account.INOPERABLE_STATUSES:
raise ValidationError('???????? ? ????????? ?????? ??????????.')
res, info = deposit.leave_close_claim(target_account_id)
return Response(info, status=status.HTTP_200_OK if res else status.HTTP_400_BAD_REQUEST)
def confirm(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
try:
key, pk = data['key'].rsplit(':', 1)
user = self.get_queryset().get(pk=pk)
assert md5(user.username).hexdigest() == key
except (TypeError, AssertionError, ValueError, get_user_model().DoesNotExist) as e:
raise ValidationError('???????? ????')
if data['new_password'] != data['new_password_confirm']:
raise ValidationError('????? ?????? ?? ?????????')
user.set_password(data['new_password'])
user.save()
return Response(status=status.HTTP_204_NO_CONTENT)
def change_password(self, request, *args, **kwargs):
serializer = ChangePasswordSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
user = self.get_object()
if not user.is_active:
raise ValidationError('???????????? ?????????????')
if not user.check_password(serializer.validated_data['old_password']):
raise ValidationError('???????? ??????')
if data['new_password'] != data['new_password_confirm']:
raise ValidationError('????? ?????? ?? ?????????')
user.set_password(data['new_password'])
user.save()
return Response(status=status.HTTP_204_NO_CONTENT)
def post(self, request, *args, **kwargs):
user = authenticate(userid=request.data['username'], password=request.data['password'])
if user:
is_active = user.is_active
if is_active:
token, _ = Token.objects.get_or_create(user=user)
response = Response({"token": token.key,
"user_pk": token.user_id,
"created": token.created}, status=status.HTTP_200_OK)
return response
else:
detail = "?? ??? ??????."
raise PermissionDenied(detail=detail)
else:
detail = "???? ?? ? ????. username? password? ?? ??????."
raise ValidationError(detail=detail)