def add_update_values(self, values):
"""
Convert a dictionary of field name to value mappings into an update
query. This is the entry point for the public update() method on
querysets.
"""
values_seq = []
for name, val in six.iteritems(values):
field = self.get_meta().get_field(name)
direct = not (field.auto_created and not field.concrete) or not field.concrete
model = field.model._meta.concrete_model
if not direct or (field.is_relation and field.many_to_many):
raise FieldError(
'Cannot update model field %r (only non-relations and '
'foreign keys permitted).' % field
)
if model is not self.get_meta().model:
self.add_related_update(model, field, val)
continue
values_seq.append((field, model, val))
return self.add_update_fields(values_seq)
python类FieldError()的实例源码
def solve_lookup_type(self, lookup):
"""
Solve the lookup type from the lookup (eg: 'foobar__id__icontains')
"""
lookup_splitted = lookup.split(LOOKUP_SEP)
if self._annotations:
expression, expression_lookups = refs_expression(lookup_splitted, self.annotations)
if expression:
return expression_lookups, (), expression
_, field, _, lookup_parts = self.names_to_path(lookup_splitted, self.get_meta())
field_parts = lookup_splitted[0:len(lookup_splitted) - len(lookup_parts)]
if len(lookup_parts) == 0:
lookup_parts = ['exact']
elif len(lookup_parts) > 1:
if not field_parts:
raise FieldError(
'Invalid lookup "%s" for model %s".' %
(lookup, self.get_meta().model.__name__))
return lookup_parts, field_parts, False
def resolve_ref(self, name, allow_joins=True, reuse=None, summarize=False):
if not allow_joins and LOOKUP_SEP in name:
raise FieldError("Joined field references are not permitted in this query")
if name in self.annotations:
if summarize:
# Summarize currently means we are doing an aggregate() query
# which is executed as a wrapped subquery if any of the
# aggregate() elements reference an existing annotation. In
# that case we need to return a Ref to the subquery's annotation.
return Ref(name, self.annotation_select[name])
else:
return self.annotation_select[name]
else:
field_list = name.split(LOOKUP_SEP)
field, sources, opts, join_list, path = self.setup_joins(
field_list, self.get_meta(),
self.get_initial_alias(), reuse)
targets, _, join_list = self.trim_joins(sources, join_list, path)
if len(targets) > 1:
raise FieldError("Referencing multicolumn fields with F() objects "
"isn't supported")
if reuse is not None:
reuse.update(join_list)
col = targets[0].get_col(join_list[-1], sources[0])
return col
def prepare_value(self, field, value):
"""
Prepare a value to be used in a query by resolving it if it is an
expression and otherwise calling the field's get_db_prep_save().
"""
if hasattr(value, 'resolve_expression'):
value = value.resolve_expression(self.query, allow_joins=False, for_save=True)
# Don't allow values containing Col expressions. They refer to
# existing columns on a row, but in the case of insert the row
# doesn't exist yet.
if value.contains_column_references:
raise ValueError(
'Failed to insert expression "%s" on %s. F() expressions '
'can only be used to update, not to insert.' % (value, field)
)
if value.contains_aggregate:
raise FieldError("Aggregate functions are not allowed in this query")
else:
value = field.get_db_prep_save(value, connection=self.connection)
return value
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def solve_lookup_type(self, lookup):
"""
Solve the lookup type from the lookup (eg: 'foobar__id__icontains')
"""
lookup_splitted = lookup.split(LOOKUP_SEP)
if self._annotations:
expression, expression_lookups = refs_expression(lookup_splitted, self.annotations)
if expression:
return expression_lookups, (), expression
_, field, _, lookup_parts = self.names_to_path(lookup_splitted, self.get_meta())
field_parts = lookup_splitted[0:len(lookup_splitted) - len(lookup_parts)]
if len(lookup_parts) == 0:
lookup_parts = ['exact']
elif len(lookup_parts) > 1:
if not field_parts:
raise FieldError(
'Invalid lookup "%s" for model %s".' %
(lookup, self.get_meta().model.__name__))
return lookup_parts, field_parts, False
def resolve_ref(self, name, allow_joins=True, reuse=None, summarize=False):
if not allow_joins and LOOKUP_SEP in name:
raise FieldError("Joined field references are not permitted in this query")
if name in self.annotations:
if summarize:
# Summarize currently means we are doing an aggregate() query
# which is executed as a wrapped subquery if any of the
# aggregate() elements reference an existing annotation. In
# that case we need to return a Ref to the subquery's annotation.
return Ref(name, self.annotation_select[name])
else:
return self.annotation_select[name]
else:
field_list = name.split(LOOKUP_SEP)
field, sources, opts, join_list, path = self.setup_joins(
field_list, self.get_meta(),
self.get_initial_alias(), reuse)
targets, _, join_list = self.trim_joins(sources, join_list, path)
if len(targets) > 1:
raise FieldError("Referencing multicolumn fields with F() objects "
"isn't supported")
if reuse is not None:
reuse.update(join_list)
col = targets[0].get_col(join_list[-1], sources[0])
return col
def add_ordering(self, *ordering):
"""
Adds items from the 'ordering' sequence to the query's "order by"
clause. These items are either field names (not column names) --
possibly with a direction prefix ('-' or '?') -- or OrderBy
expressions.
If 'ordering' is empty, all ordering is cleared from the query.
"""
errors = []
for item in ordering:
if not hasattr(item, 'resolve_expression') and not ORDER_PATTERN.match(item):
errors.append(item)
if getattr(item, 'contains_aggregate', False):
raise FieldError(
'Using an aggregate in order_by() without also including '
'it in annotate() is not allowed: %s' % item
)
if errors:
raise FieldError('Invalid order_by arguments: %s' % errors)
if ordering:
self.order_by.extend(ordering)
else:
self.default_ordering = False
def prepare_value(self, field, value):
"""
Prepare a value to be used in a query by resolving it if it is an
expression and otherwise calling the field's get_db_prep_save().
"""
if hasattr(value, 'resolve_expression'):
value = value.resolve_expression(self.query, allow_joins=False, for_save=True)
# Don't allow values containing Col expressions. They refer to
# existing columns on a row, but in the case of insert the row
# doesn't exist yet.
if value.contains_column_references:
raise ValueError(
'Failed to insert expression "%s" on %s. F() expressions '
'can only be used to update, not to insert.' % (value, field)
)
if value.contains_aggregate:
raise FieldError("Aggregate functions are not allowed in this query")
else:
value = field.get_db_prep_save(value, connection=self.connection)
return value
def get_form_class(self, obj=None, modelform_class=None, **kwargs):
"""
Returns a Form class for use in the add/edit views.
"""
# form will have been passed by an upstream call to get_formset_class
# so if it is missing, this must be a single-object view on a non-inline
# controller
if modelform_class is None:
modelform_class = self.modelform_class
form_class_kwargs = self.get_form_class_kwargs(
modelform_class=modelform_class, obj=obj, **kwargs)
try:
ModelForm = forms.modelform_factory(self.model, **form_class_kwargs)
except FieldError as e:
raise FieldError(
'%s. Check fields/fieldsets/exclude attributes of class %s.'
% (e, self.__class__.__name__)
)
return ModelForm
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def solve_lookup_type(self, lookup):
"""
Solve the lookup type from the lookup (eg: 'foobar__id__icontains')
"""
lookup_splitted = lookup.split(LOOKUP_SEP)
if self._annotations:
expression, expression_lookups = refs_expression(lookup_splitted, self.annotations)
if expression:
return expression_lookups, (), expression
_, field, _, lookup_parts = self.names_to_path(lookup_splitted, self.get_meta())
field_parts = lookup_splitted[0:len(lookup_splitted) - len(lookup_parts)]
if len(lookup_parts) == 0:
lookup_parts = ['exact']
elif len(lookup_parts) > 1:
if not field_parts:
raise FieldError(
'Invalid lookup "%s" for model %s".' %
(lookup, self.get_meta().model.__name__))
return lookup_parts, field_parts, False
def resolve_ref(self, name, allow_joins=True, reuse=None, summarize=False):
if not allow_joins and LOOKUP_SEP in name:
raise FieldError("Joined field references are not permitted in this query")
if name in self.annotations:
if summarize:
# Summarize currently means we are doing an aggregate() query
# which is executed as a wrapped subquery if any of the
# aggregate() elements reference an existing annotation. In
# that case we need to return a Ref to the subquery's annotation.
return Ref(name, self.annotation_select[name])
else:
return self.annotation_select[name]
else:
field_list = name.split(LOOKUP_SEP)
field, sources, opts, join_list, path = self.setup_joins(
field_list, self.get_meta(),
self.get_initial_alias(), reuse)
targets, _, join_list = self.trim_joins(sources, join_list, path)
if len(targets) > 1:
raise FieldError("Referencing multicolumn fields with F() objects "
"isn't supported")
if reuse is not None:
reuse.update(join_list)
col = targets[0].get_col(join_list[-1], sources[0])
return col
def add_ordering(self, *ordering):
"""
Adds items from the 'ordering' sequence to the query's "order by"
clause. These items are either field names (not column names) --
possibly with a direction prefix ('-' or '?') -- or OrderBy
expressions.
If 'ordering' is empty, all ordering is cleared from the query.
"""
errors = []
for item in ordering:
if not hasattr(item, 'resolve_expression') and not ORDER_PATTERN.match(item):
errors.append(item)
if getattr(item, 'contains_aggregate', False):
raise FieldError(
'Using an aggregate in order_by() without also including '
'it in annotate() is not allowed: %s' % item
)
if errors:
raise FieldError('Invalid order_by arguments: %s' % errors)
if ordering:
self.order_by.extend(ordering)
else:
self.default_ordering = False
def prepare_value(self, field, value):
"""
Prepare a value to be used in a query by resolving it if it is an
expression and otherwise calling the field's get_db_prep_save().
"""
if hasattr(value, 'resolve_expression'):
value = value.resolve_expression(self.query, allow_joins=False, for_save=True)
# Don't allow values containing Col expressions. They refer to
# existing columns on a row, but in the case of insert the row
# doesn't exist yet.
if value.contains_column_references:
raise ValueError(
'Failed to insert expression "%s" on %s. F() expressions '
'can only be used to update, not to insert.' % (value, field)
)
if value.contains_aggregate:
raise FieldError("Aggregate functions are not allowed in this query")
else:
value = field.get_db_prep_save(value, connection=self.connection)
return value
def check_expression_support(self, expression):
bad_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
bad_aggregates = (aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev)
if isinstance(expression, bad_aggregates):
for expr in expression.get_source_expressions():
try:
output_field = expr.output_field
if isinstance(output_field, bad_fields):
raise NotImplementedError(
'You cannot use Sum, Avg, StdDev, and Variance '
'aggregations on date/time fields in sqlite3 '
'since date/time is saved as text.'
)
except FieldError:
# Not every subexpression has an output_field which is fine
# to ignore.
pass
def add_update_values(self, values):
"""
Convert a dictionary of field name to value mappings into an update
query. This is the entry point for the public update() method on
querysets.
"""
values_seq = []
for name, val in six.iteritems(values):
field = self.get_meta().get_field(name)
direct = not (field.auto_created and not field.concrete) or not field.concrete
model = field.model._meta.concrete_model
if not direct or (field.is_relation and field.many_to_many):
raise FieldError(
'Cannot update model field %r (only non-relations and '
'foreign keys permitted).' % field
)
if model is not self.get_meta().model:
self.add_related_update(model, field, val)
continue
values_seq.append((field, model, val))
return self.add_update_fields(values_seq)
def solve_lookup_type(self, lookup):
"""
Solve the lookup type from the lookup (eg: 'foobar__id__icontains')
"""
lookup_splitted = lookup.split(LOOKUP_SEP)
if self._annotations:
expression, expression_lookups = refs_expression(lookup_splitted, self.annotations)
if expression:
return expression_lookups, (), expression
_, field, _, lookup_parts = self.names_to_path(lookup_splitted, self.get_meta())
field_parts = lookup_splitted[0:len(lookup_splitted) - len(lookup_parts)]
if len(lookup_parts) == 0:
lookup_parts = ['exact']
elif len(lookup_parts) > 1:
if not field_parts:
raise FieldError(
'Invalid lookup "%s" for model %s".' %
(lookup, self.get_meta().model.__name__))
return lookup_parts, field_parts, False
def resolve_ref(self, name, allow_joins=True, reuse=None, summarize=False):
if not allow_joins and LOOKUP_SEP in name:
raise FieldError("Joined field references are not permitted in this query")
if name in self.annotations:
if summarize:
# Summarize currently means we are doing an aggregate() query
# which is executed as a wrapped subquery if any of the
# aggregate() elements reference an existing annotation. In
# that case we need to return a Ref to the subquery's annotation.
return Ref(name, self.annotation_select[name])
else:
return self.annotation_select[name]
else:
field_list = name.split(LOOKUP_SEP)
field, sources, opts, join_list, path = self.setup_joins(
field_list, self.get_meta(),
self.get_initial_alias(), reuse)
targets, _, join_list = self.trim_joins(sources, join_list, path)
if len(targets) > 1:
raise FieldError("Referencing multicolumn fields with F() objects "
"isn't supported")
if reuse is not None:
reuse.update(join_list)
col = targets[0].get_col(join_list[-1], sources[0])
return col
def prepare_value(self, field, value):
"""
Prepare a value to be used in a query by resolving it if it is an
expression and otherwise calling the field's get_db_prep_save().
"""
if hasattr(value, 'resolve_expression'):
value = value.resolve_expression(self.query, allow_joins=False, for_save=True)
# Don't allow values containing Col expressions. They refer to
# existing columns on a row, but in the case of insert the row
# doesn't exist yet.
if value.contains_column_references:
raise ValueError(
'Failed to insert expression "%s" on %s. F() expressions '
'can only be used to update, not to insert.' % (value, field)
)
if value.contains_aggregate:
raise FieldError("Aggregate functions are not allowed in this query")
else:
value = field.get_db_prep_save(value, connection=self.connection)
return value