def _get_content_file(self, app_label, model_name, field_name, fname):
try:
saved_files = self.saves_by_field[(app_label, model_name, field_name)]
except KeyError:
raise StatsTestStorageError(
"{}.{}.{} has not been written to yet so there is nothing to read.".format(
app_label, model_name, field_name))
if fname not in saved_files:
raise StatsTestStorageError(
"{}.{}.{} has not had a file named '{}' written to it. "
"Be careful - the storage engine may have added some random "
"characters to the name before attempting to write.".format(
app_label, model_name, field_name, fname))
field = apps.get_model(app_label, model_name)._meta.get_field(field_name)
return field.storage.open_no_log(fname)
python类get_model()的实例源码
def _from_tables(self):
if hasattr(self._view_meta, "from_tables"):
return self._view_meta.from_tables
from django.apps import apps
from_models = self._view_meta.from_models
from_tables = set()
for label in from_models:
if "." in label:
app_label, model_name = label.split(".")
model = apps.get_model(app_label=app_label, model_name=model_name)
if issubclass(model, View):
from_tables = from_tables | model._from_tables()
else:
from_tables.add(model._meta.db_table)
else:
from_tables.add(label)
setattr(self._view_meta, "from_tables", from_tables)
return from_tables
def _from_view_models(self):
if hasattr(self._view_meta, "from_view_models"):
return self._view_meta.from_view_models
tables = {
model._meta.db_table: model for model in apps.get_models()
}
from_models = self._view_meta.from_models
from_view_models = set()
for label in from_models:
if "." in label:
app_label, model_name = label.split(".")
model = apps.get_model(app_label=app_label, model_name=model_name)
if issubclass(model, View):
from_view_models.add(model)
else:
model = tables[label]
if issubclass(model, View):
from_view_models.add(model)
setattr(self._view_meta, "from_view_models", from_view_models)
return from_view_models
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def _load_field(app_label, model_name, field_name):
return apps.get_model(app_label, model_name)._meta.get_field(field_name)
# A guide to Field parameters:
#
# * name: The name of the field specified in the model.
# * attname: The attribute to use on the model object. This is the same as
# "name", except in the case of ForeignKeys, where "_id" is
# appended.
# * db_column: The db_column specified in the model (or None).
# * column: The database column for this field. This is the same as
# "attname", except if db_column is specified.
#
# Code that introspects values, or does other dynamic things, should use
# attname. For example, this gets the primary key value of object "obj":
#
# getattr(obj, opts.pk.attname)
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def _rename(self, apps, schema_editor, old_model, new_model):
ContentType = apps.get_model('contenttypes', 'ContentType')
db = schema_editor.connection.alias
if not router.allow_migrate_model(db, ContentType):
return
try:
content_type = ContentType.objects.db_manager(db).get_by_natural_key(self.app_label, old_model)
except ContentType.DoesNotExist:
pass
else:
content_type.model = new_model
try:
with transaction.atomic(using=db):
content_type.save(update_fields={'model'})
except IntegrityError:
# Gracefully fallback if a stale content type causes a
# conflict as update_contenttypes will take care of asking the
# user what should be done next.
content_type.model = old_model
else:
# Clear the cache as the `get_by_natual_key()` call will cache
# the renamed ContentType instance by its old model name.
ContentType.objects.clear_cache()
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def parse_apps_and_model_labels(labels):
"""
Parse a list of "app_label.ModelName" or "app_label" strings into actual
objects and return a two-element tuple:
(set of model classes, set of app_configs).
Raise a CommandError if some specified models or apps don't exist.
"""
apps = set()
models = set()
for label in labels:
if '.' in label:
try:
model = installed_apps.get_model(label)
except LookupError:
raise CommandError('Unknown model: %s' % label)
models.add(model)
else:
try:
app_config = installed_apps.get_app_config(label)
except LookupError as e:
raise CommandError(str(e))
apps.add(app_config)
return models, apps
def _get_sitemap_full_url(sitemap_url):
if not django_apps.is_installed('django.contrib.sites'):
raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.")
if sitemap_url is None:
try:
# First, try to get the "index" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.index')
except NoReverseMatch:
try:
# Next, try for the "global" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.sitemap')
except NoReverseMatch:
pass
if sitemap_url is None:
raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.")
Site = django_apps.get_model('sites.Site')
current_site = Site.objects.get_current()
return 'http://%s%s' % (current_site.domain, sitemap_url)
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def parse_apps_and_model_labels(labels):
"""
Parse a list of "app_label.ModelName" or "app_label" strings into actual
objects and return a two-element tuple:
(set of model classes, set of app_configs).
Raise a CommandError if some specified models or apps don't exist.
"""
apps = set()
models = set()
for label in labels:
if '.' in label:
try:
model = installed_apps.get_model(label)
except LookupError:
raise CommandError('Unknown model: %s' % label)
models.add(model)
else:
try:
app_config = installed_apps.get_app_config(label)
except LookupError as e:
raise CommandError(str(e))
apps.add(app_config)
return models, apps
def get_user_model():
"""
Returns the User model that is active in this project.
"""
try:
return django_apps.get_model(settings.AUTH_USER_MODEL, require_ready=False)
except ValueError:
raise ImproperlyConfigured("AUTH_USER_MODEL must be of the form 'app_label.model_name'")
except LookupError:
raise ImproperlyConfigured(
"AUTH_USER_MODEL refers to model '%s' that has not been installed" % settings.AUTH_USER_MODEL
)
def _get_sitemap_full_url(sitemap_url):
if not django_apps.is_installed('django.contrib.sites'):
raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.")
if sitemap_url is None:
try:
# First, try to get the "index" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.index')
except NoReverseMatch:
try:
# Next, try for the "global" sitemap URL.
sitemap_url = reverse('django.contrib.sitemaps.views.sitemap')
except NoReverseMatch:
pass
if sitemap_url is None:
raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.")
Site = django_apps.get_model('sites.Site')
current_site = Site.objects.get_current()
return 'http://%s%s' % (current_site.domain, sitemap_url)
def context(self, context):
btns = []
for b in self.q_btns:
btn = {}
if 'model' in b:
model = self.get_model(b['model'])
if not self.user.has_perm("%s.view_%s" % (model._meta.app_label, model._meta.model_name)):
continue
btn['url'] = reverse("%s:%s_%s_%s" % (self.admin_site.app_name, model._meta.app_label,
model._meta.model_name, b.get('view', 'changelist')))
btn['title'] = model._meta.verbose_name
btn['icon'] = self.dashboard.get_model_icon(model)
else:
try:
btn['url'] = reverse(b['url'])
except NoReverseMatch:
btn['url'] = b['url']
if 'title' in b:
btn['title'] = b['title']
if 'icon' in b:
btn['icon'] = b['icon']
btns.append(btn)
context.update({'btns': btns})
def _resolve_model(obj):
"""
Resolve supplied `obj` to a Django model class.
`obj` must be a Django model class itself, or a string
representation of one. Useful in situations like GH #1225 where
Django may not have resolved a string-based reference to a model in
another model's foreign key definition.
String representations should have the format:
'appname.ModelName'
"""
if isinstance(obj, six.string_types) and len(obj.split('.')) == 2:
app_name, model_name = obj.split('.')
resolved_model = apps.get_model(app_name, model_name)
if resolved_model is None:
msg = "Django did not return a model for {0}.{1}"
raise ImproperlyConfigured(msg.format(app_name, model_name))
return resolved_model
elif inspect.isclass(obj) and issubclass(obj, models.Model):
return obj
raise ValueError("{0} is not a Django model".format(obj))
def email_share(self, user, email, type, id):
# Get our e-mail formatter
formatter = self.get_email_formatter(type)
instance = None
# Attempt to get the object instance we want to share
try:
instance = apps.get_model(self.AVAILABLE_TYPES[type]).objects.get(id=id)
except ObjectDoesNotExist:
# If it doesn't exist, respond with a 404
return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND)
# Create our e-mail body
email_body = {
"to": email,
"subject": f"[TalentMAP] Shared {type}",
"body": formatter(instance)
}
# TODO: Implement actual e-mail sending here when avaiable e-mail servers are clarified
# Return a 202 ACCEPTED with a copy of the email body
return Response({"message": f"Position shared externally via email at {email}", "email_body": email_body}, status=status.HTTP_202_ACCEPTED)
def internal_share(self, user, email, type, id):
receiving_user = None
# Attempt to get the object instance we want to share
try:
apps.get_model(self.AVAILABLE_TYPES[type]).objects.get(id=id)
except ObjectDoesNotExist:
# If it doesn't exist, respond with a 404
return Response({"message": f"Object with id {id} does not exist"}, status=status.HTTP_404_NOT_FOUND)
# Attempt to get the receiving user by e-mail address
try:
receiving_user = UserProfile.objects.get(user__email=email)
except ObjectDoesNotExist:
return Response({"message": f"User with email {email} does not exist"}, status=status.HTTP_404_NOT_FOUND)
# Create our sharable object using the source user, receiving user, id, and model
# This will auto-populate in the receiving user's received shares on their profile
Sharable.objects.create(sharing_user=user.profile,
receiving_user=receiving_user,
sharable_id=id,
sharable_model=self.AVAILABLE_TYPES[type])
return Response({"message": f"Position shared internally to user with email {email}"}, status=status.HTTP_202_ACCEPTED)
def test_endpoints_list(authorized_client, authorized_user, endpoint, model, recipe, retrievable):
number = random.randint(5, 10)
# Create a random amount of objects from the recipe, if it is given
if recipe:
if callable(recipe):
for i in range(0, number):
recipe()
else:
mommy.make_recipe(recipe, _quantity=number)
elif model:
mommy.make(model, _quantity=number)
response = authorized_client.get(endpoint)
assert response.status_code == status.HTTP_200_OK
assert len(response.data["results"]) == apps.get_model(model).objects.count()
assert len(response.data["results"]) == number
def test_endpoints_retrieve(authorized_client, authorized_user, endpoint, model, recipe, retrievable):
# Skip any endpoints that don't support "retrieve" actions
if not retrievable:
return
number = random.randint(5, 10)
# Create a random amount of objects from the recipe, if it is given
if recipe:
if callable(recipe):
for i in range(0, number):
recipe()
else:
mommy.make_recipe(recipe, _quantity=number)
elif model:
mommy.make(model, _quantity=number)
# Check that each item is retrievable
for id in apps.get_model(model).objects.values_list('id', flat=True):
response = authorized_client.get(f"{endpoint}{id}/")
assert response.status_code == status.HTTP_200_OK
renamefieldhistory.py 文件源码
项目:django-field-history
作者: grantmcconnaughey
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def handle(self, *args, **options):
model_name = options.get('model')
from_field = options.get('from_field')
to_field = options.get('to_field')
if not model_name:
raise CommandError('--model_name is a required argument')
if not from_field:
raise CommandError('--from_field is a required argument')
if not to_field:
raise CommandError('--to_field is a required argument')
model = apps.get_model(model_name)
content_type = ContentType.objects.get_for_model(model)
field_histories = FieldHistory.objects.filter(content_type=content_type, field_name=from_field)
self.stdout.write('Updating {} FieldHistory object(s)\n'.format(field_histories.count()))
field_histories.update(field_name=to_field)
def _resolve_model(obj):
"""
Resolve supplied `obj` to a Django model class.
`obj` must be a Django model class itself, or a string
representation of one. Useful in situations like GH #1225 where
Django may not have resolved a string-based reference to a model in
another model's foreign key definition.
String representations should have the format:
'appname.ModelName'
"""
if isinstance(obj, six.string_types) and len(obj.split('.')) == 2:
app_name, model_name = obj.split('.')
resolved_model = apps.get_model(app_name, model_name)
if resolved_model is None:
msg = "Django did not return a model for {0}.{1}"
raise ImproperlyConfigured(msg.format(app_name, model_name))
return resolved_model
elif inspect.isclass(obj) and issubclass(obj, models.Model):
return obj
raise ValueError("{0} is not a Django model".format(obj))
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def _rename(self, apps, schema_editor, old_model, new_model):
ContentType = apps.get_model('contenttypes', 'ContentType')
db = schema_editor.connection.alias
if not router.allow_migrate_model(db, ContentType):
return
try:
content_type = ContentType.objects.db_manager(db).get_by_natural_key(self.app_label, old_model)
except ContentType.DoesNotExist:
pass
else:
content_type.model = new_model
try:
with transaction.atomic(using=db):
content_type.save(update_fields={'model'})
except IntegrityError:
# Gracefully fallback if a stale content type causes a
# conflict as update_contenttypes will take care of asking the
# user what should be done next.
content_type.model = old_model
else:
# Clear the cache as the `get_by_natual_key()` call will cache
# the renamed ContentType instance by its old model name.
ContentType.objects.clear_cache()
def _get_model_from_node(self, node, attr):
"""
Helper to look up a model from a <object model=...> or a <field
rel=... to=...> node.
"""
model_identifier = node.getAttribute(attr)
if not model_identifier:
raise base.DeserializationError(
"<%s> node is missing the required '%s' attribute"
% (node.nodeName, attr))
try:
return apps.get_model(model_identifier)
except (LookupError, TypeError):
raise base.DeserializationError(
"<%s> node has invalid model identifier: '%s'"
% (node.nodeName, model_identifier))
def handle(self, **options):
warnings.warn("The syncdb command will be removed in Django 1.9", RemovedInDjango19Warning)
call_command("migrate", **options)
try:
apps.get_model('auth', 'Permission')
except LookupError:
return
UserModel = get_user_model()
if not UserModel._default_manager.exists() and options.get('interactive'):
msg = ("\nYou have installed Django's auth system, and "
"don't have any superusers defined.\nWould you like to create one "
"now? (yes/no): ")
confirm = input(msg)
while 1:
if confirm not in ('yes', 'no'):
confirm = input('Please enter either "yes" or "no": ')
continue
if confirm == 'yes':
call_command("createsuperuser", interactive=True, database=options['database'])
break