def __get__(self, instance, instance_type=None):
if instance is None:
return self
try:
return getattr(instance, self.cache_attr)
except AttributeError:
rel_obj = None
# Make sure to use ContentType.objects.get_for_id() to ensure that
# lookups are cached (see ticket #5570). This takes more code than
# the naive ``getattr(instance, self.ct_field)``, but has better
# performance when dealing with GFKs in loops and such.
f = self.model._meta.get_field(self.ct_field)
ct_id = getattr(instance, f.get_attname(), None)
if ct_id is not None:
ct = self.get_content_type(id=ct_id, using=instance._state.db)
try:
rel_obj = ct.get_object_for_this_type(pk=getattr(instance, self.fk_field))
except ObjectDoesNotExist:
pass
setattr(instance, self.cache_attr, rel_obj)
return rel_obj
python类ContentType()的实例源码
def __init__(self, data=None, files=None, instance=None, save_as_new=None,
prefix=None, queryset=None, **kwargs):
opts = self.model._meta
self.instance = instance
self.rel_name = '-'.join((
opts.app_label, opts.model_name,
self.ct_field.name, self.ct_fk_field.name,
))
if self.instance is None or self.instance.pk is None:
qs = self.model._default_manager.none()
else:
if queryset is None:
queryset = self.model._default_manager
qs = queryset.filter(**{
self.ct_field.name: ContentType.objects.get_for_model(
self.instance, for_concrete_model=self.for_concrete_model),
self.ct_fk_field.name: self.instance.pk,
})
super(BaseGenericInlineFormSet, self).__init__(
queryset=qs, data=data, files=files,
prefix=prefix,
**kwargs
)
def create_slug_for_model(self, model):
# prepare query set to generate slug
model_type = ContentType.objects.get_for_model(model)
queryset = self.get_queryset().exclude(
object_id=model.id,
content_type=model_type
)
# generate slugname
slug_text = unique_slugify(model.name, queryset=queryset,
default_slug=model.__class__.__name__)
# if not slug exists, create
if not self.get_queryset().filter(
object_id=model.id,
content_type=model_type,
slug=slug_text
).count():
s = self.create(content_object=model, slug=slug_text)
s.save()
# update slug on model
model.slug = slug_text
model.save()
def __get__(self, instance, instance_type=None):
if instance is None:
return self
try:
return getattr(instance, self.cache_attr)
except AttributeError:
rel_obj = None
# Make sure to use ContentType.objects.get_for_id() to ensure that
# lookups are cached (see ticket #5570). This takes more code than
# the naive ``getattr(instance, self.ct_field)``, but has better
# performance when dealing with GFKs in loops and such.
f = self.model._meta.get_field(self.ct_field)
ct_id = getattr(instance, f.get_attname(), None)
if ct_id is not None:
ct = self.get_content_type(id=ct_id, using=instance._state.db)
try:
rel_obj = ct.get_object_for_this_type(pk=getattr(instance, self.fk_field))
except ObjectDoesNotExist:
pass
setattr(instance, self.cache_attr, rel_obj)
return rel_obj
def __init__(self, data=None, files=None, instance=None, save_as_new=None,
prefix=None, queryset=None, **kwargs):
opts = self.model._meta
self.instance = instance
self.rel_name = '-'.join((
opts.app_label, opts.model_name,
self.ct_field.name, self.ct_fk_field.name,
))
if self.instance is None or self.instance.pk is None:
qs = self.model._default_manager.none()
else:
if queryset is None:
queryset = self.model._default_manager
qs = queryset.filter(**{
self.ct_field.name: ContentType.objects.get_for_model(
self.instance, for_concrete_model=self.for_concrete_model),
self.ct_fk_field.name: self.instance.pk,
})
super(BaseGenericInlineFormSet, self).__init__(
queryset=qs, data=data, files=files,
prefix=prefix,
**kwargs
)
def update_tags(self, obj, tag_names):
"""
Update tags associated with an object.
"""
ctype = ContentType.objects.get_for_model(obj)
current_tags = list(self.filter(items__content_type__pk=ctype.pk,
items__object_id=obj.pk))
updated_tag_names = parse_tag_input(tag_names)
if settings.FORCE_LOWERCASE_TAGS:
updated_tag_names = [t.lower() for t in updated_tag_names]
# Remove tags which no longer apply
tags_for_removal = [tag for tag in current_tags \
if tag.name not in updated_tag_names]
if len(tags_for_removal):
TaggedItem._default_manager.filter(content_type__pk=ctype.pk,
object_id=obj.pk,
tag__in=tags_for_removal).delete()
# Add new tags
current_tag_names = [tag.name for tag in current_tags]
for tag_name in updated_tag_names:
if tag_name not in current_tag_names:
tag, created = self.get_or_create(name=tag_name)
TaggedItem._default_manager.create(tag=tag, object=obj)
def add_tag(self, obj, tag_name):
"""
Associates the given object with a tag.
"""
tag_names = parse_tag_input(tag_name)
if not len(tag_names):
raise AttributeError(_('No tags were given: "%(name)s".') % {
'name': tag_name})
if len(tag_names) > 1:
raise AttributeError(_('Multiple tags were given: "%(name)s".') % {
'name': tag_name})
tag_name = tag_names[0]
if settings.FORCE_LOWERCASE_TAGS:
tag_name = tag_name.lower()
tag, created = self.get_or_create(name=tag_name)
ctype = ContentType.objects.get_for_model(obj)
TaggedItem._default_manager.get_or_create(
tag=tag, content_type=ctype, object_id=obj.pk)
def add_view_permissions(sender, **kwargs):
"""
This syncdb hooks takes care of adding a view permission too all our
content types.
"""
# for each of our content types
for content_type in ContentType.objects.all():
# build our permission slug
codename = "view_%s" % content_type.model
# if it doesn't exist..
if not Permission.objects.filter(content_type=content_type, codename=codename):
# add it
Permission.objects.create(content_type=content_type,
codename=codename,
name="Can view %s" % content_type.name)
#print "Added view permission for %s" % content_type.name
# check for all our view permissions after a syncdb
def __init__(self, data=None, files=None, instance=None, save_as_new=None,
prefix=None, queryset=None, **kwargs):
opts = self.model._meta
self.instance = instance
self.rel_name = '-'.join((
opts.app_label, opts.model_name,
self.ct_field.name, self.ct_fk_field.name,
))
if self.instance is None or self.instance.pk is None:
qs = self.model._default_manager.none()
else:
if queryset is None:
queryset = self.model._default_manager
qs = queryset.filter(**{
self.ct_field.name: ContentType.objects.get_for_model(
self.instance, for_concrete_model=self.for_concrete_model),
self.ct_fk_field.name: self.instance.pk,
})
super(BaseGenericInlineFormSet, self).__init__(
queryset=qs, data=data, files=files,
prefix=prefix,
**kwargs
)
def __init__(self, data=None, files=None, instance=None, save_as_new=None,
prefix=None, queryset=None, **kwargs):
opts = self.model._meta
self.instance = instance
self.rel_name = '-'.join((
opts.app_label, opts.model_name,
self.ct_field.name, self.ct_fk_field.name,
))
if self.instance is None or self.instance.pk is None:
qs = self.model._default_manager.none()
else:
if queryset is None:
queryset = self.model._default_manager
qs = queryset.filter(**{
self.ct_field.name: ContentType.objects.get_for_model(
self.instance, for_concrete_model=self.for_concrete_model),
self.ct_fk_field.name: self.instance.pk,
})
super(BaseGenericInlineFormSet, self).__init__(
queryset=qs, data=data, files=files,
prefix=prefix,
**kwargs
)
def get_forward_related_filter(self, obj):
"""See corresponding method on RelatedField"""
return {
self.fk_field: obj.pk,
self.ct_field: ContentType.objects.get_for_model(obj).pk,
}
def get_content_type(self, obj=None, id=None, using=None):
if obj is not None:
return ContentType.objects.db_manager(obj._state.db).get_for_model(
obj, for_concrete_model=self.for_concrete_model)
elif id is not None:
return ContentType.objects.db_manager(using).get_for_id(id)
else:
# This should never happen. I love comments like this, don't you?
raise Exception("Impossible arguments to GFK.get_content_type!")
def get_content_type(self):
"""
Return the content type associated with this field's model.
"""
return ContentType.objects.get_for_model(self.model,
for_concrete_model=self.for_concrete_model)
def save_new(self, form, commit=True):
setattr(form.instance, self.ct_field.get_attname(),
ContentType.objects.get_for_model(self.instance).pk)
setattr(form.instance, self.ct_fk_field.get_attname(),
self.instance.pk)
return form.save(commit=commit)
def generic_inlineformset_factory(model, form=ModelForm,
formset=BaseGenericInlineFormSet,
ct_field="content_type", fk_field="object_id",
fields=None, exclude=None,
extra=3, can_order=False, can_delete=True,
max_num=None, formfield_callback=None,
validate_max=False, for_concrete_model=True,
min_num=None, validate_min=False):
"""
Returns a ``GenericInlineFormSet`` for the given kwargs.
You must provide ``ct_field`` and ``fk_field`` if they are different from
the defaults ``content_type`` and ``object_id`` respectively.
"""
opts = model._meta
# if there is no field called `ct_field` let the exception propagate
ct_field = opts.get_field(ct_field)
if not isinstance(ct_field, models.ForeignKey) or ct_field.remote_field.model != ContentType:
raise Exception("fk_name '%s' is not a ForeignKey to ContentType" % ct_field)
fk_field = opts.get_field(fk_field) # let the exception propagate
if exclude is not None:
exclude = list(exclude)
exclude.extend([ct_field.name, fk_field.name])
else:
exclude = [ct_field.name, fk_field.name]
FormSet = modelformset_factory(model, form=form,
formfield_callback=formfield_callback,
formset=formset,
extra=extra, can_delete=can_delete, can_order=can_order,
fields=fields, exclude=exclude, max_num=max_num,
validate_max=validate_max, min_num=min_num,
validate_min=validate_min)
FormSet.ct_field = ct_field
FormSet.ct_fk_field = fk_field
FormSet.for_concrete_model = for_concrete_model
return FormSet
def get_content_type_for_model(obj):
# Since this module gets imported in the application's root package,
# it cannot import models from other applications at the module level.
from django.contrib.contenttypes.models import ContentType
return ContentType.objects.get_for_model(obj, for_concrete_model=False)
def get_view_on_site_url(self, obj=None):
if obj is None or not self.view_on_site:
return None
if callable(self.view_on_site):
return self.view_on_site(obj)
elif self.view_on_site and hasattr(obj, 'get_absolute_url'):
# use the ContentType lookup if view_on_site is True
return reverse('admin:view_on_site', kwargs={
'content_type_id': get_content_type_for_model(obj).pk,
'object_id': obj.pk
})
def get_forward_related_filter(self, obj):
"""See corresponding method on RelatedField"""
return {
self.fk_field: obj.pk,
self.ct_field: ContentType.objects.get_for_model(obj).pk,
}
def get_content_type(self, obj=None, id=None, using=None):
if obj is not None:
return ContentType.objects.db_manager(obj._state.db).get_for_model(
obj, for_concrete_model=self.for_concrete_model)
elif id is not None:
return ContentType.objects.db_manager(using).get_for_id(id)
else:
# This should never happen. I love comments like this, don't you?
raise Exception("Impossible arguments to GFK.get_content_type!")
def __get__(self, instance, cls=None):
if instance is None:
return self
# Don't use getattr(instance, self.ct_field) here because that might
# reload the same ContentType over and over (#5570). Instead, get the
# content type ID here, and later when the actual instance is needed,
# use ContentType.objects.get_for_id(), which has a global cache.
f = self.model._meta.get_field(self.ct_field)
ct_id = getattr(instance, f.get_attname(), None)
pk_val = getattr(instance, self.fk_field)
try:
rel_obj = getattr(instance, self.cache_attr)
except AttributeError:
rel_obj = None
else:
if rel_obj and (ct_id != self.get_content_type(obj=rel_obj, using=instance._state.db).id or
rel_obj._meta.pk.to_python(pk_val) != rel_obj._get_pk_val()):
rel_obj = None
if rel_obj is not None:
return rel_obj
if ct_id is not None:
ct = self.get_content_type(id=ct_id, using=instance._state.db)
try:
rel_obj = ct.get_object_for_this_type(pk=pk_val)
except ObjectDoesNotExist:
pass
setattr(instance, self.cache_attr, rel_obj)
return rel_obj