def contribute_to_related_class(self, cls, related):
# Internal FK's - i.e., those with a related name ending with '+' -
# and swapped models don't get a related descriptor.
if not self.remote_field.is_hidden() and not related.related_model._meta.swapped:
setattr(cls, related.get_accessor_name(), self.related_accessor_class(related))
# While 'limit_choices_to' might be a callable, simply pass
# it along for later - this is too early because it's still
# model load time.
if self.remote_field.limit_choices_to:
cls._meta.related_fkey_lookups.append(self.remote_field.limit_choices_to)
python类append()的实例源码
def _check_related_query_name_is_valid(self):
if self.remote_field.is_hidden():
return []
rel_query_name = self.related_query_name()
errors = []
if rel_query_name.endswith('_'):
errors.append(
checks.Error(
"Reverse query name '%s' must not end with an underscore."
% (rel_query_name,),
hint=("Add or change a related_name or related_query_name "
"argument for this field."),
obj=self,
id='fields.E308',
)
)
if LOOKUP_SEP in rel_query_name:
errors.append(
checks.Error(
"Reverse query name '%s' must not contain '%s'."
% (rel_query_name, LOOKUP_SEP),
hint=("Add or change a related_name or related_query_name "
"argument for this field."),
obj=self,
id='fields.E309',
)
)
return errors
def resolve_related_fields(self):
if len(self.from_fields) < 1 or len(self.from_fields) != len(self.to_fields):
raise ValueError('Foreign Object from and to fields must be the same non-zero length')
if isinstance(self.remote_field.model, six.string_types):
raise ValueError('Related model %r cannot be resolved' % self.remote_field.model)
related_fields = []
for index in range(len(self.from_fields)):
from_field_name = self.from_fields[index]
to_field_name = self.to_fields[index]
from_field = (self if from_field_name == 'self'
else self.opts.get_field(from_field_name))
to_field = (self.remote_field.model._meta.pk if to_field_name is None
else self.remote_field.model._meta.get_field(to_field_name))
related_fields.append((from_field, to_field))
return related_fields
def contribute_to_related_class(self, cls, related):
# Internal FK's - i.e., those with a related name ending with '+' -
# and swapped models don't get a related descriptor.
if not self.remote_field.is_hidden() and not related.related_model._meta.swapped:
setattr(cls._meta.concrete_model, related.get_accessor_name(), self.related_accessor_class(related))
# While 'limit_choices_to' might be a callable, simply pass
# it along for later - this is too early because it's still
# model load time.
if self.remote_field.limit_choices_to:
cls._meta.related_fkey_lookups.append(self.remote_field.limit_choices_to)
def resolve_related_fields(self):
if len(self.from_fields) < 1 or len(self.from_fields) != len(self.to_fields):
raise ValueError('Foreign Object from and to fields must be the same non-zero length')
if isinstance(self.rel.to, six.string_types):
raise ValueError('Related model %r cannot be resolved' % self.rel.to)
related_fields = []
for index in range(len(self.from_fields)):
from_field_name = self.from_fields[index]
to_field_name = self.to_fields[index]
from_field = (self if from_field_name == 'self'
else self.opts.get_field(from_field_name))
to_field = (self.rel.to._meta.pk if to_field_name is None
else self.rel.to._meta.get_field(to_field_name))
related_fields.append((from_field, to_field))
return related_fields
def contribute_to_related_class(self, cls, related):
# Internal FK's - i.e., those with a related name ending with '+' -
# and swapped models don't get a related descriptor.
if not self.rel.is_hidden() and not related.related_model._meta.swapped:
setattr(cls, related.get_accessor_name(), self.related_accessor_class(related))
# While 'limit_choices_to' might be a callable, simply pass
# it along for later - this is too early because it's still
# model load time.
if self.rel.limit_choices_to:
cls._meta.related_fkey_lookups.append(self.rel.limit_choices_to)
def wsdl_call_get_params(self, method, input, args, kwargs):
"""Build params from input and args/kwargs"""
params = inputname = inputargs = None
all_args = {}
if input:
inputname = list(input.keys())[0]
inputargs = input[inputname]
if input and args:
# convert positional parameters to named parameters:
d = {}
for idx, arg in enumerate(args):
key = list(inputargs.keys())[idx]
if isinstance(arg, dict):
if key not in arg:
raise KeyError('Unhandled key %s. use client.help(method)' % key)
d[key] = arg[key]
else:
d[key] = arg
all_args.update({inputname: d})
if input and (kwargs or all_args):
if kwargs:
all_args.update({inputname: kwargs})
valid, errors, warnings = self.wsdl_validate_params(input, all_args)
if not valid:
raise ValueError('Invalid Args Structure. Errors: %s' % errors)
# sort and filter parameters according to wsdl input structure
tree = sort_dict(input, all_args)
root = list(tree.values())[0]
params = []
# make a params tuple list suitable for self.call(method, *params)
for k, v in root.items():
# fix referenced namespaces as info is lost when calling call
root_ns = root.namespaces[k]
if not root.references[k] and isinstance(v, Struct):
v.namespaces[None] = root_ns
params.append((k, v))
# TODO: check style and document attributes
if self.__soap_server in ('axis', ):
# use the operation name
method = method
else:
# use the message (element) name
method = inputname
#elif not input:
#TODO: no message! (see wsmtxca.dummy)
else:
params = kwargs and kwargs.items()
return (method, params)
def wsdl_call_get_params(self, method, input, args, kwargs):
"""Build params from input and args/kwargs"""
params = inputname = inputargs = None
all_args = {}
if input:
inputname = list(input.keys())[0]
inputargs = input[inputname]
if input and args:
# convert positional parameters to named parameters:
d = {}
for idx, arg in enumerate(args):
key = list(inputargs.keys())[idx]
if isinstance(arg, dict):
if key not in arg:
raise KeyError('Unhandled key %s. use client.help(method)' % key)
d[key] = arg[key]
else:
d[key] = arg
all_args.update({inputname: d})
if input and (kwargs or all_args):
if kwargs:
all_args.update({inputname: kwargs})
valid, errors, warnings = self.wsdl_validate_params(input, all_args)
if not valid:
raise ValueError('Invalid Args Structure. Errors: %s' % errors)
# sort and filter parameters according to wsdl input structure
tree = sort_dict(input, all_args)
root = list(tree.values())[0]
params = []
# make a params tuple list suitable for self.call(method, *params)
for k, v in root.items():
# fix referenced namespaces as info is lost when calling call
root_ns = root.namespaces[k]
if not root.references[k] and isinstance(v, Struct):
v.namespaces[None] = root_ns
params.append((k, v))
# TODO: check style and document attributes
if self.__soap_server in ('axis', ):
# use the operation name
method = method
else:
# use the message (element) name
method = inputname
#elif not input:
#TODO: no message! (see wsmtxca.dummy)
else:
params = kwargs and kwargs.items()
return (method, params)
def add_lazy_relation(cls, field, relation, operation):
"""
Adds a lookup on ``cls`` when a related field is defined using a string,
i.e.::
class MyModel(Model):
fk = ForeignKey("AnotherModel")
This string can be:
* RECURSIVE_RELATIONSHIP_CONSTANT (i.e. "self") to indicate a recursive
relation.
* The name of a model (i.e "AnotherModel") to indicate another model in
the same app.
* An app-label and model name (i.e. "someapp.AnotherModel") to indicate
another model in a different app.
If the other model hasn't yet been loaded -- almost a given if you're using
lazy relationships -- then the relation won't be set up until the
class_prepared signal fires at the end of model initialization.
operation is the work that must be performed once the relation can be resolved.
"""
# Check for recursive relations
if relation == RECURSIVE_RELATIONSHIP_CONSTANT:
app_label = cls._meta.app_label
model_name = cls.__name__
else:
# Look for an "app.Model" relation
if isinstance(relation, six.string_types):
try:
app_label, model_name = relation.split(".")
except ValueError:
# If we can't split, assume a model in current app
app_label = cls._meta.app_label
model_name = relation
else:
# it's actually a model class
app_label = relation._meta.app_label
model_name = relation._meta.object_name
# Try to look up the related model, and if it's already loaded resolve the
# string right away. If get_registered_model raises a LookupError, it means
# that the related model isn't loaded yet, so we need to pend the relation
# until the class is prepared.
try:
model = cls._meta.apps.get_registered_model(app_label, model_name)
except LookupError:
key = (app_label, model_name)
value = (cls, field, operation)
cls._meta.apps._pending_lookups.setdefault(key, []).append(value)
else:
operation(field, model, cls)
def get_lookup_constraint(self, constraint_class, alias, targets, sources, lookups,
raw_value):
from django.db.models.sql.where import SubqueryConstraint, AND, OR
root_constraint = constraint_class()
assert len(targets) == len(sources)
if len(lookups) > 1:
raise exceptions.FieldError('Relation fields do not support nested lookups')
lookup_type = lookups[0]
def get_normalized_value(value):
from django.db.models import Model
if isinstance(value, Model):
value_list = []
for source in sources:
# Account for one-to-one relations when sent a different model
while not isinstance(value, source.model) and source.rel:
source = source.rel.to._meta.get_field(source.rel.field_name)
value_list.append(getattr(value, source.attname))
return tuple(value_list)
elif not isinstance(value, tuple):
return (value,)
return value
is_multicolumn = len(self.related_fields) > 1
if (hasattr(raw_value, '_as_sql') or
hasattr(raw_value, 'get_compiler')):
root_constraint.add(SubqueryConstraint(alias, [target.column for target in targets],
[source.name for source in sources], raw_value),
AND)
elif lookup_type == 'isnull':
root_constraint.add(IsNull(targets[0].get_col(alias, sources[0]), raw_value), AND)
elif (lookup_type == 'exact' or (lookup_type in ['gt', 'lt', 'gte', 'lte']
and not is_multicolumn)):
value = get_normalized_value(raw_value)
for target, source, val in zip(targets, sources, value):
lookup_class = target.get_lookup(lookup_type)
root_constraint.add(
lookup_class(target.get_col(alias, source), val), AND)
elif lookup_type in ['range', 'in'] and not is_multicolumn:
values = [get_normalized_value(value) for value in raw_value]
value = [val[0] for val in values]
lookup_class = targets[0].get_lookup(lookup_type)
root_constraint.add(lookup_class(targets[0].get_col(alias, sources[0]), value), AND)
elif lookup_type == 'in':
values = [get_normalized_value(value) for value in raw_value]
for value in values:
value_constraint = constraint_class()
for source, target, val in zip(sources, targets, value):
lookup_class = target.get_lookup('exact')
lookup = lookup_class(target.get_col(alias, source), val)
value_constraint.add(lookup, AND)
root_constraint.add(value_constraint, OR)
else:
raise TypeError('Related Field got invalid lookup: %s' % lookup_type)
return root_constraint