前期环境
- pycharm 2018.2
- Django 3.1.3
分析流程
示例
import os
import django

if __name__ == '__main__':
    # Point Django at the project settings module before calling setup().
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SimpleRBAC.settings')
    django.setup()
    # Model imports must happen only after django.setup() has run.
    from apps.system.models import Menu

    first_menu = Menu.objects.filter(name="任务管理").first()
    print(first_menu)
检查是否加载了 settings 里面配置的数据库
django.core.management.base.BaseCommand#check_migrations
def check_migrations(self):
    """
    Print a warning if the set of migrations on disk doesn't match the
    migrations recorded in the database.
    """
    from django.db.migrations.executor import MigrationExecutor
    try:
        # Key line: building the executor touches connections[DEFAULT_DB_ALIAS],
        # which is the entry point into the connection machinery traced below.
        executor = MigrationExecutor(connections[DEFAULT_DB_ALIAS])
    except ImproperlyConfigured:
        # No database is configured; nothing to check.
        pass
    plan = executor.migration_plan(executor.loader.graph.leaf_nodes())
    if plan:
        # ... remainder of the method truncated in this excerpt ...
# DEFAULT_DB_ALIAS 的值
DEFAULT_DB_ALIAS = 'default'
# 这个就是上面方括号中的取值写法
connections[DEFAULT_DB_ALIAS]
# connections 是什么对象?
connections = ConnectionHandler()
可以看下ConnectionHandler 这个类在干什么
class ConnectionHandler:
    def __init__(self, databases=None):
        # Raw settings dict; resolved lazily by the `databases` property.
        self._databases = databases
        # One connection object per alias, stored thread-locally.
        self._connections = Local(thread_critical=True)

    @cached_property
    def databases(self):
        """Resolve and validate the DATABASES dict on first access."""
        if self._databases is None:
            self._databases = settings.DATABASES
        if self._databases == {}:
            # Nothing configured: fall back to a dummy backend so Django
            # can still start up.
            self._databases = {
                DEFAULT_DB_ALIAS: {
                    'ENGINE': 'django.db.backends.dummy',
                },
            }
        if DEFAULT_DB_ALIAS not in self._databases:
            raise ImproperlyConfigured("You must define a '%s' database." % DEFAULT_DB_ALIAS)
        if self._databases[DEFAULT_DB_ALIAS] == {}:
            self._databases[DEFAULT_DB_ALIAS]['ENGINE'] = 'django.db.backends.dummy'
        return self._databases
def ensure_defaults(self, alias):
    """
    Put the defaults into the settings dictionary for the given connection
    where no setting is provided.
    """
    try:
        conn = self.databases[alias]
    except KeyError:
        raise ConnectionDoesNotExist("The connection %s doesn't exist" % alias)

    conn.setdefault('ATOMIC_REQUESTS', False)
    conn.setdefault('AUTOCOMMIT', True)
    conn.setdefault('ENGINE', 'django.db.backends.dummy')
    # An empty or bare 'django.db.backends.' engine means "not configured".
    if conn['ENGINE'] == 'django.db.backends.' or not conn['ENGINE']:
        conn['ENGINE'] = 'django.db.backends.dummy'
    conn.setdefault('CONN_MAX_AGE', 0)
    conn.setdefault('OPTIONS', {})
    conn.setdefault('TIME_ZONE', None)
    for setting in ['NAME', 'USER', 'PASSWORD', 'HOST', 'PORT']:
        conn.setdefault(setting, '')
def prepare_test_settings(self, alias):
    """
    Make sure the test settings are available in the 'TEST' sub-dictionary.
    """
    try:
        conn = self.databases[alias]
    except KeyError:
        raise ConnectionDoesNotExist("The connection %s doesn't exist" % alias)

    test_settings = conn.setdefault('TEST', {})
    default_test_settings = [
        ('CHARSET', None),
        ('COLLATION', None),
        ('MIGRATE', True),
        ('MIRROR', None),
        ('NAME', None),
    ]
    for key, value in default_test_settings:
        test_settings.setdefault(key, value)
特别注意这个方法。只要用到了字典取值都会经历这个方法 __getitem__: 按照索引获取值
connections[DEFAULT_DB_ALIAS] 这里会使用这个方法
def __getitem__(self, alias):
    """connections[alias] -> DatabaseWrapper, created lazily per thread."""
    # Return the cached connection if this thread already built one.
    if hasattr(self._connections, alias):
        return getattr(self._connections, alias)
    # Fill in default settings when keys are missing (here alias == "default").
    self.ensure_defaults(alias)
    # Make sure the configured database's 'TEST' sub-dict is usable.
    self.prepare_test_settings(alias)
    # db is the fully populated settings dict, e.g.:
    # {'ENGINE': 'django.db.backends.mysql', 'NAME': '', 'USER': 'root',
    #  'PORT': 53307, ..., 'TEST': {'CHARSET': None, ...}}
    db = self.databases[alias]
    # The key step: import the backend module named by db['ENGINE'],
    # e.g. <module 'django.db.backends.mysql.base'>.
    backend = load_backend(db['ENGINE'])
    # Every backend exposes a DatabaseWrapper that acts as the proxy for
    # all subsequent operations on this connection.
    conn = backend.DatabaseWrapper(db, alias)
    setattr(self._connections, alias, conn)
    return conn
思考下什么时候调用的__getitem__ 方法, connections[DEFAULT_DB_ALIAS]已经调用了
def load_backend(backend_name):
    """Import and return the base module of a database backend."""
    # e.g. backend_name = 'django.db.backends.mysql'
    if backend_name == 'django.db.backends.postgresql_psycopg2':
        backend_name = 'django.db.backends.postgresql'
    try:
        # Import the backend module, e.g. django.db.backends.mysql.base.
        return import_module('%s.base' % backend_name)
    except ImportError as e_user:
        # Error reporting omitted in this excerpt (the real code re-raises
        # with a helpful message).
        pass
# Example settings.DATABASES (values come from a project-local config module).
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': config.DATABASES_NAME,
        'USER': config.DATABASES_USER,
        'PASSWORD': config.DATABASES_PASSWORD,
        'HOST': config.DATABASES_HOST,
        'PORT': config.DATABASES_PORT,
    }
}
我们这里重点关注下ConnectionHandler类里面的__getitem__方法。
他本身就是实例取值一定会调用这个方法
example
class DataBase:
    """Minimal demo of the __getitem__ protocol, mirroring ConnectionHandler."""

    def __init__(self, id, address):
        """Remember the id/address pair and index both in a lookup dict."""
        self.id = id
        self.address = address
        self.d = {self.id: 1, self.address: "192.168.1.1"}

    def __getitem__(self, key):
        """Invoked for every data[key] subscript access."""
        print('进入')
        try:
            return self.d[key]
        except KeyError:
            return "default"


data = DataBase(1, "192.168.2.11")
print(data["hi"])       # miss -> "default"
print(data[data.id])    # hit  -> 1
进入
default
进入
1
Menu.objects
在 Menu 类创建(由元类处理)时,objects 作为属性被赋值给 Menu 类。
继承关系
Menu –> Model –>(元类)ModelBase
我们先看下 ModelBase 的源码
- django.db.models.base.ModelBase
def __new__(cls, name, bases, attrs, **kwargs):
    # ............. omitted ...............
    new_class = super_new(cls, name, bases, new_attrs, **kwargs)
    # ............. omitted ...............
    new_class._prepare()  # invoke _prepare() on the freshly built model class

def _prepare(cls):
    # ............. omitted ...............
    if not opts.managers:
        # A model field may not be called 'objects' if it would shadow the
        # auto-created default manager.
        if any(f.name == 'objects' for f in opts.fields):
            raise ValueError(
                "Model %s must specify a custom Manager, because it has a "
                "field named 'objects'." % cls.__name__
            )
        manager = Manager()
        manager.auto_created = True
        cls.add_to_class('objects', manager)  # this is what assigns .objects
    # ............. omitted ...............
我们可以看到 cls.add_to_class('objects', manager) 调用了add_to_class
而manager 是Manager() 的实例
- django.db.models.base.ModelBase # add_to_class(cls,name,value)
def add_to_class(cls, name, value):
    """Attach value to cls, letting value hook the binding if it wants to."""
    if _has_contribute_to_class(value):
        # Objects such as Manager and Field define contribute_to_class()
        # and control how they get bound to the model class.
        value.contribute_to_class(cls, name)
    else:
        setattr(cls, name, value)
这里我们可以看到调用了contribute_to_class
这里从上下文可知value 是manager 所有调用的是
- django.db.models.manager.BaseManager # contribute_to_class
def contribute_to_class(self, model, name):
    self.name = self.name or name
    self.model = model
    # Bind a ManagerDescriptor to the model under `name` ('objects'),
    # so attribute access goes through the descriptor protocol.
    setattr(model, name, ManagerDescriptor(self))
    model._meta.add_manager(self)
原来如此是调用了setattr(model, name, ManagerDescriptor(self))
setattr 使用技巧https://www.runoob.com/python/python-func-setattr.html
将属性的值赋值某个对象
看下默认值 ManagerDescriptor(self))是什么(self 就是Manager 的实例)
class ManagerDescriptor:
    def __init__(self, manager):
        # The wrapped Manager instance. Menu.objects resolves through this
        # descriptor because contribute_to_class() earlier did
        # setattr(model, name, ManagerDescriptor(self)).
        self.manager = manager

    def __get__(self, instance, cls=None):
        # Managers are class-level only: forbid access via an instance.
        if instance is not None:
            raise AttributeError("Manager isn't accessible via %s instances" % cls.__name__)

        if cls._meta.abstract:
            raise AttributeError("Manager isn't available; %s is abstract" % (
                cls._meta.object_name,
            ))

        if cls._meta.swapped:
            raise AttributeError(
                "Manager isn't available; '%s.%s' has been swapped for '%s'" % (
                    cls._meta.app_label,
                    cls._meta.object_name,
                    cls._meta.swapped,
                )
            )

        return cls._meta.managers_map[self.manager.name]
那么这里就解决了一个问题:为什么是 Model 类去调用 objects,而不是 Model 的实例去调用。理论上类的属性其实也应该是这个类实例的属性
def __get__(self, instance, cls=None):
if instance is not None:
raise AttributeError("Manager isn't accessible via %s instances" % cls.__name__)
这里使用属性描述符规范了调用。如果是实例调用的话就会抛出异常
从语义上也可以这样理解
在所有书籍中找到名字为xxx 的一本书
你不能理解 在所有书籍的一本书中找到名字为xxx 的一本书
这里就规范了调用
这里使用ManagerDescriptor也是为Manager修饰
我们都知道Menu.objects 就是Manager 的实例。那我们看下Manager 的源码
class Manager(BaseManager.from_queryset(QuerySet)):
    # The base class is built dynamically: a BaseManager subclass that also
    # carries copies of all QuerySet methods (see from_queryset below).
    pass
我们看下Manager到底继承了什么
@classmethod
def from_queryset(cls, queryset_class, class_name=None):
    """
    Build a Manager subclass that proxies queryset_class's methods.

    In the traced call, queryset_class is
    <class 'django.db.models.query.QuerySet'> and class_name is None.
    """
    if class_name is None:
        class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
        # class_name == 'BaseManagerFromQuerySet'
    # type(name, bases, dict) creates a class dynamically:
    # <class 'django.db.models.manager.BaseManagerFromQuerySet'>,
    # a subclass of cls that gains all of QuerySet's query methods.
    return type(class_name, (cls,), {
        '_queryset_class': queryset_class,
        **cls._get_queryset_methods(queryset_class),
    })
柳暗花明又一村
type(class_name, (cls,), {
'_queryset_class': queryset_class,
**cls._get_queryset_methods(queryset_class),
})
这里返回时基于BaseManagerFromQuerySet 赋值了QuerySet 的所有方法。所以Manager 可以理解为继承了QuerySet
- 调用Manager 的 filter 方法
其实本质是调用QuerySet 的 filter 方法
- django.db.models.query.QuerySet # filter(self, *args, **kwargs)
def filter(self, *args, **kwargs):
    """
    Return a new QuerySet instance with the args ANDed to the existing
    set.
    """
    self._not_support_combined_queries('filter')
    return self._filter_or_exclude(False, *args, **kwargs)
- django.db.models.query.QuerySet # _filter_or_exclude(False, *args, **kwargs)
def _filter_or_exclude(self, negate, *args, **kwargs):
    if args or kwargs:
        assert not self.query.is_sliced, \
            "Cannot filter a query once a slice has been taken."

    # _chain() returns a copy of this QuerySet, keeping querysets immutable.
    clone = self._chain()
    if self._defer_next_filter:
        # Filtering was deferred (e.g. by prefetch machinery): store the
        # arguments for later instead of applying them now.
        self._defer_next_filter = False
        clone._deferred_filter = negate, args, kwargs
    else:
        clone._filter_or_exclude_inplace(negate, *args, **kwargs)
    return clone
- django.db.models.query.QuerySet # _filter_or_exclude_inplace(self, negate *args, **kwargs)
def _filter_or_exclude_inplace(self, negate, *args, **kwargs):
    # Wrap the lookup arguments in a Q object and hand it to self._query
    # (a django.db.models.sql.Query); negate it for exclude().
    if negate:
        self._query.add_q(~Q(*args, **kwargs))
    else:
        self._query.add_q(Q(*args, **kwargs))
这里我们可以看到传入一个self._query。那么他到底是什么呢?
- django.db.models.query.QuerySet # __init__(self, model, ...)
class QuerySet:
    """Represent a lazy database lookup for a set of objects."""

    def __init__(self, model=None, query=None, using=None, hints=None):
        self.model = model
        self._db = using
        self._hints = hints or {}
        # Key line: every QuerySet owns a sql.Query built from its model.
        self._query = query or sql.Query(self.model)
可以看到调用了。返回了一个基础model 的sql 查询类
sql.Query(self.model)
- django.db.models.sql.Query
class Query(BaseExpression):
    """A single SQL query."""

    alias_prefix = 'T'
    subq_aliases = frozenset([alias_prefix])
    # Name of the compiler class that turns this Query into SQL text.
    compiler = 'SQLCompiler'

    def __init__(self, model, where=WhereNode, alias_cols=True):
        self.model = model
        self.alias_refcount = {}
这是一个基于 SQL 的查询类
返回上面
self._query.add_q(Q(*args, **kwargs)) 他是调用了Query 的add_q 方法
显示调用了Q对象
- django.db.models.sql.Query # add_q(self, q_object)
Q()对象就是为了将这些条件组合起来。
def add_q(self, q_object):
    """AND a Q object into this query's WHERE tree."""
    # Remember which aliases were INNER joins before this filter was added.
    existing_inner = {a for a in self.alias_map if self.alias_map[a].join_type == INNER}
    clause, _ = self._add_q(q_object, self.used_aliases)
    if clause:
        self.where.add(clause, AND)
    self.demote_joins(existing_inner)
这里深究的话会到无底洞。大体这段意思
添加当前的 Q 对象到已存在的 filter 中,然后将返回的 where 对象插入到当前类的 where 中,且用 and 连接;另外在这个方法中同时处理了 Django ORM 中通过 __ 符号来连接外键的操作
- Q 对象有什么作用呢?
- 这里为什么需要添加Q对象
我们知道调用QuerySet 可能有许多API。比如filter 里面有许多查询条件
filter(name="任务管理", pid=2)
那么使用Q 对象可控将这两个条件按 and 连接进行组合查询。这样可以适应复杂查询
- 这里一直没看到执行sql 。而一直都是组装sql。那么执行sql 是在哪呢?
发现是因为 Django 的惰性查询机制:
做完这些操作之后,并不会马上执行 sql,而是等到需要用到 QuerySet 的 __iter__ 的
时候,才真正根据 QuerySet 已经设置好的各种查询条件,去编译 sql 语句,执行并返回结果。
这里注意可能有两种方式查询。其实说到底都是殊途同归
Menu.objects.filter(name='任务管理')[0]
Menu.objects.filter(name='任务管理').first()
先分析下第一种按索引来搜索的。那么他肯定调用了QuerySet的__getitem__方法
那么我们先看这个方法
- django/db/models/query.py # __getitem__(self, k)
def __getitem__(self, k):
    # ---- omitted ----
    # _chain() returns a fresh QuerySet copy.
    qs = self._chain()
    # Translate the index into SQL LIMIT/OFFSET bounds.
    qs.query.set_limits(k, k + 1)
    # The crucial step: run the query and cache its results.
    qs._fetch_all()
    return qs._result_cache[0]
def _fetch_all(self):
    if self._result_cache is None:
        # Iterating _iterable_class (ModelIterable) invokes its __iter__,
        # which runs the SQL; list() materializes the generator.
        self._result_cache = list(self._iterable_class(self))
    if self._prefetch_related_lookups and not self._prefetch_done:
        self._prefetch_related_objects()
def __init__(self, model=None, query=None, using=None, hints=None):
    # Excerpt: by default a QuerySet iterates through ModelIterable.
    self._iterable_class = ModelIterable
- django.db.models.query.ModelIterable
class ModelIterable(BaseIterable):
    def __iter__(self):
        queryset = self.queryset
        # Database alias, e.g. 'default'.
        db = queryset.db
        # Obtain the SQL compiler that will build the statement.
        compiler = queryset.query.get_compiler(using=db)
        # This is where the SQL is actually executed and rows are fetched.
        results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
        select, klass_info, annotation_col_map = (compiler.select, compiler.klass_info,
                                                  compiler.annotation_col_map)
        # ---- omitted ----------------
        for row in compiler.results_iter(results):
            # Wrap each raw SQL row back into a model instance.
            obj = model_cls.from_db(db, init_list, row[model_fields_start:model_fields_end])
            # ------ omitted --------------
            # Yield model instances lazily via a generator.
            yield obj
- django/db/models/sql/query.py Query # get_compiler(self,using…)
def get_compiler(self, using=None, connection=None):
    if using is None and connection is None:
        raise ValueError("Need either using or connection")
    if using:
        # Resolve the alias through ConnectionHandler.__getitem__, i.e. the
        # connection/driver set up earlier.
        connection = connections[using]
    return connection.ops.compiler(self.compiler)(self, connection, using)
- django.db.models.sql.complier # execute_sql()
def execute_sql(self, result_type=MULTI, chunked_fetch=False, chunk_size=GET_ITERATOR_CHUNK_SIZE):
    """
    Run the query against the database and return the result(s): a single
    value for SINGLE, the cursor for CURSOR, an iterator of rows for MULTI.
    """
    result_type = result_type or NO_RESULTS
    try:
        # Build the SQL string and its parameters, e.g.:
        # 'SELECT `system_menu`.`id`, `system_menu`.`name`, `system_menu`.`path`,
        #  `system_menu`.`pid_id` FROM `system_menu` WHERE `system_menu`.`name` = %s
        #  ORDER BY `system_menu`.`id` ASC LIMIT 1'
        sql, params = self.as_sql()
        if not sql:
            raise EmptyResultSet
    except EmptyResultSet:
        if result_type == MULTI:
            return iter([])
        else:
            return
    # Pick a plain cursor or a chunked (server-side) cursor, which may be
    # needed when streaming large result sets.
    if chunked_fetch:
        cursor = self.connection.chunked_cursor()
    else:
        cursor = self.connection.cursor()
    try:
        # Execute the SQL statement.
        cursor.execute(sql, params)
    except Exception:
        # Might fail for server-side cursors (e.g. connection closed)
        cursor.close()
        raise

    # Decide whether to hand back the cursor itself or row data.
    if result_type == CURSOR:
        # Give the caller the cursor to process and close.
        return cursor
    if result_type == SINGLE:
        try:
            val = cursor.fetchone()
            if val:
                return val[0:self.col_count]
            return val
        finally:
            # done with the cursor
            cursor.close()
    if result_type == NO_RESULTS:
        cursor.close()
        return

    # MULTI: return an iterator over chunks of rows.
    result = cursor_iter(
        cursor, self.connection.features.empty_fetchmany_value,
        self.col_count if self.has_extra_select else None,
        chunk_size,
    )
    if not chunked_fetch or not self.connection.features.can_use_chunked_reads:
        try:
            # If we are using non-chunked reads, we return the same data
            # structure as normally, but ensure it is all read into memory
            # before going any further. Use chunked_fetch if requested,
            # unless the database doesn't support it.
            return list(result)
        finally:
            # done with the cursor
            cursor.close()
    return result
- /django/db/models/sql/compiler.py # as_sql()
def as_sql(self, with_limits=True, with_col_aliases=False):
    """
    Create the SQL for this query. Return the SQL string and the tuple of
    parameters.
    """
    refcounts_before = self.query.alias_refcount.copy()
    try:
        extra_select, order_by, group_by = self.pre_sql_setup()
        for_update_part = None
        # Is a LIMIT/OFFSET clause needed?
        with_limit_offset = with_limits and (self.query.high_mark is not None or self.query.low_mark)
        combinator = self.query.combinator
        features = self.connection.features
        if combinator:
            # UNION/INTERSECT/EXCEPT queries take a separate path.
            if not getattr(features, 'supports_select_{}'.format(combinator)):
                raise NotSupportedError('{} is not supported on this database backend.'.format(combinator))
            result, params = self.get_combinator_sql(combinator, self.query.combinator_all)
        else:
            distinct_fields, distinct_params = self.get_distinct()
            # This must come after 'select', 'ordering', and 'distinct'
            # (see docstring of get_from_clause() for details).
            from_, f_params = self.get_from_clause()
            where, w_params = self.compile(self.where) if self.where is not None else ("", [])
            having, h_params = self.compile(self.having) if self.having is not None else ("", [])
            # The SQL text is assembled as a list of fragments and joined
            # with spaces at the end.
            result = ['SELECT']
            params = []

            if self.query.distinct:
                distinct_result, distinct_params = self.connection.ops.distinct_sql(
                    distinct_fields,
                    distinct_params,
                )
                result += distinct_result
                params += distinct_params

            out_cols = []
            col_idx = 1
            for _, (s_sql, s_params), alias in self.select + extra_select:
                if alias:
                    s_sql = '%s AS %s' % (s_sql, self.connection.ops.quote_name(alias))
                elif with_col_aliases:
                    s_sql = '%s AS %s' % (s_sql, 'Col%d' % col_idx)
                    col_idx += 1
                params.extend(s_params)
                out_cols.append(s_sql)

            result += [', '.join(out_cols), 'FROM', *from_]
            params.extend(f_params)

            if self.query.select_for_update and self.connection.features.has_select_for_update:
                if self.connection.get_autocommit():
                    raise TransactionManagementError('select_for_update cannot be used outside of a transaction.')

                if with_limit_offset and not self.connection.features.supports_select_for_update_with_limit:
                    raise NotSupportedError(
                        'LIMIT/OFFSET is not supported with '
                        'select_for_update on this database backend.'
                    )
                nowait = self.query.select_for_update_nowait
                skip_locked = self.query.select_for_update_skip_locked
                of = self.query.select_for_update_of
                # If it's a NOWAIT/SKIP LOCKED/OF query but the backend
                # doesn't support it, raise NotSupportedError to prevent a
                # possible deadlock.
                if nowait and not self.connection.features.has_select_for_update_nowait:
                    raise NotSupportedError('NOWAIT is not supported on this database backend.')
                elif skip_locked and not self.connection.features.has_select_for_update_skip_locked:
                    raise NotSupportedError('SKIP LOCKED is not supported on this database backend.')
                elif of and not self.connection.features.has_select_for_update_of:
                    raise NotSupportedError('FOR UPDATE OF is not supported on this database backend.')
                for_update_part = self.connection.ops.for_update_sql(
                    nowait=nowait,
                    skip_locked=skip_locked,
                    of=self.get_select_for_update_of_arguments(),
                )

            if for_update_part and self.connection.features.for_update_after_from:
                result.append(for_update_part)

            if where:
                result.append('WHERE %s' % where)
                params.extend(w_params)

            grouping = []
            for g_sql, g_params in group_by:
                grouping.append(g_sql)
                params.extend(g_params)
            if grouping:
                if distinct_fields:
                    raise NotImplementedError('annotate() + distinct(fields) is not implemented.')
                order_by = order_by or self.connection.ops.force_no_ordering()
                result.append('GROUP BY %s' % ', '.join(grouping))
                if self._meta_ordering:
                    order_by = None
            if having:
                result.append('HAVING %s' % having)
                params.extend(h_params)

        if self.query.explain_query:
            result.insert(0, self.connection.ops.explain_query_prefix(
                self.query.explain_format,
                **self.query.explain_options
            ))

        if order_by:
            ordering = []
            for _, (o_sql, o_params, _) in order_by:
                ordering.append(o_sql)
                params.extend(o_params)
            result.append('ORDER BY %s' % ', '.join(ordering))

        if with_limit_offset:
            result.append(self.connection.ops.limit_offset_sql(self.query.low_mark, self.query.high_mark))

        if for_update_part and not self.connection.features.for_update_after_from:
            result.append(for_update_part)

        if self.query.subquery and extra_select:
            # If the query is used as a subquery, the extra selects would
            # result in more columns than the left-hand side expression is
            # expecting. This can happen when a subquery uses a combination
            # of order_by() and distinct(), forcing the ordering expressions
            # to be selected as well. Wrap the query in another subquery
            # to exclude extraneous selects.
            sub_selects = []
            sub_params = []
            for index, (select, _, alias) in enumerate(self.select, start=1):
                if not alias and with_col_aliases:
                    alias = 'col%d' % index
                if alias:
                    sub_selects.append("%s.%s" % (
                        self.connection.ops.quote_name('subquery'),
                        self.connection.ops.quote_name(alias),
                    ))
                else:
                    select_clone = select.relabeled_clone({select.alias: 'subquery'})
                    subselect, subparams = select_clone.as_sql(self, self.connection)
                    sub_selects.append(subselect)
                    sub_params.extend(subparams)
            return 'SELECT %s FROM (%s) subquery' % (
                ', '.join(sub_selects),
                ' '.join(result),
            ), tuple(sub_params + params)

        return ' '.join(result), tuple(params)
    finally:
        # Finally do cleanup - get rid of the joins we created above.
        self.query.reset_refcounts(refcounts_before)
我们说的第二种调用 API 的方式呢?first()
def first(self):
    """Return the first object of a query or None if no match is found."""
    # The [:1] slice funnels into QuerySet.__getitem__, so this reuses the
    # same LIMIT-based execution path as indexing.
    for obj in (self if self.ordered else self.order_by('pk'))[:1]:
        return obj
(self if self.ordered else self.order_by('pk')) 这里返回 QuerySet 对象。那么使用 [:1] 切片取第一个元素
- django/db/models/query.py QuerySet # __getitem__(self, k)
__getitem__ -> ModelIterable -> __iter__(self)
重复上一个过程。使用生成器的方式返回数据
总结
- 启动时根据 settings 配置的数据库,加载对应的数据库后端
- Manager 调用类 django.db.models.query.QuerySet 的 filter 方法
- QuerySet 调用类 django.db.models.sql.Query 来验证和解析 filter 的参数(如 name 字段是否存在、是否合法)
- QuerySet 调用 Query 获取对应的 Compiler,由 Compiler 生成 sql,并执行 sql 语句
- 将 sql 执行结果再次封装成 models 对象作为结果吐出