flask_sqlalchemy对sqlalchemy的scoped_session进行了封装,使其session默认为线程安全(scoped_session)方式。通过查询源码我们发现了scoped_session的创建及工作原理。
sqlalchemy的session创建为工厂模式,及通过传递sessionmaker的函数调用,然后通过实现call方法实现其实例调用。这里对实际创建session的工厂Session类不做探究,只探究该session是如何通过工厂创建的。
我们一般使用db.session.query的方法查询,在flask_sqlalchemy的SQLAlchemy类中实现了session的定义
class SQLAlchemy(object):
def __init__(self, app=None, use_native_unicode=True, session_options=None,
metadata=None, query_class=BaseQuery, model_class=Model,
engine_options=None):
self.session = self.create_scoped_session(session_options) # 创建session
# 省略其他代码···
def create_scoped_session(self, options=None):
if options is None:
options = {}
# _app_ctx_stack.__ident_func__ 是指获取当前上下文的线程id
scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
options.setdefault('query_cls', self.Query)
return orm.scoped_session(
self.create_session(options), scopefunc=scopefunc
)
def create_session(self, options):
return orm.sessionmaker(class_=SignallingSession, db=self, **options)
这里可以看出create_session通过封装orm.sessionmaker并将其传递给orm.scoped_session进行创建,scoped_session接收了两个参数,分别为sessionmaker和scopefunc,scopefunc是通过flask应用上下文生成线程id的调用。由scopefunc可以看出猜测scoped_session是线程安全的。orm.scoped_session是其核心,再看其实怎么实现的
sqlalchemy/orm/scoping.py
class scoped_session(object):
def __init__(self, session_factory, scopefunc=None):
self.session_factory = session_factory
if scopefunc:
self.registry = ScopedRegistry(session_factory, scopefunc)
else:
self.registry = ThreadLocalRegistry(session_factory)
def __call__(self, **kw):
if kw:
if self.registry.has():
raise sa_exc.InvalidRequestError(
"Scoped session is already present; "
"no new arguments may be specified."
)
else:
sess = self.session_factory(**kw)
self.registry.set(sess)
return sess
else:
return self.registry()
def remove(self):
if self.registry.has():
self.registry().close()
self.registry.clear()
def configure(self, **kwargs):
if self.registry.has():
warn(
"At least one scoped session is already present. "
" configure() can not affect sessions that have "
"already been created."
)
self.session_factory.configure(**kwargs)
def query_property(self, query_cls=None):
class query(object):
def __get__(s, instance, owner):
try:
mapper = class_mapper(owner)
if mapper:
if query_cls:
# custom query class
return query_cls(mapper, session=self.registry())
else:
# session's configured query class
return self.registry().query(mapper)
except orm_exc.UnmappedClassError:
return None
return query()
ScopedSession = scoped_session
def instrument(name):
def do(self, *args, **kwargs):
return getattr(self.registry(), name)(*args, **kwargs)
return do
for meth in Session.public_methods:
setattr(scoped_session, meth, instrument(meth))
def makeprop(name):
def set_(self, attr):
setattr(self.registry(), name, attr)
def get(self):
return getattr(self.registry(), name)
return property(get, set_)
for prop in (
"bind",
"dirty",
"deleted",
"new",
"identity_map",
"is_active",
"autoflush",
"no_autoflush",
"info",
"autocommit",
):
setattr(scoped_session, prop, makeprop(prop))
def clslevel(name):
def do(cls, *args, **kwargs):
return getattr(Session, name)(*args, **kwargs)
return classmethod(do)
for prop in ("close_all", "object_session", "identity_key"):
setattr(scoped_session, prop, clslevel(prop))
这里可以看出整个session的核心就是self.registry,几乎所有的操作都是由self.registry进行的。对于session的创建使用了工厂模式模式。
从ScopedRegistry的代码可看出,当我们调用了实例对象self.registry()的时候,如果当前线程已经绑定了session将其返回,如果没有则调用了工厂去生成session,然后将其绑定到当前线程上。
我觉得这段代码最有意思的地方在于下面那三个对session对象上的方法与属性的绑定。这也是我们为什么可以直接使用db.session.query
,而不是db.session().query
的原因,这里体现了代理模式的思想。
class ScopedRegistry(object):
def __init__(self, createfunc, scopefunc):
self.createfunc = createfunc
self.scopefunc = scopefunc
self.registry = {}
def __call__(self):
key = self.scopefunc()
try:
return self.registry[key]
except KeyError:
return self.registry.setdefault(key, self.createfunc())
下面的三个循环本别将一些属性和操作绑定到了session上。
例如,Session.public_methods里面定义了query、commit、update等操作。当调用session.query时先调用self.registry()去获得session,再从当前session上获取commit对象并调用。