flask_sqlalchemy是如何保证线程安全的

flask_sqlalchemy对sqlalchemy的scoped_session进行了封装,使其session默认为线程安全(scoped_session)方式。通过查询源码我们发现了scoped_session的创建及工作原理。
sqlalchemy的session创建为工厂模式,及通过传递sessionmaker的函数调用,然后通过实现call方法实现其实例调用。这里对实际创建session的工厂Session类不做探究,只探究该session是如何通过工厂创建的。
我们一般使用db.session.query的方法查询,在flask_sqlalchemy的SQLAlchemy类中实现了session的定义

class SQLAlchemy(object):
    def __init__(self, app=None, use_native_unicode=True, session_options=None,
                 metadata=None, query_class=BaseQuery, model_class=Model,
                 engine_options=None):
        
        self.session = self.create_scoped_session(session_options)  # 创建session
        # 省略其他代码···

            
    def create_scoped_session(self, options=None):
        if options is None:
            options = {}
        # _app_ctx_stack.__ident_func__ 是指获取当前上下文的线程id
        scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        return orm.scoped_session(
            self.create_session(options), scopefunc=scopefunc
        )
    
    def create_session(self, options):
        return orm.sessionmaker(class_=SignallingSession, db=self, **options)

这里可以看出create_session通过封装orm.sessionmaker并将其传递给orm.scoped_session进行创建,scoped_session接收了两个参数,分别为sessionmaker和scopefunc,scopefunc是通过flask应用上下文生成线程id的调用。由scopefunc可以看出猜测scoped_session是线程安全的。orm.scoped_session是其核心,再看其实怎么实现的

sqlalchemy/orm/scoping.py

class scoped_session(object):

    def __init__(self, session_factory, scopefunc=None):
        self.session_factory = session_factory

        if scopefunc:
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            self.registry = ThreadLocalRegistry(session_factory)

    def __call__(self, **kw):
        if kw:
            if self.registry.has():
                raise sa_exc.InvalidRequestError(
                    "Scoped session is already present; "
                    "no new arguments may be specified."
                )
            else:
                sess = self.session_factory(**kw)
                self.registry.set(sess)
                return sess
        else:
            return self.registry()

    def remove(self):
        if self.registry.has():
            self.registry().close()
        self.registry.clear()

    def configure(self, **kwargs):
        if self.registry.has():
            warn(
                "At least one scoped session is already present. "
                " configure() can not affect sessions that have "
                "already been created."
            )

        self.session_factory.configure(**kwargs)

    def query_property(self, query_cls=None):
        class query(object):
            def __get__(s, instance, owner):
                try:
                    mapper = class_mapper(owner)
                    if mapper:
                        if query_cls:
                            # custom query class
                            return query_cls(mapper, session=self.registry())
                        else:
                            # session's configured query class
                            return self.registry().query(mapper)
                except orm_exc.UnmappedClassError:
                    return None

        return query()


ScopedSession = scoped_session


def instrument(name):
    def do(self, *args, **kwargs):
        return getattr(self.registry(), name)(*args, **kwargs)

    return do


for meth in Session.public_methods:
    setattr(scoped_session, meth, instrument(meth))


def makeprop(name):
    def set_(self, attr):
        setattr(self.registry(), name, attr)

    def get(self):
        return getattr(self.registry(), name)

    return property(get, set_)


for prop in (
    "bind",
    "dirty",
    "deleted",
    "new",
    "identity_map",
    "is_active",
    "autoflush",
    "no_autoflush",
    "info",
    "autocommit",
):
    setattr(scoped_session, prop, makeprop(prop))


def clslevel(name):
    def do(cls, *args, **kwargs):
        return getattr(Session, name)(*args, **kwargs)

    return classmethod(do)


for prop in ("close_all", "object_session", "identity_key"):
    setattr(scoped_session, prop, clslevel(prop))

这里可以看出整个session的核心就是self.registry,几乎所有的操作都是由self.registry进行的。对于session的创建使用了工厂模式模式。
从ScopedRegistry的代码可看出,当我们调用了实例对象self.registry()的时候,如果当前线程已经绑定了session将其返回,如果没有则调用了工厂去生成session,然后将其绑定到当前线程上。
我觉得这段代码最有意思的地方在于下面那三个对session对象上的方法与属性的绑定。这也是我们为什么可以直接使用db.session.query,而不是db.session().query的原因,这里体现了代理模式的思想。

class ScopedRegistry(object):

    def __init__(self, createfunc, scopefunc):
        self.createfunc = createfunc
        self.scopefunc = scopefunc
        self.registry = {}

    def __call__(self):
        key = self.scopefunc()
        try:
            return self.registry[key]
        except KeyError:
            return self.registry.setdefault(key, self.createfunc())

下面的三个循环本别将一些属性和操作绑定到了session上。
例如,Session.public_methods里面定义了query、commit、update等操作。当调用session.query时先调用self.registry()去获得session,再从当前session上获取commit对象并调用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容