这部分标题比较大,按照之前的分析方法肯定会比较复杂且不够系统,所以从另一个角度出发,我们通过对几个关键问题的追溯来帮助我们了解整个Django ORM的设计思想:
- Django ORM如何做到多数据库支持的;
- Django ORM中的objects是什么;
- Filter方法的查询流程;
1、准备工作
- Python 3.5.2
- Django 2.1.2
- PyCharm 2018.2.1 (Professional Edition)
- 启动项目
[min:] ~/Desktop/python/Demo$ python manage.py runserver 0.0.0.0:8000
2、分析流程
现在我们开始根据上述提到的问题进行逐个的分析。
2.1、Django ORM如何做到多数据库支持的
2.1.1、 Django db source tree
├── __init__.py
├── backends
│ ├── __init__.py
│ ├── base
│ ├── ddl_references.py
│ ├── dummy
│ ├── mysql
│ ├── oracle
│ ├── postgresql
│ ├── postgresql_psycopg2
│ ├── signals.py
│ ├── sqlite3
│ └── utils.py
├── migrations
│ ├── __init__.py
│ ├── ........
│ └── writer.py
├── models
│ ├── __init__.py
│ ├── ........
│ └── utils.py
├── transaction.py
└── utils.py
按照科学的推断,如果要做到多数据库的支持,一般的结构肯定是有一个Wrapper保证对外的接口一致,然后在这个Wrapper中,负责加载不同的数据库类型,执行相应的方法。而Django db的源码也和我们的猜想差不多,将后端的集中到backends中,在此结构下差异化不同的数据库;
2.1.2、具体的差异化加载流程
以Django服务启动时的数据库连接检查为例:
-
在Django源码分析一:服务启动一文中,我们有分析过Django服务的启动流程,在这个过程中间也包含了对数据库连接的检查,具体路径如下:django.core.management.base.BaseCommand#check_migrations,在这个方法中,引用了django.db.connections:
from django.db import DEFAULT_DB_ALIAS, connections # 调用ConnectionHandler的__init__方法 #.............省略............... def check_migrations(self): """ Print a warning if the set of migrations on disk don't match the migrations in the database. """ from django.db.migrations.executor import MigrationExecutor try: # python的魔术方法,调用到ConnectionHandler的__getitem__方法 executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS]) except ImproperlyConfigured: # No databases are configured (or the dummy one) return
-
我们发现
connections = ConnectionHandler()
,查看ConnectionHandler类:class ConnectionHandler: def __init__(self, databases=None): """ databases is an optional dictionary of database definitions (structured like settings.DATABASES). """ self._databases = databases # 调用databases(self)方法 self._connections = local() @cached_property def databases(self): if self._databases is None: self._databases = settings.DATABASES if self._databases == {}: self._databases = { DEFAULT_DB_ALIAS: { 'ENGINE': 'django.db.backends.dummy', }, } if DEFAULT_DB_ALIAS not in self._databases: raise ImproperlyConfigured("You must define a '%s' database." % DEFAULT_DB_ALIAS) if self._databases[DEFAULT_DB_ALIAS] == {}: self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy' return self._databases ................ def __getitem__(self, alias): if hasattr(self._connections, alias): return getattr(self._connections, alias) self.ensure_defaults(alias) self.prepare_test_settings(alias) db = self.databases[alias] backend = load_backend(db['ENGINE']) # 重要!!根据ENGINE的类型决定使用哪一种数据库 conn = backend.DatabaseWrapper(db, alias) setattr(self._connections, alias, conn) return conn
从上面代码注释可以了解到在
__init__
方法中通过调用databases完成对_databases
属性的赋值,将settings中的DATABASES赋值给这个变量;之后在check_migrations
方法中调用了ConnectionHandler的__getitem__
方法; -
django.db.utils.load_backend
def load_backend(backend_name): """ Return a database backend's "base" module given a fully qualified database backend name, or raise an error if it doesn't exist. """ # This backend was renamed in Django 1.9. if backend_name == 'django.db.backends.postgresql_psycopg2': backend_name = 'django.db.backends.postgresql' try: return import_module('%s.base' % backend_name) except ImportError as e_user: #.............省略...............
# 示例settings.DATABASES DATABASES = { 'default': { 'ENGINE': 'django.db.backends.mysql', 'NAME': config.DATABASES_NAME, 'USER': config.DATABASES_USER, 'PASSWORD': config.DATABASES_PASSWORD, 'HOST': config.DATABASES_HOST, 'PORT': config.DATABASES_PORT, } }
在这个方法中,根据settings.DATABASES的ENGINE值,完成对不同类型数据库的加载;每个类型的数据库拥有一个DatabaseWrapper作为其代理,作为后续操作的具体对象。
2.2 objects的作用
在分析ORM的filter之前,我们无法绕开objects这个方法,因为我们发现貌似所有的数据库操作都是基于objects,比如最常见的:
ret = models.Book.objects.filter(title="Django");
那么这个objects究竟是什么,对整个数据库操作有着怎样的作用呢?
2.2.1 django.db.models.base.ModelBase
def __new__(cls, name, bases, attrs, **kwargs):
#.............省略...............
new_class = super_new(cls, name, bases, new_attrs, **kwargs)
#.............省略...............
new_class._prepare() # 调用_prepare方法
def _prepare(cls):
#.............省略...............
if not opts.managers:
if any(f.name == 'objects' for f in opts.fields):
raise ValueError(
"Model %s must specify a custom Manager, because it has a "
"field named 'objects'." % cls.__name__
)
manager = Manager()
manager.auto_created = True
cls.add_to_class('objects', manager) # 完成objects的赋值
#.............省略...............
从使用方式上我们可以看到objects是Model的一个属性,那么这个属性是什么时候赋值给Model的呢?Book继承于Model,Model继承于ModelBase,在ModelBase中有如上两个重要方法(见注释)完成对objects的赋值。但是有一个问题我们需要注意就是赋值操作使用的是add_to_class方法而不是常见病的setter方法,那么这个方法的作用是什么呢?
def add_to_class(cls, name, value):
# We should call the contribute_to_class method only if it's bound
if not inspect.isclass(value) and hasattr(value, 'contribute_to_class'):
value.contribute_to_class(cls, name)
else:
setattr(cls, name, value)
从上面方法中,我们可以看到最后会调用contribute_to_class
方法,这个方法属于BaseManager。
2.2.1 django.db.models.manager.BaseManager
def contribute_to_class(self, model, name):
self.name = self.name or name
self.model = model
setattr(model, name, ManagerDescriptor(self))
model._meta.add_manager(self)
结合上面的分析,可以看到其实objects最后赋值的对象应该是ManagerDescriptor,这个是什么呢??
class ManagerDescriptor:
def __init__(self, manager):
self.manager = manager
def __get__(self, instance, cls=None):
if instance is not None:
raise AttributeError("Manager isn't accessible via %s instances" % cls.__name__)
if cls._meta.abstract:
raise AttributeError("Manager isn't available; %s is abstract" % (
cls._meta.object_name,
))
if cls._meta.swapped:
raise AttributeError(
"Manager isn't available; '%s.%s' has been swapped for '%s'" % (
cls._meta.app_label,
cls._meta.object_name,
cls._meta.swapped,
)
)
return cls._meta.managers_map[self.manager.name]
可以看到在使用Book.objects的时候其实正在起作用的还是传入进来的Manager示例,为什么要多此一举呢?
django 规定, 只有 Model 类可以使用 objects, Model 类实例不可以. 请注意区分类和类实例之间的区别.
其实是非常有道理的, Book.objects.filter(id=1) 返回的是 QuerySet 对象, 而 QuerySet 对象可以看成是 Model 实例的集合, 也就是 book_set 是 Model 实例的集合。假使Model 类的实例可以使用 objects 属性, 即从一本书中查询书」这在语意上不通过. 只能是从书的集合(Book)中查询书,所以 django 用 ManagerDescriptor 特意为 Manager 做的一层包装来校验。
2.2.3 django.db.models.manager.Manager
class Manager(BaseManager.from_queryset(QuerySet)):
pass
@classmethod
def from_queryset(cls, queryset_class, class_name=None):
if class_name is None:
class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
return type(class_name, (cls,), {
'_queryset_class': queryset_class,
**cls._get_queryset_methods(queryset_class),
})
从这两个方法中,可以看到其实Manager真正继承的应该是QuerySet这个方法,所以后续的filter,get等方法其实都是基于QuerySet的。
2.3 Filter方法的查询流程
这一章节的重点是让大家理解从Object到SQL到转化,了解Django ORM是如果工作的,为了让思路更加聚焦,所以涉及到一些细节的问题,不会再在Code层进行分析,只会提一下,有兴趣的话可以单独深入分析。
2.3.1 django/db/models/query.py
在使用filter的时候,其实调用的是_filter_or_exclude方法
def filter(self, *args, **kwargs):
"""
Return a new QuerySet instance with the args ANDed to the existing
set.
"""
return self._filter_or_exclude(False, *args, **kwargs)
def _filter_or_exclude(self, negate, *args, **kwargs):
if args or kwargs:
assert self.query.can_filter(), \
"Cannot filter a query once a slice has been taken."
clone = self._chain() # 得到一个QuerySet的对象
if negate:
clone.query.add_q(~Q(*args, **kwargs))
else:
# filter方法,通过add_q 方法,传入Q class
clone.query.add_q(Q(*args, **kwargs))
return clone
2.3.2 django/db/models/sql/query.py
def add_q(self, q_object):
"""
A preprocessor for the internal _add_q(). Responsible for doing final
join promotion.
"""
# For join promotion this case is doing an AND for the added q_object
# and existing conditions. So, any existing inner join forces the join
# type to remain inner. Existing outer joins can however be demoted.
# (Consider case where rel_a is LOUTER and rel_a__col=1 is added - if
# rel_a doesn't produce any rows, then the whole condition must fail.
# So, demotion is OK.
existing_inner = {a for a in self.alias_map if self.alias_map[a].join_type == INNER}
clause, _ = self._add_q(q_object, self.used_aliases)
if clause:
self.where.add(clause, AND)
self.demote_joins(existing_inner)
添加当前的Q对象到已存在的filter中,然后将返回的where对象插入到当前类的where中,且用and连接表示;另外在这个方法中同时处理了Django ORM中的通过__
符号来连接外健的操作。
**一般来讲,where语句写完,就应该进行查询操作,但是在重新顺着流程捋了一遍,都没有找到在什么地方有执行sql的操作,最后发现是因为Django的惰性查询关系,做完这些操作之后,并不会马上执行sql,而是等待需要用的Queryset的__iter__
的时候,才去真正的根据QuerySet 已经设置好的各种查询条件,去编译sql语句,执行并返回结果. **
以如下语句为例:
ret = models.Book.objects.filter(title="Django");
books = list(ret) # 或者 book01 = ret[1]
在执行完filter方法之后,使用断点或者日志打印的方式发现ret其实是一个django.db.models.query.QuerySet对象,然后使用如上两种方式才可以打印出具体的Book信息,所以接下来我们就需要看一下QuerySet的__iter__
方法。
2.3.3 django.db.models.query.ModelIterable
def __getitem__(self, k):
"""Retrieve an item or slice from the set of results."""
#.............省略...............
qs = self._chain()
qs.query.set_limits(k, k + 1)
qs._fetch_all() # 重要!!!
return qs._result_cache[0]
def _fetch_all(self):
if self._result_cache is None:
# 调用的ModelIterable的__iter__方法
self._result_cache = list(self._iterable_class(self))
if self._prefetch_related_lookups and not self._prefetch_done:
self._prefetch_related_objects()
class ModelIterable(BaseIterable):
"""Iterable that yields a model instance for each row."""
def __iter__(self):
queryset = self.queryset
db = queryset.db
#获取sql编译器,准备编译sql语句
compiler = queryset.query.get_compiler(using=db)
# Execute the query. This will also fill compiler.select, klass_info,
# and annotations.
# 真正执行Sql取回结果!!!
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
select, klass_info, annotation_col_map = (compiler.select, compiler.klass_info,
compiler.annotation_col_map)
#.............省略...............
for row in compiler.results_iter(results):
obj = model_cls.from_db(db, init_list, row[model_fields_start:model_fields_end])
#.............省略...............
yield obj
2.3.4 django/db/models/sql/compiler.py
def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
# 对数据库运行查询并返回结果(s),结果是一个的话直接返回,多个结果集就迭代
result_type = result_type or NO_RESULTS
try:
# 获取sql语句
sql, params = self.as_sql()
if not sql:
raise EmptyResultSet
except EmptyResultSet:
if result_type == MULTI:
return iter([])
else:
return
# 游标的获取,是直接获取还是分块游标(可能需要多个数据库的时候操作)
if chunked_fetch:
cursor = self.connection.chunked_cursor()
else:
cursor = self.connection.cursor()
try:
# 执行sql语句
cursor.execute(sql, params)
except Exception:
# Might fail for server-side cursors (e.g. connection closed)
cursor.close()
raise
# 返回游标还是一行数据
if result_type == CURSOR:
# Give the caller the cursor to process and close.
return cursor
if result_type == SINGLE:
try:
val = cursor.fetchone()
if val:
return val[0:self.col_count]
return val
finally:
# done with the cursor
cursor.close()
if result_type == NO_RESULTS:
cursor.close()
return
# 返回多行数据
result = cursor_iter(
cursor, self.connection.features.empty_fetchmany_value,
self.col_count if self.has_extra_select else None,
chunk_size,
)
if not chunked_fetch and not self.connection.features.can_use_chunked_reads:
try:
# If we are using non-chunked reads, we return the same data
# structure as normally, but ensure it is all read into memory
# before going any further. Use chunked_fetch if requested.
return list(result)
finally:
# done with the cursor
cursor.close()
return result
此方法中通过self.as_sql()拿到sql语句跟参数,获取cursor游标,执行sql并得到结果,然后根据传入的result_type来从游标中返回正确的结果集;
def as_sql(self, with_limits=True, with_col_aliases=False):
"""
Create the SQL for this query. Return the SQL string and list of
parameters.
If 'with_limits' is False, any limit/offset information is not included
in the query.
"""
refcounts_before = self.query.alias_refcount.copy()
try:
extra_select, order_by, group_by = self.pre_sql_setup()
for_update_part = None
# Is a LIMIT/OFFSET clause needed?
with_limit_offset = with_limits and (self.query.high_mark is not None or self.query.low_mark)
combinator = self.query.combinator
features = self.connection.features
if combinator:
if not getattr(features, 'supports_select_{}'.format(combinator)):
raise NotSupportedError('{} is not supported on this database backend.'.format(combinator))
result, params = self.get_combinator_sql(combinator, self.query.combinator_all)
else:
distinct_fields, distinct_params = self.get_distinct()
# This must come after 'select', 'ordering', and 'distinct'
# (see docstring of get_from_clause() for details).
from_, f_params = self.get_from_clause()
where, w_params = self.compile(self.where) if self.where is not None else ("", [])
having, h_params = self.compile(self.having) if self.having is not None else ("", [])
result = ['SELECT']
params = []
if self.query.distinct:
distinct_result, distinct_params = self.connection.ops.distinct_sql(
distinct_fields,
distinct_params,
)
result += distinct_result
params += distinct_params
out_cols = []
col_idx = 1
for _, (s_sql, s_params), alias in self.select + extra_select:
if alias:
s_sql = '%s AS %s' % (s_sql, self.connection.ops.quote_name(alias))
elif with_col_aliases:
s_sql = '%s AS %s' % (s_sql, 'Col%d' % col_idx)
col_idx += 1
params.extend(s_params)
out_cols.append(s_sql)
result += [', '.join(out_cols), 'FROM', *from_]
params.extend(f_params)
#.............省略...............
if where:
result.append('WHERE %s' % where)
params.extend(w_params)
#.............省略...............
# 拼接成sql语句
return ' '.join(result), tuple(params)
finally:
# Finally do cleanup - get rid of the joins we created above.
self.query.reset_refcounts(refcounts_before)
如果我们忽略掉这过程中的许多细节如:怎么获取select,where,order_by等sql部分,怎么对上面各部分各个连接啊,参数等合法检验等部分,就会发现,其实as_sql的实现方式不外乎就是: 用list一次存储各个部分,然后"".join方式连接这个list成一个字符串,当然,各部分包括(但不限于):
- select部分
- distinct
- where表达式
- group表达式
- having表达式
- 是否加入limit or offset