def many_init(cls, *args, **kwargs):
    """
    Create the `ListSerializer` parent that is used when `many=True`.

    Every positional and keyword argument is forwarded unchanged to both the
    `BasePageSerializer` child and the `ListSerializer` parent; the child is
    handed to the parent through the `child` keyword argument.

    Customize this method if you need to control which keyword arguments go
    to the parent and which go to the child. An override will usually be
    much simpler, eg:

        @classmethod
        def many_init(cls, *args, **kwargs):
            kwargs['child'] = cls()
            return CustomListSerializer(*args, **kwargs)
    """
    child_serializer = BasePageSerializer(*args, **kwargs)
    kwargs['child'] = child_serializer
    return ListSerializer(*args, **kwargs)
python类ListSerializer()的实例源码
def to_internal_value(self, data):
    """
    Validate every item of a list payload without stopping at the first failure.

    This implements the same relevant logic as ListSerializer, except that
    when one or more items fail validation the remaining items are still
    processed.

    Returns a list with one entry per input item: the value returned by
    `self.child.run_validation` for items that passed, and the `detail` of
    the raised `ValidationError` for items that failed.

    Raises `serializers.ValidationError`, keyed by
    `api_settings.NON_FIELD_ERRORS_KEY`, when `data` is not a list.
    """
    if not isinstance(data, list):
        type_name = type(data).__name__
        not_a_list = self.error_messages['not_a_list'].format(input_type=type_name)
        raise serializers.ValidationError({
            api_settings.NON_FIELD_ERRORS_KEY: [not_a_list]
        })

    def validate_or_report(entry):
        # A failing item contributes its error detail instead of aborting.
        try:
            return self.child.run_validation(entry)
        except serializers.ValidationError as failure:
            return failure.detail

    return [validate_or_report(entry) for entry in data]
rest_framework_dyn_serializer.py 文件源码
项目:django-rest-framework-dyn-serializer
作者: Nepherhotep
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def exclude_omitted_fields(self, request):
"""
Drop serializer fields that the request did not ask for.

The requested names come from `self.get_requested_field_names(request)` and
are stored on `self._requested_fields`. When that result is not None, every
entry of `self.fields` whose name is not among them is removed. Nested
`DynModelSerializer` fields (either direct, or as the child of a
`ListSerializer`) are processed by calling their own
`exclude_omitted_fields` with the same request.
"""
# NOTE(review): indentation was lost in this copy, so it cannot be told from
# here whether the nested-serializer loop below runs only when field names
# were requested or unconditionally -- confirm against the upstream file.
field_names = self.get_requested_field_names(request)
self._requested_fields = field_names
if field_names is not None:
# Drop any fields that are not specified in passed query param
allowed = set(field_names)
existing = set(self.fields.keys())
for field_name in existing - allowed:
self.fields.pop(field_name)
# Recurse into the nested dynamic serializers that remain.
for field_name in self.fields:
field = self.fields[field_name]
if isinstance(field, serializers.ListSerializer):
if isinstance(field.child, DynModelSerializer):
field.child.exclude_omitted_fields(request)
elif isinstance(field, DynModelSerializer):
field.exclude_omitted_fields(request)
def data(self):
    """
    Return the serialized items wrapped in a links/embedded envelope.

    The result is a `ReturnDict` with two sections: one under
    `LINKS_FIELD_NAME` holding an `href` built from the request found in the
    serializer context, and one under `EMBEDDED_FIELD_NAME` holding the
    parent class's data under the `items` key.
    """
    request = self.context['request']
    links_section = {URL_FIELD_NAME: {'href': request.build_absolute_uri()}}
    # The parent class returns ReturnList
    # `items` mirrors hardcoded value in pagination classes
    embedded_section = {'items': super(ListSerializer, self).data}
    envelope = {
        LINKS_FIELD_NAME: links_section,
        EMBEDDED_FIELD_NAME: embedded_section,
    }
    return ReturnDict(envelope, serializer=self)
def data(self):
    """
    Return the serialized items wrapped in a links/embedded envelope.

    The result is a `ReturnDict` with two sections: one under
    `LINKS_FIELD_NAME` holding an `href` built from the request found in the
    serializer context, and one under `EMBEDDED_FIELD_NAME` holding the
    parent class's data under the `items` key.
    """
    current_request = self.context['request']
    href_entry = {'href': current_request.build_absolute_uri()}
    # The parent class returns ReturnList
    # `items` mirrors hardcoded value in pagination classes
    item_list = super(ListSerializer, self).data
    wrapped = {
        LINKS_FIELD_NAME: {URL_FIELD_NAME: href_entry},
        EMBEDDED_FIELD_NAME: {'items': item_list},
    }
    return ReturnDict(wrapped, serializer=self)
def react_user_list(users, project, identifier):
    """
    Render the placeholder <span> for the "userlist" widget.

    The users are ordered by lower-cased username, serialized through a
    `ListSerializer` of `UserWithMailSerializer`, and embedded as JSON in the
    `data-users` attribute. The project's primary key fills `data-project`
    and `identifier` fills `data-listen-to`.
    """
    ordered_users = users.order_by(Lower('username'))
    serialized_users = ListSerializer(ordered_users, child=UserWithMailSerializer()).data
    template = (
        '<span data-euth-widget="userlist"'
        ' data-users="{users}"'
        ' data-project="{project}"'
        ' data-listen-to="{listen_to}"></span>'
    )
    return format_html(
        template,
        users=json.dumps(serialized_users),
        project=project.pk,
        listen_to=identifier,
    )
def assert_required_fields(self, field_names, parent_getter, field_parent_getter):
    """
    Helper function to assert required fields.

    For each name in `field_names` that maps to a required, writable field
    of `ProfileFilledOutSerializer`, check that validation rejects a null
    value for it and, for `CharField`s, a blank string as well.

    `field_parent_getter` selects the mapping that holds the fields out of
    the serializer's `fields`; `parent_getter` selects the matching mapping
    out of the payload and out of the validation error details.
    """
    for key in field_names:
        field = field_parent_getter(ProfileFilledOutSerializer().fields)[key]
        is_generated = isinstance(field, (ListSerializer, SerializerMethodField, ReadOnlyField))
        # `allow_blank` only exists on some field classes, so read it
        # defensively instead of raising AttributeError on the others.
        is_skippable = (
            field.read_only
            or field.allow_null
            or getattr(field, 'allow_blank', False)
        )
        # skip fields that are skippable, generated, read only, or that tie
        # to other serializers which are tested elsewhere.
        if is_generated or is_skippable:
            continue

        clone = deepcopy(self.data)
        clone["image"] = self.profile.image
        clone["image_small"] = self.profile.image_small
        clone["image_medium"] = self.profile.image_medium

        parent_getter(clone)[key] = None
        with self.assertRaises(ValidationError) as ex:
            ProfileFilledOutSerializer(data=clone).is_valid(raise_exception=True)
        assert parent_getter(ex.exception.args[0]) == {key: ['This field may not be null.']}

        if isinstance(field, CharField):
            # test blank string too
            parent_getter(clone)[key] = ""
            with self.assertRaises(ValidationError) as ex:
                ProfileFilledOutSerializer(data=clone).is_valid(raise_exception=True)
            assert parent_getter(ex.exception.args[0]) == {key: ['This field may not be blank.']}
def test_patch_own_profile(self):
"""
A user PATCHes their own profile
"""
# NOTE(review): indentation was lost in this copy, so how far each
# `with mute_signals(post_save)` block extends cannot be determined here.
with mute_signals(post_save):
ProfileFactory.create(user=self.user1, filled_out=False, agreed_to_terms_of_service=False)
self.client.force_login(self.user1)
with mute_signals(post_save):
new_profile = ProfileFactory.create(filled_out=False)
new_profile.user.social_auth.create(
provider=EdxOrgOAuth2.name,
uid="{}_edx".format(new_profile.user.username)
)
# The serialized form of the second profile, minus its image, is the PATCH body.
patch_data = ProfileSerializer(new_profile).data
del patch_data['image']
resp = self.client.patch(self.url1, content_type="application/json", data=json.dumps(patch_data))
assert resp.status_code == 200
# Reload user1's profile and compare every writable field with what was sent.
old_profile = Profile.objects.get(user__username=self.user1.username)
for key, value in patch_data.items():
field = ProfileSerializer().fields[key]
if isinstance(field, (ListSerializer, SerializerMethodField, ReadOnlyField)) or field.read_only is True:
# these fields are readonly
continue
elif isinstance(field, DateField):
# the sent value is run through `parse(...).date()` before comparing
assert getattr(old_profile, key) == parse(value).date()
else:
assert getattr(old_profile, key) == value
def get_serializer_fields(self, path, method, view):
    """
    Return a list of `coreapi.Field` instances corresponding to any
    request body input, as determined by the serializer class.

    An empty list is returned for methods other than PUT, PATCH and POST,
    for views without `get_serializer`, and for serializers that are
    neither a `ListSerializer` nor a `Serializer`. A `ListSerializer` yields
    a single required body field named `data`; a `Serializer` yields one
    form field per writable, non-hidden field, none of them required for
    PATCH.
    """
    if method not in ('PUT', 'PATCH', 'POST') or not hasattr(view, 'get_serializer'):
        return []

    serializer = view.get_serializer()

    if isinstance(serializer, serializers.ListSerializer):
        body_field = coreapi.Field(
            name='data',
            location='body',
            required=True,
            schema=coreschema.Array()
        )
        return [body_field]

    if not isinstance(serializer, serializers.Serializer):
        return []

    is_partial = method == 'PATCH'
    writable = (
        candidate for candidate in serializer.fields.values()
        if not (candidate.read_only or isinstance(candidate, serializers.HiddenField))
    )
    return [
        coreapi.Field(
            name=candidate.field_name,
            location='form',
            required=candidate.required and not is_partial,
            schema=field_to_schema(candidate)
        )
        for candidate in writable
    ]
def __init__(self, field, parent, only_fields, include_fields):
"""
Record the field being handled together with its parent and classify it.

`is_many` is true for a `ListSerializer` whose child is a `Serializer`;
`has_context` is true for a `Serializer` or for such a list. The
`only_fields` / `include_fields` selections are narrowed to this field's
name through `self.filter_fields`, and the remaining attributes are
initialised to False / None.
"""
# NOTE(review): indentation was lost in this copy; which of the assignments
# after `if self.has_context:` are conditional cannot be told from here.
self.field = field
self.parent = parent
self.is_many = isinstance(field, serializers.ListSerializer) and isinstance(field.child, serializers.Serializer)
self.has_context = isinstance(field, serializers.Serializer) or self.is_many
if self.has_context:
self.old_context = None
self.only_fields = self.filter_fields(field.field_name, only_fields)
self.include_fields = self.filter_fields(field.field_name, include_fields)
self.on_exit_delete_fields = False
self.on_exit_delete_include_fields = False
self.old_fields = None
self.old_include_fields = None
def get(self, request):
    """
    Returns all the suggestions.

    Every `Suggestion` is serialized with `SimpleSuggestionSerializer`
    through a `ListSerializer`, and the result is returned in a `Response`.
    """
    suggestions = models.Suggestion.objects.all()
    list_serializer = rf_serializers.ListSerializer(
        suggestions,
        child=serializers.SimpleSuggestionSerializer()
    )
    return Response(list_serializer.data)
def list(self, request):
    """
    Returns the beer styles, serialized with `SimpleStyleSerializer`.

    When `request.auth` is None, only the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries are returned.
    """
    all_styles = models.Style.objects.all()
    limit = settings.UNAUTHENTICATED_RESULTS_COUNT if request.auth is None else None
    visible_styles = all_styles if limit is None else all_styles[:limit]
    payload = rf_serializers.ListSerializer(
        visible_styles,
        child=serializers.SimpleStyleSerializer()
    ).data
    return Response(payload)
def list(self, request):
    """
    Returns the yeast types, serialized with `YeastTypeSerializer`.

    When `request.auth` is None, only the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries are returned.
    """
    yeast_types = models.YeastType.objects.all()
    if request.auth is None:
        yeast_types = yeast_types[:settings.UNAUTHENTICATED_RESULTS_COUNT]
    serializer = rf_serializers.ListSerializer(
        yeast_types,
        child=serializers.YeastTypeSerializer()
    )
    return Response(serializer.data)
def list(self, request):
    """
    Returns the yeast strains, serialized with `SimpleYeastSerializer`.

    When `request.auth` is None, only the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries are returned.
    """
    yeasts = models.Yeast.objects.all()
    if request.auth is None:
        yeasts = yeasts[:settings.UNAUTHENTICATED_RESULTS_COUNT]
    serializer = rf_serializers.ListSerializer(
        yeasts,
        child=serializers.SimpleYeastSerializer()
    )
    return Response(serializer.data)
def list(self, request):
    """
    Returns the countries in the system, serialized with `CountryCodeSerializer`.

    When `request.auth` is None, only the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries are returned.
    """
    country_codes = models.CountryCode.objects.all()
    if request.auth is None:
        country_codes = country_codes[:settings.UNAUTHENTICATED_RESULTS_COUNT]
    serializer = rf_serializers.ListSerializer(
        country_codes,
        child=serializers.CountryCodeSerializer()
    )
    return Response(serializer.data)
def list(self, request):
    """
    Returns the hops in the system, serialized with `SimpleHopSerializer`.

    When `request.auth` is None, only the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries are returned.
    """
    hops = models.Hop.objects.all()
    if request.auth is None:
        hops = hops[:settings.UNAUTHENTICATED_RESULTS_COUNT]
    serializer = rf_serializers.ListSerializer(
        hops,
        child=serializers.SimpleHopSerializer()
    )
    return Response(serializer.data)
def list(self, request):
    """
    Returns all active fermentable types in the system.

    Only rows with `is_active=True` are considered. When `request.auth` is
    None, the result is cut down to the first
    `settings.UNAUTHENTICATED_RESULTS_COUNT` entries.
    """
    fermentable_types = models.FermentableType.objects.filter(is_active=True)
    # requests without auth only get a limited number of results
    if request.auth is None:
        fermentable_types = fermentable_types[:settings.UNAUTHENTICATED_RESULTS_COUNT]
    serializer = rf_serializers.ListSerializer(
        fermentable_types,
        child=serializers.FermentableType()
    )
    return Response(serializer.data)
def list(self, request):
    """
    Returns the active fermentables, serialized with `serializers.Fermentable`.

    Only rows with `is_active=True` are considered. When `request.auth` is
    None, only the first `settings.UNAUTHENTICATED_RESULTS_COUNT` entries
    are returned.
    """
    active = models.Fermentable.objects.filter(is_active=True)
    visible = active[:settings.UNAUTHENTICATED_RESULTS_COUNT] if request.auth is None else active
    payload = rf_serializers.ListSerializer(
        visible,
        child=serializers.Fermentable()
    ).data
    return Response(payload)
def test_build_serializer_with_depth(self):
    """A `depth = 3` serializer over all Vehicle fields builds nested serializers."""

    class VehicleSerializer(ModelSerializer):

        class Meta:
            model = Vehicle
            session = session
            fields = '__all__'
            depth = 3

    vehicle_serializer = VehicleSerializer()

    # Top level: every Vehicle attribute plus the generated `url` field.
    self.assertEqual(len(vehicle_serializer.fields), 11)
    expected_top_level = {
        Vehicle.created_at.key, Vehicle.engine.key, Vehicle.id.key, Vehicle.name.key, Vehicle.options.key,
        Vehicle.other.key, Vehicle.owner.key, Vehicle.paint.key, Vehicle.type.key, Vehicle.is_used.key, 'url'
    }
    self.assertEqual(set(vehicle_serializer.fields.keys()), expected_top_level)

    # Nested serializer for `engine`.
    engine = vehicle_serializer.fields['engine']
    self.assertEqual(len(engine.fields), 4)
    self.assertEqual(set(engine.fields.keys()), {'type_', 'displacement', 'fuel_type', 'cylinders'})

    # Nested serializer for `owner`, including its field labels.
    owner = vehicle_serializer.fields['owner']
    self.assertEqual(len(owner.fields), 3)
    self.assertEqual(set(owner.fields.keys()), {'id', 'first_name', 'last_name'})
    owner_labels = {nested.label for nested in owner.fields.values()}
    self.assertEqual(owner_labels, {'Id', 'First name', 'Last name'})

    # `options` is a list serializer wrapping a per-option child.
    options = vehicle_serializer.fields['options']
    self.assertTrue(options.many)
    self.assertIsInstance(options, ListSerializer)
    option = options.child
    self.assertEqual(len(option.fields), 2)
    self.assertEqual(set(option.fields.keys()), {'id', 'name'})
def perform_update(self, instance, validated_data, errors):
    """
    The main nested update logic implementation using nested fields and serializer.

    For every writable field present in `validated_data`, the value is
    resolved and written to `instance` through `self.update_attribute`:

    * a nested serializer resolves its value to an object via
      `field.get_object`, and updates that object when
      `allow_nested_updates` is set;
    * a list of nested serializers resolves each item the same way through
      the child serializer and keeps the truthy results;
    * any other field uses the validated value as is.

    Failures are not propagated. The message of any exception raised while
    handling a field is appended to `errors[field_name]`, and processing
    continues with the next field.

    Returns `instance`.
    """
    for field in self._writable_fields:
        name = field.field_name
        if name not in validated_data:
            continue

        try:
            value = validated_data.get(name)

            if isinstance(field, BaseSerializer):
                child_instance = getattr(instance, name, None)
                child_instance = field.get_object(value, child_instance)

                if child_instance and field.allow_nested_updates:
                    value = field.perform_update(child_instance, value, errors)
                else:
                    value = child_instance

            elif isinstance(field, ListSerializer) and isinstance(field.child, BaseSerializer):
                items = value
                value = []

                for item in items:
                    child_instance = field.child.get_object(item)

                    if child_instance and (field.child.allow_create or field.child.allow_nested_updates):
                        v = field.child.perform_update(child_instance, item, errors)
                    else:
                        v = child_instance

                    if v:
                        value.append(v)

            self.update_attribute(instance, field, value)

        except Exception as e:
            # Errors are collected per field on purpose (best effort update).
            # Exception args are not guaranteed to be strings, so convert them
            # before joining; otherwise the handler itself raises TypeError.
            message = ' '.join(str(arg) for arg in e.args)
            errors.setdefault(name, []).append(message)

    return instance
def many_init(cls, *args, **kwargs):
    """
    Return a `ListSerializer` whose child is a `PageSerializer`.

    The child is built from the same positional and keyword arguments that
    are then passed on to the `ListSerializer`.
    """
    page_serializer = PageSerializer(*args, **kwargs)
    kwargs['child'] = page_serializer
    return ListSerializer(*args, **kwargs)
# TODO: decide if we need this
def __init__(self, serializer):
    """
    Wrap a serializer and derive its name and fields.

    A `ListSerializer` is unwrapped to its `child` before being stored.

    :param serializer: DjangoRestFramework Serializer
    """
    wraps_list = isinstance(serializer, ListSerializer)
    self.serializer = serializer.child if wraps_list else serializer
    self.name = self._get_name()
    self.fields = self._collect_fields()
def get_raw_type_from_serializer_field(field):
    """
    Return a short lower-case type name for a serializer field.

    `ListSerializer` maps to 'list' and `ModelSerializer` to 'model'. Any
    other field maps to its class name, lower-cased, with a trailing
    'Field' removed.
    """
    if isinstance(field, ListSerializer):
        raw_type = 'list'
    elif isinstance(field, ModelSerializer):
        # Alternative left in the original as commented-out code:
        # return smart_text(field.Meta.model.__name__).lower()
        raw_type = 'model'
    else:
        class_name = field.__class__.__name__
        if class_name.endswith('Field'):
            class_name = smart_text(class_name[:-5])
        raw_type = smart_text(class_name).lower()
    return raw_type
def set_field_property(self, segments, fields, property_key):
    """
    Set `property_key` to True on the field addressed by a path of names.

    `segments` is the path as a sequence of field names and `fields` is the
    mapping to start the lookup from. Every segment except the last is
    expected to name a nested serializer; for a `ListSerializer` (or a
    subclass of it) the lookup continues in its child's fields.

    Paths that cannot be resolved are ignored: an empty path, an unknown
    intermediate segment and an unknown or falsy final field all leave
    everything untouched.

    Always returns None.
    """
    if not segments:
        return

    if len(segments) == 1:
        field = fields.get(segments[0])
        if field:
            setattr(field, property_key, True)
        return

    nested = fields.get(segments[0])
    if nested is None:
        # Unknown intermediate segment: nothing to mark, same as for an
        # unknown final segment.
        return

    # isinstance (rather than an exact type check) so that custom
    # ListSerializer subclasses are unwrapped to their child as well.
    container = nested.child if isinstance(nested, serializers.ListSerializer) else nested
    return self.set_field_property(
        segments[1:], container.fields, property_key)
def field_to_schema(field):
    """
    Map a serializer field to the matching `coreschema` object.

    The field's label and help text become the schema's title and
    description (empty strings when unset). List serializers and nested
    serializers are converted recursively; relational, choice, boolean and
    numeric fields map to their dedicated schema types; everything else
    becomes a string schema, with `format='textarea'` when the field's style
    uses the 'textarea.html' base template.
    """
    labels = {
        'title': force_text(field.label) if field.label else '',
        'description': force_text(field.help_text) if field.help_text else '',
    }

    # The order of the checks matters: more specific classes come first.
    if isinstance(field, serializers.ListSerializer):
        return coreschema.Array(items=field_to_schema(field.child), **labels)

    if isinstance(field, serializers.Serializer):
        properties = OrderedDict(
            (key, field_to_schema(value))
            for key, value in field.fields.items()
        )
        return coreschema.Object(properties=properties, **labels)

    if isinstance(field, serializers.ManyRelatedField):
        return coreschema.Array(items=coreschema.String(), **labels)

    if isinstance(field, serializers.RelatedField):
        return coreschema.String(**labels)

    if isinstance(field, serializers.MultipleChoiceField):
        choices = coreschema.Enum(enum=list(field.choices.keys()))
        return coreschema.Array(items=choices, **labels)

    if isinstance(field, serializers.ChoiceField):
        return coreschema.Enum(enum=list(field.choices.keys()), **labels)

    if isinstance(field, serializers.BooleanField):
        return coreschema.Boolean(**labels)

    if isinstance(field, (serializers.DecimalField, serializers.FloatField)):
        return coreschema.Number(**labels)

    if isinstance(field, serializers.IntegerField):
        return coreschema.Integer(**labels)

    if field.style.get('base_template') == 'textarea.html':
        return coreschema.String(format='textarea', **labels)

    return coreschema.String(**labels)
def run_autooptimization_discovery(serializer, prefix, select_related_set, prefetch_related_set, is_prefetch,
only_fields, include_fields):
"""
Walk a serializer's fields and collect relation paths into the two given sets.

Nothing is done for serializers without a `Meta.model`. Otherwise each
field is inspected (skipping those rejected by the serializer's optional
`check_if_needs_serialization`):

* `ListSerializer` fields whose source is a model attribute accepted by
  `check_if_prefetch_object` add `prefix + source` to
  `prefetch_related_set`;
* nested `Serializer` fields whose source is accepted by
  `check_if_related_object` add `prefix + source` to
  `prefetch_related_set` when `is_prefetch` is set, otherwise to
  `select_related_set`;
* other fields with a dotted source add the first segment of that source
  to `select_related_set` when it is a related object on the model.

Nested serializers are visited recursively with `prefix + source + "__"`.
The function returns nothing; results are the mutations of the two sets.
"""
# NOTE(review): indentation was lost in this copy, so whether each recursive
# call below is nested under the `check_if_*` test or only under the
# `hasattr` test cannot be told from here -- confirm against upstream.
if not hasattr(serializer, "Meta") or not hasattr(serializer.Meta, "model"):
return
model_class = serializer.Meta.model
if hasattr(serializer, "get_on_demand_fields"):
on_demand_fields = serializer.get_on_demand_fields()
else:
on_demand_fields = set()
# Narrow a field selection to one nested field; None means "no selection".
def filter_field_name(field_name, fields_to_serialize):
if fields_to_serialize is not None:
return ContextPassing.filter_fields(field_name, fields_to_serialize)
return None
for field_name, field in serializer.fields.items():
if hasattr(serializer, "check_if_needs_serialization"):
if not serializer.check_if_needs_serialization(field_name, only_fields, include_fields, on_demand_fields):
continue
if isinstance(field, ListSerializer):
if "." not in field.source and hasattr(model_class, field.source):
model_field = getattr(model_class, field.source)
if check_if_prefetch_object(model_field):
prefetch_related_set.add(prefix + field.source)
run_autooptimization_discovery(field.child, prefix + field.source + "__", select_related_set,
prefetch_related_set, True,
filter_field_name(field_name, only_fields),
filter_field_name(field_name, include_fields))
elif isinstance(field, Serializer):
if "." not in field.source and hasattr(model_class, field.source):
model_field = getattr(model_class, field.source)
if check_if_related_object(model_field):
if is_prefetch:
prefetch_related_set.add(prefix + field.source)
else:
select_related_set.add(prefix + field.source)
run_autooptimization_discovery(field, prefix + field.source + "__", select_related_set,
prefetch_related_set, is_prefetch,
filter_field_name(field_name, only_fields),
filter_field_name(field_name, include_fields))
elif "." in field.source:
# Only the first segment of a dotted source is looked up on the model.
field_name = field.source.split(".", 1)[0]
if hasattr(model_class, field_name):
model_field = getattr(model_class, field_name)
if check_if_related_object(model_field):
select_related_set.add(prefix + field_name)
def _extract_field_info(self, model, field_name, field, fields, relationships, adapter, target_app, allow_recursion=False):
"""
Describe one serializer field and file it under `fields` or `relationships`.

Builds an item with `name` and `type`, where the type is looked up in
`adapter.field_type_mapping` by the field's class name. Relational fields
additionally get `related_model`, `app` (`target_app` when given, otherwise
the related model's app label) and, where one can be determined, `inverse`,
and are appended to `relationships`. For a `ListSerializer` the child is
inspected first and any relationships it produced are re-typed as
'ManyRelatedField' and appended to `relationships`. Remaining fields are
appended to `fields`.

Returns the item, or None for the field named 'id'. `allow_recursion` is
not read anywhere in this body.
"""
# NOTE(review): indentation was lost in this copy, so the exact nesting of the
# branches below is an assumption to verify against the upstream file.
if field_name == 'id':
return None
field_item = {
'name': field_name,
'type': adapter.field_type_mapping[field.__class__.__name__]
}
if isinstance(field, PrimaryKeyRelatedField) or isinstance(field, ManyRelatedField) \
or isinstance(field, ModelSerializer):
if model is None:
# Without a model, the related model is taken from the field's queryset.
field_item['related_model'] = field.queryset.model._meta.model_name.lower()
field_item['app'] = target_app if target_app is not None else \
field.queryset.model._meta.app_label.lower()
relationships.append(field_item)
else:
# With a model, the related model comes from the model field itself.
model_field = model._meta.get_field(field_name)
field_item['related_model'] = model_field.related_model._meta.model_name.lower()
field_item['app'] = target_app if target_app is not None else \
model_field.related_model._meta.app_label.lower()
relationships.append(field_item)
if hasattr(model_field, 'field'):
field_item['inverse'] = model_field.field.name
elif hasattr(model_field, 'remote_field') and \
getattr(model_field.remote_field, 'related_name', None) is not None:
field_item['inverse'] = model_field.remote_field.related_name
# An inverse name ending in '+' is dropped again.
if field_item.get('inverse', '-')[-1] == '+':
field_item.pop('inverse')
if isinstance(field, ModelSerializer):
if hasattr(field, 'many') and field.many:
field_item['type'] = adapter.field_type_mapping['ManyRelatedField']
else:
field_item['type'] = adapter.field_type_mapping['PrimaryKeyRelatedField']
# NOTE(review): this branch tests ModelSerializer again although an earlier
# condition of the same chain already matched it, so it looks unreachable --
# confirm.
elif isinstance(field, ModelSerializer):
field_item['related_model'] = field.queryset.model._meta.model_name.lower()
field_item['app'] = target_app if target_app is not None else \
field.queryset.model._meta.app_label.lower()
relationships.append(field_item)
if field.many:
field_item['type'] = adapter.field_type_mapping['ManyRelatedField']
else:
field_item['type'] = adapter.field_type_mapping['PrimaryKeyRelatedField']
elif isinstance(field, ListSerializer):
child_rels = []
child_fields = []
self._extract_field_info(model, field_name, field.child, child_fields, child_rels, adapter, target_app)
if len(child_rels) > 0:
for item in child_rels:
item['type'] = adapter.field_type_mapping['ManyRelatedField']
item.pop('inverse', None)
relationships.append(item)
else:
# NOTE(review): `field_type_mapping` is indexed with [] everywhere else in
# this function but is called with () here -- likely a typo; confirm.
field_item['type'] = adapter.field_type_mapping('ListField')
fields.append(field_item)
else:
fields.append(field_item)
return field_item
def get_serializer_fields(self, path, method, view, version=None, method_func=None):
    """
    Return a list of `coreapi.Field` instances corresponding to any
    request body input, as determined by the serializer class.

    The serializer class comes from `self.get_serializer_class(view,
    method_func)`. An empty list is returned for methods other than PUT,
    PATCH and POST, when no serializer class is found, and for serializers
    that are neither a `ListSerializer` nor a `Serializer`. For each
    writable, non-hidden field, the schema from
    `self.fallback_schema_from_field` is used when it is truthy, otherwise
    the one from `field_to_schema`.
    """
    if method not in ('PUT', 'PATCH', 'POST'):
        return []

    serializer_class = self.get_serializer_class(view, method_func)
    if not serializer_class:
        return []

    serializer = serializer_class()

    if isinstance(serializer, serializers.ListSerializer):
        body_field = Field(
            name='data',
            location='body',
            required=True,
            schema=coreschema.Array()
        )
        return [body_field]

    if not isinstance(serializer, serializers.Serializer):
        return []

    is_partial = method == 'PATCH'
    collected = []
    for source_field in serializer.fields.values():
        if source_field.read_only or isinstance(source_field, serializers.HiddenField):
            continue

        required = source_field.required and not is_partial

        # if the attribute ('help_text') of this field is a lazy translation object, force it to generate a string
        help_text = source_field.help_text
        description = str(help_text) if isinstance(help_text, Promise) else help_text

        schema = self.fallback_schema_from_field(source_field) or field_to_schema(source_field)
        collected.append(
            Field(
                name=source_field.field_name,
                location='form',
                required=required,
                schema=schema,
                description=description,
            )
        )

    return collected
def update(self, instance, validated_data):
"""
Update `instance` together with its nested reverse relations.

Nested serializer fields are popped out of `validated_data` first: lists of
model serializers are collected as reverse relations, single model
serializers as direct relations. Reverse relations require
`Meta.nested_related_kwarg` to be set, and direct relations are not
supported here (NotImplementedError). The instance itself is then updated
by the parent class's `update`, after which the reverse relations are
handled through `update_reverse_relations` and
`delete_reverse_relations_if_need`.

Returns the updated instance.
"""
# NOTE(review): indentation was lost in this copy, so whether
# `delete_reverse_relations_if_need` runs only when reverse relations exist
# cannot be told from here -- confirm against the upstream file.
reverse_relations = OrderedDict()
relations = OrderedDict()
# Sort fields by create priority
fields = self.get_sorted_by_create_priority(self.fields)
# Remove related fields from validated data for future manipulations
for field_name, field in fields.items():
if field.read_only:
continue
if isinstance(field, serializers.ListSerializer):
if isinstance(field.child, serializers.ModelSerializer):
if validated_data.pop(field.source, None) is None:
# Skip field if field is not required or null allowed
continue
reverse_relations[field_name] = field.child
if isinstance(field, serializers.ModelSerializer):
if validated_data.pop(field.source, None) is None:
# Skip field if field is not required or null allowed
continue
relations[field_name] = field
nested_related_kwarg = getattr(self.Meta, 'nested_related_kwarg', None)
if reverse_relations:
assert nested_related_kwarg, \
"Set `nested_related_kwarg` in Meta options for use nested " \
"create feature"
if relations:
raise NotImplementedError("NestedUpdateMixin not provide update "
"for direct relations")
# Update instance
instance = super(NestedUpdateMixin, self).update(
instance, validated_data)
if reverse_relations:
self.update_reverse_relations(instance, reverse_relations)
self.delete_reverse_relations_if_need(instance, reverse_relations)
return instance
def create(self, validated_data):
    """
    Create an instance together with its nested relations.

    Nested serializer fields are popped out of `validated_data` first: lists
    of model serializers are collected as reverse relations, single model
    serializers as direct relations. Read-only fields and the names listed
    in `self._ignore_creation` are left alone. Direct relations are saved
    first from `self.initial_data` and stored back into `validated_data`;
    the instance is then created by the parent class's `create`; reverse
    relations are created last.

    Reverse relations require `Meta.nested_related_kwarg` to be set.

    Returns the created instance.
    """
    relations = OrderedDict()
    reverse_relations = OrderedDict()

    # Sort fields by create priority
    prioritized = self.get_sorted_by_create_priority(self.fields)

    # Remove related fields from validated data for future manipulations
    for name, candidate in prioritized.items():
        if candidate.read_only or name in self._ignore_creation:
            continue

        if isinstance(candidate, serializers.ListSerializer):
            if isinstance(candidate.child, serializers.ModelSerializer):
                if validated_data.pop(candidate.source, None) is None:
                    # Skip field if field is not required or null allowed
                    continue
                reverse_relations[name] = candidate.child

        if isinstance(candidate, serializers.ModelSerializer):
            if validated_data.pop(candidate.source, None) is None:
                # Skip field if field is not required or null allowed
                continue
            relations[name] = candidate

    nested_related_kwarg = getattr(self.Meta, 'nested_related_kwarg', None)
    if reverse_relations:
        assert nested_related_kwarg, \
            "Set `nested_related_kwarg` in Meta options for use nested " \
            "create feature"

    # Create direct relations (foreign key)
    for name, nested in relations.items():
        nested_serializer = self._get_new_serializer(
            nested, data=self.initial_data[name])
        nested_serializer.is_valid(raise_exception=True)
        validated_data[nested.source] = nested_serializer.save()

    # Create instance
    instance = super(NestedCreateMixin, self).create(validated_data)

    if reverse_relations:
        self.create_reverse_relations(instance, reverse_relations)

    return instance