Permalink
Join GitHub today
GitHub is home to over 28 million developers working together to host and review code, manage projects, and build software together.
Sign upimport copy | |
import inspect | |
import warnings | |
from functools import partialmethod | |
from itertools import chain | |
from django.apps import apps | |
from django.conf import settings | |
from django.core import checks | |
from django.core.exceptions import ( | |
NON_FIELD_ERRORS, FieldDoesNotExist, FieldError, MultipleObjectsReturned, | |
ObjectDoesNotExist, ValidationError, | |
) | |
from django.db import ( | |
DEFAULT_DB_ALIAS, DJANGO_VERSION_PICKLE_KEY, DatabaseError, connection, | |
connections, router, transaction, | |
) | |
from django.db.models.constants import LOOKUP_SEP | |
from django.db.models.constraints import CheckConstraint | |
from django.db.models.deletion import CASCADE, Collector | |
from django.db.models.fields.related import ( | |
ForeignObjectRel, OneToOneField, lazy_related_operation, resolve_relation, | |
) | |
from django.db.models.manager import Manager | |
from django.db.models.options import Options | |
from django.db.models.query import Q | |
from django.db.models.signals import ( | |
class_prepared, post_init, post_save, pre_init, pre_save, | |
) | |
from django.db.models.utils import make_model_tuple | |
from django.utils.text import capfirst, get_text_list | |
from django.utils.translation import gettext_lazy as _ | |
from django.utils.version import get_version | |
class Deferred: | |
def __repr__(self): | |
return '<Deferred field>' | |
def __str__(self): | |
return '<Deferred field>' | |
DEFERRED = Deferred() | |
def subclass_exception(name, bases, module, attached_to): | |
""" | |
Create exception subclass. Used by ModelBase below. | |
The exception is created in a way that allows it to be pickled, assuming | |
that the returned exception class will be added as an attribute to the | |
'attached_to' class. | |
""" | |
return type(name, bases, { | |
'__module__': module, | |
'__qualname__': '%s.%s' % (attached_to.__qualname__, name), | |
}) | |
class ModelBase(type): | |
"""Metaclass for all models.""" | |
def __new__(cls, name, bases, attrs, **kwargs): | |
super_new = super().__new__ | |
# Also ensure initialization is only performed for subclasses of Model | |
# (excluding Model class itself). | |
parents = [b for b in bases if isinstance(b, ModelBase)] | |
if not parents: | |
return super_new(cls, name, bases, attrs) | |
# Create the class. | |
module = attrs.pop('__module__') | |
new_attrs = {'__module__': module} | |
classcell = attrs.pop('__classcell__', None) | |
if classcell is not None: | |
new_attrs['__classcell__'] = classcell | |
new_class = super_new(cls, name, bases, new_attrs, **kwargs) | |
attr_meta = attrs.pop('Meta', None) | |
abstract = getattr(attr_meta, 'abstract', False) | |
meta = attr_meta or getattr(new_class, 'Meta', None) | |
base_meta = getattr(new_class, '_meta', None) | |
app_label = None | |
# Look for an application configuration to attach the model to. | |
app_config = apps.get_containing_app_config(module) | |
if getattr(meta, 'app_label', None) is None: | |
if app_config is None: | |
if not abstract: | |
raise RuntimeError( | |
"Model class %s.%s doesn't declare an explicit " | |
"app_label and isn't in an application in " | |
"INSTALLED_APPS." % (module, name) | |
) | |
else: | |
app_label = app_config.label | |
new_class.add_to_class('_meta', Options(meta, app_label)) | |
if not abstract: | |
new_class.add_to_class( | |
'DoesNotExist', | |
subclass_exception( | |
'DoesNotExist', | |
tuple( | |
x.DoesNotExist for x in parents if hasattr(x, '_meta') and not x._meta.abstract | |
) or (ObjectDoesNotExist,), | |
module, | |
attached_to=new_class)) | |
new_class.add_to_class( | |
'MultipleObjectsReturned', | |
subclass_exception( | |
'MultipleObjectsReturned', | |
tuple( | |
x.MultipleObjectsReturned for x in parents if hasattr(x, '_meta') and not x._meta.abstract | |
) or (MultipleObjectsReturned,), | |
module, | |
attached_to=new_class)) | |
if base_meta and not base_meta.abstract: | |
# Non-abstract child classes inherit some attributes from their | |
# non-abstract parent (unless an ABC comes before it in the | |
# method resolution order). | |
if not hasattr(meta, 'ordering'): | |
new_class._meta.ordering = base_meta.ordering | |
if not hasattr(meta, 'get_latest_by'): | |
new_class._meta.get_latest_by = base_meta.get_latest_by | |
is_proxy = new_class._meta.proxy | |
# If the model is a proxy, ensure that the base class | |
# hasn't been swapped out. | |
if is_proxy and base_meta and base_meta.swapped: | |
raise TypeError("%s cannot proxy the swapped model '%s'." % (name, base_meta.swapped)) | |
# Add all attributes to the class. | |
for obj_name, obj in attrs.items(): | |
new_class.add_to_class(obj_name, obj) | |
# All the fields of any type declared on this model | |
new_fields = chain( | |
new_class._meta.local_fields, | |
new_class._meta.local_many_to_many, | |
new_class._meta.private_fields | |
) | |
field_names = {f.name for f in new_fields} | |
# Basic setup for proxy models. | |
if is_proxy: | |
base = None | |
for parent in [kls for kls in parents if hasattr(kls, '_meta')]: | |
if parent._meta.abstract: | |
if parent._meta.fields: | |
raise TypeError( | |
"Abstract base class containing model fields not " | |
"permitted for proxy model '%s'." % name | |
) | |
else: | |
continue | |
if base is None: | |
base = parent | |
elif parent._meta.concrete_model is not base._meta.concrete_model: | |
raise TypeError("Proxy model '%s' has more than one non-abstract model base class." % name) | |
if base is None: | |
raise TypeError("Proxy model '%s' has no non-abstract model base class." % name) | |
new_class._meta.setup_proxy(base) | |
new_class._meta.concrete_model = base._meta.concrete_model | |
else: | |
new_class._meta.concrete_model = new_class | |
# Collect the parent links for multi-table inheritance. | |
parent_links = {} | |
for base in reversed([new_class] + parents): | |
# Conceptually equivalent to `if base is Model`. | |
if not hasattr(base, '_meta'): | |
continue | |
# Skip concrete parent classes. | |
if base != new_class and not base._meta.abstract: | |
continue | |
# Locate OneToOneField instances. | |
for field in base._meta.local_fields: | |
if isinstance(field, OneToOneField): | |
related = resolve_relation(new_class, field.remote_field.model) | |
parent_links[make_model_tuple(related)] = field | |
# Track fields inherited from base models. | |
inherited_attributes = set() | |
# Do the appropriate setup for any model parents. | |
for base in new_class.mro(): | |
if base not in parents or not hasattr(base, '_meta'): | |
# Things without _meta aren't functional models, so they're | |
# uninteresting parents. | |
inherited_attributes.update(base.__dict__) | |
continue | |
parent_fields = base._meta.local_fields + base._meta.local_many_to_many | |
if not base._meta.abstract: | |
# Check for clashes between locally declared fields and those | |
# on the base classes. | |
for field in parent_fields: | |
if field.name in field_names: | |
raise FieldError( | |
'Local field %r in class %r clashes with field of ' | |
'the same name from base class %r.' % ( | |
field.name, | |
name, | |
base.__name__, | |
) | |
) | |
else: | |
inherited_attributes.add(field.name) | |
# Concrete classes... | |
base = base._meta.concrete_model | |
base_key = make_model_tuple(base) | |
if base_key in parent_links: | |
field = parent_links[base_key] | |
elif not is_proxy: | |
attr_name = '%s_ptr' % base._meta.model_name | |
field = OneToOneField( | |
base, | |
on_delete=CASCADE, | |
name=attr_name, | |
auto_created=True, | |
parent_link=True, | |
) | |
if attr_name in field_names: | |
raise FieldError( | |
"Auto-generated field '%s' in class %r for " | |
"parent_link to base class %r clashes with " | |
"declared field of the same name." % ( | |
attr_name, | |
name, | |
base.__name__, | |
) | |
) | |
# Only add the ptr field if it's not already present; | |
# e.g. migrations will already have it specified | |
if not hasattr(new_class, attr_name): | |
new_class.add_to_class(attr_name, field) | |
else: | |
field = None | |
new_class._meta.parents[base] = field | |
else: | |
base_parents = base._meta.parents.copy() | |
# Add fields from abstract base class if it wasn't overridden. | |
for field in parent_fields: | |
if (field.name not in field_names and | |
field.name not in new_class.__dict__ and | |
field.name not in inherited_attributes): | |
new_field = copy.deepcopy(field) | |
new_class.add_to_class(field.name, new_field) | |
# Replace parent links defined on this base by the new | |
# field. It will be appropriately resolved if required. | |
if field.one_to_one: | |
for parent, parent_link in base_parents.items(): | |
if field == parent_link: | |
base_parents[parent] = new_field | |
# Pass any non-abstract parent classes onto child. | |
new_class._meta.parents.update(base_parents) | |
# Inherit private fields (like GenericForeignKey) from the parent | |
# class | |
for field in base._meta.private_fields: | |
if field.name in field_names: | |
if not base._meta.abstract: | |
raise FieldError( | |
'Local field %r in class %r clashes with field of ' | |
'the same name from base class %r.' % ( | |
field.name, | |
name, | |
base.__name__, | |
) | |
) | |
else: | |
field = copy.deepcopy(field) | |
if not base._meta.abstract: | |
field.mti_inherited = True | |
new_class.add_to_class(field.name, field) | |
# Copy indexes so that index names are unique when models extend an | |
# abstract model. | |
new_class._meta.indexes = [copy.deepcopy(idx) for idx in new_class._meta.indexes] | |
if abstract: | |
# Abstract base models can't be instantiated and don't appear in | |
# the list of models for an app. We do the final setup for them a | |
# little differently from normal models. | |
attr_meta.abstract = False | |
new_class.Meta = attr_meta | |
return new_class | |
new_class._prepare() | |
new_class._meta.apps.register_model(new_class._meta.app_label, new_class) | |
return new_class | |
def add_to_class(cls, name, value): | |
# We should call the contribute_to_class method only if it's bound | |
if not inspect.isclass(value) and hasattr(value, 'contribute_to_class'): | |
value.contribute_to_class(cls, name) | |
else: | |
setattr(cls, name, value) | |
def _prepare(cls): | |
"""Create some methods once self._meta has been populated.""" | |
opts = cls._meta | |
opts._prepare(cls) | |
if opts.order_with_respect_to: | |
cls.get_next_in_order = partialmethod(cls._get_next_or_previous_in_order, is_next=True) | |
cls.get_previous_in_order = partialmethod(cls._get_next_or_previous_in_order, is_next=False) | |
# Defer creating accessors on the foreign class until it has been | |
# created and registered. If remote_field is None, we're ordering | |
# with respect to a GenericForeignKey and don't know what the | |
# foreign class is - we'll add those accessors later in | |
# contribute_to_class(). | |
if opts.order_with_respect_to.remote_field: | |
wrt = opts.order_with_respect_to | |
remote = wrt.remote_field.model | |
lazy_related_operation(make_foreign_order_accessors, cls, remote) | |
# Give the class a docstring -- its definition. | |
if cls.__doc__ is None: | |
cls.__doc__ = "%s(%s)" % (cls.__name__, ", ".join(f.name for f in opts.fields)) | |
get_absolute_url_override = settings.ABSOLUTE_URL_OVERRIDES.get(opts.label_lower) | |
if get_absolute_url_override: | |
setattr(cls, 'get_absolute_url', get_absolute_url_override) | |
if not opts.managers: | |
if any(f.name == 'objects' for f in opts.fields): | |
raise ValueError( | |
"Model %s must specify a custom Manager, because it has a " | |
"field named 'objects'." % cls.__name__ | |
) | |
manager = Manager() | |
manager.auto_created = True | |
cls.add_to_class('objects', manager) | |
# Set the name of _meta.indexes. This can't be done in | |
# Options.contribute_to_class() because fields haven't been added to | |
# the model at that point. | |
for index in cls._meta.indexes: | |
if not index.name: | |
index.set_name_with_model(cls) | |
class_prepared.send(sender=cls) | |
@property | |
def _base_manager(cls): | |
return cls._meta.base_manager | |
@property | |
def _default_manager(cls): | |
return cls._meta.default_manager | |
class ModelStateFieldsCacheDescriptor: | |
def __get__(self, instance, cls=None): | |
if instance is None: | |
return self | |
res = instance.fields_cache = {} | |
return res | |
class ModelState: | |
"""Store model instance state.""" | |
db = None | |
# If true, uniqueness validation checks will consider this a new, unsaved | |
# object. Necessary for correct validation of new instances of objects with | |
# explicit (non-auto) PKs. This impacts validation only; it has no effect | |
# on the actual save. | |
adding = True | |
fields_cache = ModelStateFieldsCacheDescriptor() | |
class Model(metaclass=ModelBase): | |
def __init__(self, *args, **kwargs): | |
# Alias some things as locals to avoid repeat global lookups | |
cls = self.__class__ | |
opts = self._meta | |
_setattr = setattr | |
_DEFERRED = DEFERRED | |
pre_init.send(sender=cls, args=args, kwargs=kwargs) | |
# Set up the storage for instance state | |
self._state = ModelState() | |
# There is a rather weird disparity here; if kwargs, it's set, then args | |
# overrides it. It should be one or the other; don't duplicate the work | |
# The reason for the kwargs check is that standard iterator passes in by | |
# args, and instantiation for iteration is 33% faster. | |
if len(args) > len(opts.concrete_fields): | |
# Daft, but matches old exception sans the err msg. | |
raise IndexError("Number of args exceeds number of fields") | |
if not kwargs: | |
fields_iter = iter(opts.concrete_fields) | |
# The ordering of the zip calls matter - zip throws StopIteration | |
# when an iter throws it. So if the first iter throws it, the second | |
# is *not* consumed. We rely on this, so don't change the order | |
# without changing the logic. | |
for val, field in zip(args, fields_iter): | |
if val is _DEFERRED: | |
continue | |
_setattr(self, field.attname, val) | |
else: | |
# Slower, kwargs-ready version. | |
fields_iter = iter(opts.fields) | |
for val, field in zip(args, fields_iter): | |
if val is _DEFERRED: | |
continue | |
_setattr(self, field.attname, val) | |
kwargs.pop(field.name, None) | |
# Now we're left with the unprocessed fields that *must* come from | |
# keywords, or default. | |
for field in fields_iter: | |
is_related_object = False | |
# Virtual field | |
if field.attname not in kwargs and field.column is None: | |
continue | |
if kwargs: | |
if isinstance(field.remote_field, ForeignObjectRel): | |
try: | |
# Assume object instance was passed in. | |
rel_obj = kwargs.pop(field.name) | |
is_related_object = True | |
except KeyError: | |
try: | |
# Object instance wasn't passed in -- must be an ID. | |
val = kwargs.pop(field.attname) | |
except KeyError: | |
val = field.get_default() | |
else: | |
# Object instance was passed in. Special case: You can | |
# pass in "None" for related objects if it's allowed. | |
if rel_obj is None and field.null: | |
val = None | |
else: | |
try: | |
val = kwargs.pop(field.attname) | |
except KeyError: | |
# This is done with an exception rather than the | |
# default argument on pop because we don't want | |
# get_default() to be evaluated, and then not used. | |
# Refs #12057. | |
val = field.get_default() | |
else: | |
val = field.get_default() | |
if is_related_object: | |
# If we are passed a related instance, set it using the | |
# field.name instead of field.attname (e.g. "user" instead of | |
# "user_id") so that the object gets properly cached (and type | |
# checked) by the RelatedObjectDescriptor. | |
if rel_obj is not _DEFERRED: | |
_setattr(self, field.name, rel_obj) | |
else: | |
if val is not _DEFERRED: | |
_setattr(self, field.attname, val) | |
if kwargs: | |
property_names = opts._property_names | |
for prop in tuple(kwargs): | |
try: | |
# Any remaining kwargs must correspond to properties or | |
# virtual fields. | |
if prop in property_names or opts.get_field(prop): | |
if kwargs[prop] is not _DEFERRED: | |
_setattr(self, prop, kwargs[prop]) | |
del kwargs[prop] | |
except (AttributeError, FieldDoesNotExist): | |
pass | |
for kwarg in kwargs: | |
raise TypeError("%s() got an unexpected keyword argument '%s'" % (cls.__name__, kwarg)) | |
super().__init__() | |
post_init.send(sender=cls, instance=self) | |
@classmethod | |
def from_db(cls, db, field_names, values): | |
if len(values) != len(cls._meta.concrete_fields): | |
values_iter = iter(values) | |
values = [ | |
next(values_iter) if f.attname in field_names else DEFERRED | |
for f in cls._meta.concrete_fields | |
] | |
new = cls(*values) | |
new._state.adding = False | |
new._state.db = db | |
return new | |
def __repr__(self): | |
return '<%s: %s>' % (self.__class__.__name__, self) | |
def __str__(self): | |
return '%s object (%s)' % (self.__class__.__name__, self.pk) | |
def __eq__(self, other): | |
if not isinstance(other, Model): | |
return False | |
if self._meta.concrete_model != other._meta.concrete_model: | |
return False | |
my_pk = self.pk | |
if my_pk is None: | |
return self is other | |
return my_pk == other.pk | |
def __hash__(self): | |
if self.pk is None: | |
raise TypeError("Model instances without primary key value are unhashable") | |
return hash(self.pk) | |
def __reduce__(self): | |
data = self.__getstate__() | |
data[DJANGO_VERSION_PICKLE_KEY] = get_version() | |
class_id = self._meta.app_label, self._meta.object_name | |
return model_unpickle, (class_id,), data | |
def __getstate__(self): | |
"""Hook to allow choosing the attributes to pickle.""" | |
return self.__dict__ | |
def __setstate__(self, state): | |
msg = None | |
pickled_version = state.get(DJANGO_VERSION_PICKLE_KEY) | |
if pickled_version: | |
current_version = get_version() | |
if current_version != pickled_version: | |
msg = ( | |
"Pickled model instance's Django version %s does not match " | |
"the current version %s." % (pickled_version, current_version) | |
) | |
else: | |
msg = "Pickled model instance's Django version is not specified." | |
if msg: | |
warnings.warn(msg, RuntimeWarning, stacklevel=2) | |
self.__dict__.update(state) | |
def _get_pk_val(self, meta=None): | |
meta = meta or self._meta | |
return getattr(self, meta.pk.attname) | |
def _set_pk_val(self, value): | |
return setattr(self, self._meta.pk.attname, value) | |
pk = property(_get_pk_val, _set_pk_val) | |
def get_deferred_fields(self): | |
""" | |
Return a set containing names of deferred fields for this instance. | |
""" | |
return { | |
f.attname for f in self._meta.concrete_fields | |
if f.attname not in self.__dict__ | |
} | |
def refresh_from_db(self, using=None, fields=None): | |
""" | |
Reload field values from the database. | |
By default, the reloading happens from the database this instance was | |
loaded from, or by the read router if this instance wasn't loaded from | |
any database. The using parameter will override the default. | |
Fields can be used to specify which fields to reload. The fields | |
should be an iterable of field attnames. If fields is None, then | |
all non-deferred fields are reloaded. | |
When accessing deferred fields of an instance, the deferred loading | |
of the field will call this method. | |
""" | |
if fields is None: | |
self._prefetched_objects_cache = {} | |
else: | |
prefetched_objects_cache = getattr(self, '_prefetched_objects_cache', ()) | |
for field in fields: | |
if field in prefetched_objects_cache: | |
del prefetched_objects_cache[field] | |
fields.remove(field) | |
if not fields: | |
return | |
if any(LOOKUP_SEP in f for f in fields): | |
raise ValueError( | |
'Found "%s" in fields argument. Relations and transforms ' | |
'are not allowed in fields.' % LOOKUP_SEP) | |
hints = {'instance': self} | |
db_instance_qs = self.__class__._base_manager.db_manager(using, hints=hints).filter(pk=self.pk) | |
# Use provided fields, if not set then reload all non-deferred fields. | |
deferred_fields = self.get_deferred_fields() | |
if fields is not None: | |
fields = list(fields) | |
db_instance_qs = db_instance_qs.only(*fields) | |
elif deferred_fields: | |
fields = [f.attname for f in self._meta.concrete_fields | |
if f.attname not in deferred_fields] | |
db_instance_qs = db_instance_qs.only(*fields) | |
db_instance = db_instance_qs.get() | |
non_loaded_fields = db_instance.get_deferred_fields() | |
for field in self._meta.concrete_fields: | |
if field.attname in non_loaded_fields: | |
# This field wasn't refreshed - skip ahead. | |
continue | |
setattr(self, field.attname, getattr(db_instance, field.attname)) | |
# Clear cached foreign keys. | |
if field.is_relation and field.is_cached(self): | |
field.delete_cached_value(self) | |
# Clear cached relations. | |
for field in self._meta.related_objects: | |
if field.is_cached(self): | |
field.delete_cached_value(self) | |
self._state.db = db_instance._state.db | |
def serializable_value(self, field_name): | |
""" | |
Return the value of the field name for this instance. If the field is | |
a foreign key, return the id value instead of the object. If there's | |
no Field object with this name on the model, return the model | |
attribute's value. | |
Used to serialize a field's value (in the serializer, or form output, | |
for example). Normally, you would just access the attribute directly | |
and not use this method. | |
""" | |
try: | |
field = self._meta.get_field(field_name) | |
except FieldDoesNotExist: | |
return getattr(self, field_name) | |
return getattr(self, field.attname) | |
def save(self, force_insert=False, force_update=False, using=None, | |
update_fields=None): | |
""" | |
Save the current instance. Override this in a subclass if you want to | |
control the saving process. | |
The 'force_insert' and 'force_update' parameters can be used to insist | |
that the "save" must be an SQL insert or update (or equivalent for | |
non-SQL backends), respectively. Normally, they should not be set. | |
""" | |
# Ensure that a model instance without a PK hasn't been assigned to | |
# a ForeignKey or OneToOneField on this model. If the field is | |
# nullable, allowing the save() would result in silent data loss. | |
for field in self._meta.concrete_fields: | |
# If the related field isn't cached, then an instance hasn't | |
# been assigned and there's no need to worry about this check. | |
if field.is_relation and field.is_cached(self): | |
obj = getattr(self, field.name, None) | |
# A pk may have been assigned manually to a model instance not | |
# saved to the database (or auto-generated in a case like | |
# UUIDField), but we allow the save to proceed and rely on the | |
# database to raise an IntegrityError if applicable. If | |
# constraints aren't supported by the database, there's the | |
# unavoidable risk of data corruption. | |
if obj and obj.pk is None: | |
# Remove the object from a related instance cache. | |
if not field.remote_field.multiple: | |
field.remote_field.delete_cached_value(obj) | |
raise ValueError( | |
"save() prohibited to prevent data loss due to " | |
"unsaved related object '%s'." % field.name | |
) | |
# If the relationship's pk was changed, clear the cached | |
# relationship. | |
if obj and obj.pk != getattr(self, field.attname): | |
field.delete_cached_value(self) | |
using = using or router.db_for_write(self.__class__, instance=self) | |
if force_insert and (force_update or update_fields): | |
raise ValueError("Cannot force both insert and updating in model saving.") | |
deferred_fields = self.get_deferred_fields() | |
if update_fields is not None: | |
# If update_fields is empty, skip the save. We do also check for | |
# no-op saves later on for inheritance cases. This bailout is | |
# still needed for skipping signal sending. | |
if not update_fields: | |
return | |
update_fields = frozenset(update_fields) | |
field_names = set() | |
for field in self._meta.fields: | |
if not field.primary_key: | |
field_names.add(field.name) | |
if field.name != field.attname: | |
field_names.add(field.attname) | |
non_model_fields = update_fields.difference(field_names) | |
if non_model_fields: | |
raise ValueError("The following fields do not exist in this " | |
"model or are m2m fields: %s" | |
% ', '.join(non_model_fields)) | |
# If saving to the same database, and this model is deferred, then | |
# automatically do a "update_fields" save on the loaded fields. | |
elif not force_insert and deferred_fields and using == self._state.db: | |
field_names = set() | |
for field in self._meta.concrete_fields: | |
if not field.primary_key and not hasattr(field, 'through'): | |
field_names.add(field.attname) | |
loaded_fields = field_names.difference(deferred_fields) | |
if loaded_fields: | |
update_fields = frozenset(loaded_fields) | |
self.save_base(using=using, force_insert=force_insert, | |
force_update=force_update, update_fields=update_fields) | |
save.alters_data = True | |
def save_base(self, raw=False, force_insert=False, | |
force_update=False, using=None, update_fields=None): | |
""" | |
Handle the parts of saving which should be done only once per save, | |
yet need to be done in raw saves, too. This includes some sanity | |
checks and signal sending. | |
The 'raw' argument is telling save_base not to save any parent | |
models and not to do any changes to the values before save. This | |
is used by fixture loading. | |
""" | |
using = using or router.db_for_write(self.__class__, instance=self) | |
assert not (force_insert and (force_update or update_fields)) | |
assert update_fields is None or update_fields | |
cls = origin = self.__class__ | |
# Skip proxies, but keep the origin as the proxy model. | |
if cls._meta.proxy: | |
cls = cls._meta.concrete_model | |
meta = cls._meta | |
if not meta.auto_created: | |
pre_save.send( | |
sender=origin, instance=self, raw=raw, using=using, | |
update_fields=update_fields, | |
) | |
with transaction.atomic(using=using, savepoint=False): | |
parent_inserted = False | |
if not raw: | |
parent_inserted = self._save_parents(cls, using, update_fields) | |
updated = self._save_table( | |
raw, cls, force_insert or parent_inserted, | |
force_update, using, update_fields, | |
) | |
# Store the database on which the object was saved | |
self._state.db = using | |
# Once saved, this is no longer a to-be-added instance. | |
self._state.adding = False | |
# Signal that the save is complete | |
if not meta.auto_created: | |
post_save.send( | |
sender=origin, instance=self, created=(not updated), | |
update_fields=update_fields, raw=raw, using=using, | |
) | |
save_base.alters_data = True | |
def _save_parents(self, cls, using, update_fields): | |
"""Save all the parents of cls using values from self.""" | |
meta = cls._meta | |
inserted = False | |
for parent, field in meta.parents.items(): | |
# Make sure the link fields are synced between parent and self. | |
if (field and getattr(self, parent._meta.pk.attname) is None and | |
getattr(self, field.attname) is not None): | |
setattr(self, parent._meta.pk.attname, getattr(self, field.attname)) | |
parent_inserted = self._save_parents(cls=parent, using=using, update_fields=update_fields) | |
updated = self._save_table( | |
cls=parent, using=using, update_fields=update_fields, | |
force_insert=parent_inserted, | |
) | |
if not updated: | |
inserted = True | |
# Set the parent's PK value to self. | |
if field: | |
setattr(self, field.attname, self._get_pk_val(parent._meta)) | |
# Since we didn't have an instance of the parent handy set | |
# attname directly, bypassing the descriptor. Invalidate | |
# the related object cache, in case it's been accidentally | |
# populated. A fresh instance will be re-built from the | |
# database if necessary. | |
if field.is_cached(self): | |
field.delete_cached_value(self) | |
return inserted | |
def _save_table(self, raw=False, cls=None, force_insert=False, | |
force_update=False, using=None, update_fields=None): | |
""" | |
Do the heavy-lifting involved in saving. Update or insert the data | |
for a single table. | |
""" | |
meta = cls._meta | |
non_pks = [f for f in meta.local_concrete_fields if not f.primary_key] | |
if update_fields: | |
non_pks = [f for f in non_pks | |
if f.name in update_fields or f.attname in update_fields] | |
pk_val = self._get_pk_val(meta) | |
if pk_val is None: | |
pk_val = meta.pk.get_pk_value_on_save(self) | |
setattr(self, meta.pk.attname, pk_val) | |
pk_set = pk_val is not None | |
if not pk_set and (force_update or update_fields): | |
raise ValueError("Cannot force an update in save() with no primary key.") | |
updated = False | |
# If possible, try an UPDATE. If that doesn't update anything, do an INSERT. | |
if pk_set and not force_insert: | |
base_qs = cls._base_manager.using(using) | |
values = [(f, None, (getattr(self, f.attname) if raw else f.pre_save(self, False))) | |
for f in non_pks] | |
forced_update = update_fields or force_update | |
updated = self._do_update(base_qs, using, pk_val, values, update_fields, | |
forced_update) | |
if force_update and not updated: | |
raise DatabaseError("Forced update did not affect any rows.") | |
if update_fields and not updated: | |
raise DatabaseError("Save with update_fields did not affect any rows.") | |
if not updated: | |
if meta.order_with_respect_to: | |
# If this is a model with an order_with_respect_to | |
# autopopulate the _order field | |
field = meta.order_with_respect_to | |
filter_args = field.get_filter_kwargs_for_object(self) | |
order_value = cls._base_manager.using(using).filter(**filter_args).count() | |
self._order = order_value | |
fields = meta.local_concrete_fields | |
if not pk_set: | |
fields = [f for f in fields if f is not meta.auto_field] | |
update_pk = meta.auto_field and not pk_set | |
result = self._do_insert(cls._base_manager, using, fields, update_pk, raw) | |
if update_pk: | |
setattr(self, meta.pk.attname, result) | |
return updated | |
def _do_update(self, base_qs, using, pk_val, values, update_fields, forced_update): | |
""" | |
Try to update the model. Return True if the model was updated (if an | |
update query was done and a matching row was found in the DB). | |
""" | |
filtered = base_qs.filter(pk=pk_val) | |
if not values: | |
# We can end up here when saving a model in inheritance chain where | |
# update_fields doesn't target any field in current model. In that | |
# case we just say the update succeeded. Another case ending up here | |
# is a model with just PK - in that case check that the PK still | |
# exists. | |
return update_fields is not None or filtered.exists() | |
if self._meta.select_on_save and not forced_update: | |
return ( | |
filtered.exists() and | |
# It may happen that the object is deleted from the DB right after | |
# this check, causing the subsequent UPDATE to return zero matching | |
# rows. The same result can occur in some rare cases when the | |
# database returns zero despite the UPDATE being executed | |
# successfully (a row is matched and updated). In order to | |
# distinguish these two cases, the object's existence in the | |
# database is again checked for if the UPDATE query returns 0. | |
(filtered._update(values) > 0 or filtered.exists()) | |
) | |
return filtered._update(values) > 0 | |
def _do_insert(self, manager, using, fields, update_pk, raw): | |
""" | |
Do an INSERT. If update_pk is defined then this method should return | |
the new pk for the model. | |
""" | |
return manager._insert([self], fields=fields, return_id=update_pk, | |
using=using, raw=raw) | |
def delete(self, using=None, keep_parents=False): | |
using = using or router.db_for_write(self.__class__, instance=self) | |
assert self.pk is not None, ( | |
"%s object can't be deleted because its %s attribute is set to None." % | |
(self._meta.object_name, self._meta.pk.attname) | |
) | |
collector = Collector(using=using) | |
collector.collect([self], keep_parents=keep_parents) | |
return collector.delete() | |
delete.alters_data = True | |
def _get_FIELD_display(self, field): | |
value = getattr(self, field.attname) | |
return dict(field.flatchoices).get(value, value) | |
def _get_next_or_previous_by_FIELD(self, field, is_next, **kwargs): | |
if not self.pk: | |
raise ValueError("get_next/get_previous cannot be used on unsaved objects.") | |
op = 'gt' if is_next else 'lt' | |
order = '' if is_next else '-' | |
param = getattr(self, field.attname) | |
q = Q(**{'%s__%s' % (field.name, op): param}) | |
q = q | Q(**{field.name: param, 'pk__%s' % op: self.pk}) | |
qs = self.__class__._default_manager.using(self._state.db).filter(**kwargs).filter(q).order_by( | |
'%s%s' % (order, field.name), '%spk' % order | |
) | |
try: | |
return qs[0] | |
except IndexError: | |
raise self.DoesNotExist("%s matching query does not exist." % self.__class__._meta.object_name) | |
def _get_next_or_previous_in_order(self, is_next): | |
cachename = "__%s_order_cache" % is_next | |
if not hasattr(self, cachename): | |
op = 'gt' if is_next else 'lt' | |
order = '_order' if is_next else '-_order' | |
order_field = self._meta.order_with_respect_to | |
filter_args = order_field.get_filter_kwargs_for_object(self) | |
obj = self.__class__._default_manager.filter(**filter_args).filter(**{ | |
'_order__%s' % op: self.__class__._default_manager.values('_order').filter(**{ | |
self._meta.pk.name: self.pk | |
}) | |
}).order_by(order)[:1].get() | |
setattr(self, cachename, obj) | |
return getattr(self, cachename) | |
def prepare_database_save(self, field): | |
if self.pk is None: | |
raise ValueError("Unsaved model instance %r cannot be used in an ORM query." % self) | |
return getattr(self, field.remote_field.get_related_field().attname) | |
def clean(self): | |
""" | |
Hook for doing any extra model-wide validation after clean() has been | |
called on every field by self.clean_fields. Any ValidationError raised | |
by this method will not be associated with a particular field; it will | |
have a special-case association with the field defined by NON_FIELD_ERRORS. | |
""" | |
pass | |
def validate_unique(self, exclude=None): | |
""" | |
Check unique constraints on the model and raise ValidationError if any | |
failed. | |
""" | |
unique_checks, date_checks = self._get_unique_checks(exclude=exclude) | |
errors = self._perform_unique_checks(unique_checks) | |
date_errors = self._perform_date_checks(date_checks) | |
for k, v in date_errors.items(): | |
errors.setdefault(k, []).extend(v) | |
if errors: | |
raise ValidationError(errors) | |
def _get_unique_checks(self, exclude=None): | |
""" | |
Return a list of checks to perform. Since validate_unique() could be | |
called from a ModelForm, some fields may have been excluded; we can't | |
perform a unique check on a model that is missing fields involved | |
in that check. Fields that did not validate should also be excluded, | |
but they need to be passed in via the exclude argument. | |
""" | |
if exclude is None: | |
exclude = [] | |
unique_checks = [] | |
unique_togethers = [(self.__class__, self._meta.unique_together)] | |
for parent_class in self._meta.get_parent_list(): | |
if parent_class._meta.unique_together: | |
unique_togethers.append((parent_class, parent_class._meta.unique_together)) | |
for model_class, unique_together in unique_togethers: | |
for check in unique_together: | |
if not any(name in exclude for name in check): | |
# Add the check if the field isn't excluded. | |
unique_checks.append((model_class, tuple(check))) | |
# These are checks for the unique_for_<date/year/month>. | |
date_checks = [] | |
# Gather a list of checks for fields declared as unique and add them to | |
# the list of checks. | |
fields_with_class = [(self.__class__, self._meta.local_fields)] | |
for parent_class in self._meta.get_parent_list(): | |
fields_with_class.append((parent_class, parent_class._meta.local_fields)) | |
for model_class, fields in fields_with_class: | |
for f in fields: | |
name = f.name | |
if name in exclude: | |
continue | |
if f.unique: | |
unique_checks.append((model_class, (name,))) | |
if f.unique_for_date and f.unique_for_date not in exclude: | |
date_checks.append((model_class, 'date', name, f.unique_for_date)) | |
if f.unique_for_year and f.unique_for_year not in exclude: | |
date_checks.append((model_class, 'year', name, f.unique_for_year)) | |
if f.unique_for_month and f.unique_for_month not in exclude: | |
date_checks.append((model_class, 'month', name, f.unique_for_month)) | |
return unique_checks, date_checks | |
def _perform_unique_checks(self, unique_checks): | |
errors = {} | |
for model_class, unique_check in unique_checks: | |
# Try to look up an existing object with the same values as this | |
# object's values for all the unique field. | |
lookup_kwargs = {} | |
for field_name in unique_check: | |
f = self._meta.get_field(field_name) | |
lookup_value = getattr(self, f.attname) | |
# TODO: Handle multiple backends with different feature flags. | |
if (lookup_value is None or | |
(lookup_value == '' and connection.features.interprets_empty_strings_as_nulls)): | |
# no value, skip the lookup | |
continue | |
if f.primary_key and not self._state.adding: | |
# no need to check for unique primary key when editing | |
continue | |
lookup_kwargs[str(field_name)] = lookup_value | |
# some fields were skipped, no reason to do the check | |
if len(unique_check) != len(lookup_kwargs): | |
continue | |
qs = model_class._default_manager.filter(**lookup_kwargs) | |
# Exclude the current object from the query if we are editing an | |
# instance (as opposed to creating a new one) | |
# Note that we need to use the pk as defined by model_class, not | |
# self.pk. These can be different fields because model inheritance | |
# allows single model to have effectively multiple primary keys. | |
# Refs #17615. | |
model_class_pk = self._get_pk_val(model_class._meta) | |
if not self._state.adding and model_class_pk is not None: | |
qs = qs.exclude(pk=model_class_pk) | |
if qs.exists(): | |
if len(unique_check) == 1: | |
key = unique_check[0] | |
else: | |
key = NON_FIELD_ERRORS | |
errors.setdefault(key, []).append(self.unique_error_message(model_class, unique_check)) | |
return errors | |
def _perform_date_checks(self, date_checks): | |
errors = {} | |
for model_class, lookup_type, field, unique_for in date_checks: | |
lookup_kwargs = {} | |
# there's a ticket to add a date lookup, we can remove this special | |
# case if that makes it's way in | |
date = getattr(self, unique_for) | |
if date is None: | |
continue | |
if lookup_type == 'date': | |
lookup_kwargs['%s__day' % unique_for] = date.day | |
lookup_kwargs['%s__month' % unique_for] = date.month | |
lookup_kwargs['%s__year' % unique_for] = date.year | |
else: | |
lookup_kwargs['%s__%s' % (unique_for, lookup_type)] = getattr(date, lookup_type) | |
lookup_kwargs[field] = getattr(self, field) | |
qs = model_class._default_manager.filter(**lookup_kwargs) | |
# Exclude the current object from the query if we are editing an | |
# instance (as opposed to creating a new one) | |
if not self._state.adding and self.pk is not None: | |
qs = qs.exclude(pk=self.pk) | |
if qs.exists(): | |
errors.setdefault(field, []).append( | |
self.date_error_message(lookup_type, field, unique_for) | |
) | |
return errors | |
def date_error_message(self, lookup_type, field_name, unique_for): | |
opts = self._meta | |
field = opts.get_field(field_name) | |
return ValidationError( | |
message=field.error_messages['unique_for_date'], | |
code='unique_for_date', | |
params={ | |
'model': self, | |
'model_name': capfirst(opts.verbose_name), | |
'lookup_type': lookup_type, | |
'field': field_name, | |
'field_label': capfirst(field.verbose_name), | |
'date_field': unique_for, | |
'date_field_label': capfirst(opts.get_field(unique_for).verbose_name), | |
} | |
) | |
def unique_error_message(self, model_class, unique_check): | |
opts = model_class._meta | |
params = { | |
'model': self, | |
'model_class': model_class, | |
'model_name': capfirst(opts.verbose_name), | |
'unique_check': unique_check, | |
} | |
# A unique field | |
if len(unique_check) == 1: | |
field = opts.get_field(unique_check[0]) | |
params['field_label'] = capfirst(field.verbose_name) | |
return ValidationError( | |
message=field.error_messages['unique'], | |
code='unique', | |
params=params, | |
) | |
# unique_together | |
else: | |
field_labels = [capfirst(opts.get_field(f).verbose_name) for f in unique_check] | |
params['field_labels'] = get_text_list(field_labels, _('and')) | |
return ValidationError( | |
message=_("%(model_name)s with this %(field_labels)s already exists."), | |
code='unique_together', | |
params=params, | |
) | |
def full_clean(self, exclude=None, validate_unique=True): | |
""" | |
Call clean_fields(), clean(), and validate_unique() on the model. | |
Raise a ValidationError for any errors that occur. | |
""" | |
errors = {} | |
if exclude is None: | |
exclude = [] | |
else: | |
exclude = list(exclude) | |
try: | |
self.clean_fields(exclude=exclude) | |
except ValidationError as e: | |
errors = e.update_error_dict(errors) | |
# Form.clean() is run even if other validation fails, so do the | |
# same with Model.clean() for consistency. | |
try: | |
self.clean() | |
except ValidationError as e: | |
errors = e.update_error_dict(errors) | |
# Run unique checks, but only for fields that passed validation. | |
if validate_unique: | |
for name in errors: | |
if name != NON_FIELD_ERRORS and name not in exclude: | |
exclude.append(name) | |
try: | |
self.validate_unique(exclude=exclude) | |
except ValidationError as e: | |
errors = e.update_error_dict(errors) | |
if errors: | |
raise ValidationError(errors) | |
def clean_fields(self, exclude=None): | |
""" | |
Clean all fields and raise a ValidationError containing a dict | |
of all validation errors if any occur. | |
""" | |
if exclude is None: | |
exclude = [] | |
errors = {} | |
for f in self._meta.fields: | |
if f.name in exclude: | |
continue | |
# Skip validation for empty fields with blank=True. The developer | |
# is responsible for making sure they have a valid value. | |
raw_value = getattr(self, f.attname) | |
if f.blank and raw_value in f.empty_values: | |
continue | |
try: | |
setattr(self, f.attname, f.clean(raw_value, self)) | |
except ValidationError as e: | |
errors[f.name] = e.error_list | |
if errors: | |
raise ValidationError(errors) | |
@classmethod | |
def check(cls, **kwargs): | |
errors = [*cls._check_swappable(), *cls._check_model(), *cls._check_managers(**kwargs)] | |
if not cls._meta.swapped: | |
errors += [ | |
*cls._check_fields(**kwargs), | |
*cls._check_m2m_through_same_relationship(), | |
*cls._check_long_column_names(), | |
] | |
clash_errors = ( | |
*cls._check_id_field(), | |
*cls._check_field_name_clashes(), | |
*cls._check_model_name_db_lookup_clashes(), | |
*cls._check_property_name_related_field_accessor_clashes(), | |
*cls._check_single_primary_key(), | |
) | |
errors.extend(clash_errors) | |
# If there are field name clashes, hide consequent column name | |
# clashes. | |
if not clash_errors: | |
errors.extend(cls._check_column_name_clashes()) | |
errors += [ | |
*cls._check_index_together(), | |
*cls._check_unique_together(), | |
*cls._check_indexes(), | |
*cls._check_ordering(), | |
*cls._check_constraints(), | |
] | |
return errors | |
@classmethod | |
def _check_swappable(cls): | |
"""Check if the swapped model exists.""" | |
errors = [] | |
if cls._meta.swapped: | |
try: | |
apps.get_model(cls._meta.swapped) | |
except ValueError: | |
errors.append( | |
checks.Error( | |
"'%s' is not of the form 'app_label.app_name'." % cls._meta.swappable, | |
id='models.E001', | |
) | |
) | |
except LookupError: | |
app_label, model_name = cls._meta.swapped.split('.') | |
errors.append( | |
checks.Error( | |
"'%s' references '%s.%s', which has not been " | |
"installed, or is abstract." % ( | |
cls._meta.swappable, app_label, model_name | |
), | |
id='models.E002', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_model(cls): | |
errors = [] | |
if cls._meta.proxy: | |
if cls._meta.local_fields or cls._meta.local_many_to_many: | |
errors.append( | |
checks.Error( | |
"Proxy model '%s' contains model fields." % cls.__name__, | |
id='models.E017', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_managers(cls, **kwargs): | |
"""Perform all manager checks.""" | |
errors = [] | |
for manager in cls._meta.managers: | |
errors.extend(manager.check(**kwargs)) | |
return errors | |
@classmethod | |
def _check_fields(cls, **kwargs): | |
"""Perform all field checks.""" | |
errors = [] | |
for field in cls._meta.local_fields: | |
errors.extend(field.check(**kwargs)) | |
for field in cls._meta.local_many_to_many: | |
errors.extend(field.check(from_model=cls, **kwargs)) | |
return errors | |
@classmethod | |
def _check_m2m_through_same_relationship(cls): | |
""" Check if no relationship model is used by more than one m2m field. | |
""" | |
errors = [] | |
seen_intermediary_signatures = [] | |
fields = cls._meta.local_many_to_many | |
# Skip when the target model wasn't found. | |
fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase)) | |
# Skip when the relationship model wasn't found. | |
fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase)) | |
for f in fields: | |
signature = (f.remote_field.model, cls, f.remote_field.through, f.remote_field.through_fields) | |
if signature in seen_intermediary_signatures: | |
errors.append( | |
checks.Error( | |
"The model has two identical many-to-many relations " | |
"through the intermediate model '%s'." % | |
f.remote_field.through._meta.label, | |
obj=cls, | |
id='models.E003', | |
) | |
) | |
else: | |
seen_intermediary_signatures.append(signature) | |
return errors | |
@classmethod | |
def _check_id_field(cls): | |
"""Check if `id` field is a primary key.""" | |
fields = [f for f in cls._meta.local_fields if f.name == 'id' and f != cls._meta.pk] | |
# fields is empty or consists of the invalid "id" field | |
if fields and not fields[0].primary_key and cls._meta.pk.name == 'id': | |
return [ | |
checks.Error( | |
"'id' can only be used as a field name if the field also " | |
"sets 'primary_key=True'.", | |
obj=cls, | |
id='models.E004', | |
) | |
] | |
else: | |
return [] | |
@classmethod | |
def _check_field_name_clashes(cls): | |
"""Forbid field shadowing in multi-table inheritance.""" | |
errors = [] | |
used_fields = {} # name or attname -> field | |
# Check that multi-inheritance doesn't cause field name shadowing. | |
for parent in cls._meta.get_parent_list(): | |
for f in parent._meta.local_fields: | |
clash = used_fields.get(f.name) or used_fields.get(f.attname) or None | |
if clash: | |
errors.append( | |
checks.Error( | |
"The field '%s' from parent model " | |
"'%s' clashes with the field '%s' " | |
"from parent model '%s'." % ( | |
clash.name, clash.model._meta, | |
f.name, f.model._meta | |
), | |
obj=cls, | |
id='models.E005', | |
) | |
) | |
used_fields[f.name] = f | |
used_fields[f.attname] = f | |
# Check that fields defined in the model don't clash with fields from | |
# parents, including auto-generated fields like multi-table inheritance | |
# child accessors. | |
for parent in cls._meta.get_parent_list(): | |
for f in parent._meta.get_fields(): | |
if f not in used_fields: | |
used_fields[f.name] = f | |
for f in cls._meta.local_fields: | |
clash = used_fields.get(f.name) or used_fields.get(f.attname) or None | |
# Note that we may detect clash between user-defined non-unique | |
# field "id" and automatically added unique field "id", both | |
# defined at the same model. This special case is considered in | |
# _check_id_field and here we ignore it. | |
id_conflict = f.name == "id" and clash and clash.name == "id" and clash.model == cls | |
if clash and not id_conflict: | |
errors.append( | |
checks.Error( | |
"The field '%s' clashes with the field '%s' " | |
"from model '%s'." % ( | |
f.name, clash.name, clash.model._meta | |
), | |
obj=f, | |
id='models.E006', | |
) | |
) | |
used_fields[f.name] = f | |
used_fields[f.attname] = f | |
return errors | |
@classmethod | |
def _check_column_name_clashes(cls): | |
# Store a list of column names which have already been used by other fields. | |
used_column_names = [] | |
errors = [] | |
for f in cls._meta.local_fields: | |
_, column_name = f.get_attname_column() | |
# Ensure the column name is not already in use. | |
if column_name and column_name in used_column_names: | |
errors.append( | |
checks.Error( | |
"Field '%s' has column name '%s' that is used by " | |
"another field." % (f.name, column_name), | |
hint="Specify a 'db_column' for the field.", | |
obj=cls, | |
id='models.E007' | |
) | |
) | |
else: | |
used_column_names.append(column_name) | |
return errors | |
@classmethod | |
def _check_model_name_db_lookup_clashes(cls): | |
errors = [] | |
model_name = cls.__name__ | |
if model_name.startswith('_') or model_name.endswith('_'): | |
errors.append( | |
checks.Error( | |
"The model name '%s' cannot start or end with an underscore " | |
"as it collides with the query lookup syntax." % model_name, | |
obj=cls, | |
id='models.E023' | |
) | |
) | |
elif LOOKUP_SEP in model_name: | |
errors.append( | |
checks.Error( | |
"The model name '%s' cannot contain double underscores as " | |
"it collides with the query lookup syntax." % model_name, | |
obj=cls, | |
id='models.E024' | |
) | |
) | |
return errors | |
@classmethod | |
def _check_property_name_related_field_accessor_clashes(cls): | |
errors = [] | |
property_names = cls._meta._property_names | |
related_field_accessors = ( | |
f.get_attname() for f in cls._meta._get_fields(reverse=False) | |
if f.is_relation and f.related_model is not None | |
) | |
for accessor in related_field_accessors: | |
if accessor in property_names: | |
errors.append( | |
checks.Error( | |
"The property '%s' clashes with a related field " | |
"accessor." % accessor, | |
obj=cls, | |
id='models.E025', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_single_primary_key(cls): | |
errors = [] | |
if sum(1 for f in cls._meta.local_fields if f.primary_key) > 1: | |
errors.append( | |
checks.Error( | |
"The model cannot have more than one field with " | |
"'primary_key=True'.", | |
obj=cls, | |
id='models.E026', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_index_together(cls): | |
"""Check the value of "index_together" option.""" | |
if not isinstance(cls._meta.index_together, (tuple, list)): | |
return [ | |
checks.Error( | |
"'index_together' must be a list or tuple.", | |
obj=cls, | |
id='models.E008', | |
) | |
] | |
elif any(not isinstance(fields, (tuple, list)) for fields in cls._meta.index_together): | |
return [ | |
checks.Error( | |
"All 'index_together' elements must be lists or tuples.", | |
obj=cls, | |
id='models.E009', | |
) | |
] | |
else: | |
errors = [] | |
for fields in cls._meta.index_together: | |
errors.extend(cls._check_local_fields(fields, "index_together")) | |
return errors | |
@classmethod | |
def _check_unique_together(cls): | |
"""Check the value of "unique_together" option.""" | |
if not isinstance(cls._meta.unique_together, (tuple, list)): | |
return [ | |
checks.Error( | |
"'unique_together' must be a list or tuple.", | |
obj=cls, | |
id='models.E010', | |
) | |
] | |
elif any(not isinstance(fields, (tuple, list)) for fields in cls._meta.unique_together): | |
return [ | |
checks.Error( | |
"All 'unique_together' elements must be lists or tuples.", | |
obj=cls, | |
id='models.E011', | |
) | |
] | |
else: | |
errors = [] | |
for fields in cls._meta.unique_together: | |
errors.extend(cls._check_local_fields(fields, "unique_together")) | |
return errors | |
@classmethod | |
def _check_indexes(cls): | |
"""Check the fields of indexes.""" | |
fields = [field for index in cls._meta.indexes for field, _ in index.fields_orders] | |
return cls._check_local_fields(fields, 'indexes') | |
@classmethod | |
def _check_local_fields(cls, fields, option): | |
from django.db import models | |
# In order to avoid hitting the relation tree prematurely, we use our | |
# own fields_map instead of using get_field() | |
forward_fields_map = { | |
field.name: field for field in cls._meta._get_fields(reverse=False) | |
} | |
errors = [] | |
for field_name in fields: | |
try: | |
field = forward_fields_map[field_name] | |
except KeyError: | |
errors.append( | |
checks.Error( | |
"'%s' refers to the nonexistent field '%s'." % ( | |
option, field_name, | |
), | |
obj=cls, | |
id='models.E012', | |
) | |
) | |
else: | |
if isinstance(field.remote_field, models.ManyToManyRel): | |
errors.append( | |
checks.Error( | |
"'%s' refers to a ManyToManyField '%s', but " | |
"ManyToManyFields are not permitted in '%s'." % ( | |
option, field_name, option, | |
), | |
obj=cls, | |
id='models.E013', | |
) | |
) | |
elif field not in cls._meta.local_fields: | |
errors.append( | |
checks.Error( | |
"'%s' refers to field '%s' which is not local to model '%s'." | |
% (option, field_name, cls._meta.object_name), | |
hint="This issue may be caused by multi-table inheritance.", | |
obj=cls, | |
id='models.E016', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_ordering(cls): | |
""" | |
Check "ordering" option -- is it a list of strings and do all fields | |
exist? | |
""" | |
if cls._meta._ordering_clash: | |
return [ | |
checks.Error( | |
"'ordering' and 'order_with_respect_to' cannot be used together.", | |
obj=cls, | |
id='models.E021', | |
), | |
] | |
if cls._meta.order_with_respect_to or not cls._meta.ordering: | |
return [] | |
if not isinstance(cls._meta.ordering, (list, tuple)): | |
return [ | |
checks.Error( | |
"'ordering' must be a tuple or list (even if you want to order by only one field).", | |
obj=cls, | |
id='models.E014', | |
) | |
] | |
errors = [] | |
fields = cls._meta.ordering | |
# Skip expressions and '?' fields. | |
fields = (f for f in fields if isinstance(f, str) and f != '?') | |
# Convert "-field" to "field". | |
fields = ((f[1:] if f.startswith('-') else f) for f in fields) | |
# Skip ordering in the format field1__field2 (FIXME: checking | |
# this format would be nice, but it's a little fiddly). | |
fields = (f for f in fields if LOOKUP_SEP not in f) | |
# Skip ordering on pk. This is always a valid order_by field | |
# but is an alias and therefore won't be found by opts.get_field. | |
fields = {f for f in fields if f != 'pk'} | |
# Check for invalid or nonexistent fields in ordering. | |
invalid_fields = [] | |
# Any field name that is not present in field_names does not exist. | |
# Also, ordering by m2m fields is not allowed. | |
opts = cls._meta | |
valid_fields = set(chain.from_iterable( | |
(f.name, f.attname) if not (f.auto_created and not f.concrete) else (f.field.related_query_name(),) | |
for f in chain(opts.fields, opts.related_objects) | |
)) | |
invalid_fields.extend(fields - valid_fields) | |
for invalid_field in invalid_fields: | |
errors.append( | |
checks.Error( | |
"'ordering' refers to the nonexistent field '%s'." % invalid_field, | |
obj=cls, | |
id='models.E015', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_long_column_names(cls): | |
""" | |
Check that any auto-generated column names are shorter than the limits | |
for each database in which the model will be created. | |
""" | |
errors = [] | |
allowed_len = None | |
db_alias = None | |
# Find the minimum max allowed length among all specified db_aliases. | |
for db in settings.DATABASES: | |
# skip databases where the model won't be created | |
if not router.allow_migrate_model(db, cls): | |
continue | |
connection = connections[db] | |
max_name_length = connection.ops.max_name_length() | |
if max_name_length is None or connection.features.truncates_names: | |
continue | |
else: | |
if allowed_len is None: | |
allowed_len = max_name_length | |
db_alias = db | |
elif max_name_length < allowed_len: | |
allowed_len = max_name_length | |
db_alias = db | |
if allowed_len is None: | |
return errors | |
for f in cls._meta.local_fields: | |
_, column_name = f.get_attname_column() | |
# Check if auto-generated name for the field is too long | |
# for the database. | |
if f.db_column is None and column_name is not None and len(column_name) > allowed_len: | |
errors.append( | |
checks.Error( | |
'Autogenerated column name too long for field "%s". ' | |
'Maximum length is "%s" for database "%s".' | |
% (column_name, allowed_len, db_alias), | |
hint="Set the column name manually using 'db_column'.", | |
obj=cls, | |
id='models.E018', | |
) | |
) | |
for f in cls._meta.local_many_to_many: | |
# Skip nonexistent models. | |
if isinstance(f.remote_field.through, str): | |
continue | |
# Check if auto-generated name for the M2M field is too long | |
# for the database. | |
for m2m in f.remote_field.through._meta.local_fields: | |
_, rel_name = m2m.get_attname_column() | |
if m2m.db_column is None and rel_name is not None and len(rel_name) > allowed_len: | |
errors.append( | |
checks.Error( | |
'Autogenerated column name too long for M2M field ' | |
'"%s". Maximum length is "%s" for database "%s".' | |
% (rel_name, allowed_len, db_alias), | |
hint=( | |
"Use 'through' to create a separate model for " | |
"M2M and then set column_name using 'db_column'." | |
), | |
obj=cls, | |
id='models.E019', | |
) | |
) | |
return errors | |
@classmethod | |
def _check_constraints(cls): | |
errors = [] | |
for db in settings.DATABASES: | |
if not router.allow_migrate_model(db, cls): | |
continue | |
connection = connections[db] | |
if connection.features.supports_table_check_constraints: | |
continue | |
if any(isinstance(constraint, CheckConstraint) for constraint in cls._meta.constraints): | |
errors.append( | |
checks.Warning( | |
'%s does not support check constraints.' % connection.display_name, | |
hint=( | |
"A constraint won't be created. Silence this " | |
"warning if you don't care about it." | |
), | |
obj=cls, | |
id='models.W027', | |
) | |
) | |
return errors | |
############################################ | |
# HELPER FUNCTIONS (CURRIED MODEL METHODS) # | |
############################################ | |
# ORDERING METHODS ######################### | |
def method_set_order(self, ordered_obj, id_list, using=None): | |
if using is None: | |
using = DEFAULT_DB_ALIAS | |
order_wrt = ordered_obj._meta.order_with_respect_to | |
filter_args = order_wrt.get_forward_related_filter(self) | |
# FIXME: It would be nice if there was an "update many" version of update | |
# for situations like this. | |
with transaction.atomic(using=using, savepoint=False): | |
for i, j in enumerate(id_list): | |
ordered_obj.objects.filter(pk=j, **filter_args).update(_order=i) | |
def method_get_order(self, ordered_obj): | |
order_wrt = ordered_obj._meta.order_with_respect_to | |
filter_args = order_wrt.get_forward_related_filter(self) | |
pk_name = ordered_obj._meta.pk.name | |
return ordered_obj.objects.filter(**filter_args).values_list(pk_name, flat=True) | |
def make_foreign_order_accessors(model, related_model): | |
setattr( | |
related_model, | |
'get_%s_order' % model.__name__.lower(), | |
partialmethod(method_get_order, model) | |
) | |
setattr( | |
related_model, | |
'set_%s_order' % model.__name__.lower(), | |
partialmethod(method_set_order, model) | |
) | |
######## | |
# MISC # | |
######## | |
def model_unpickle(model_id): | |
"""Used to unpickle Model subclasses with deferred fields.""" | |
if isinstance(model_id, tuple): | |
model = apps.get_model(*model_id) | |
else: | |
# Backwards compat - the model was cached directly in earlier versions. | |
model = model_id | |
return model.__new__(model) | |
model_unpickle.__safe_for_unpickle__ = True |