A look at happybase's connection pool

Let's first look at how the happybase connection pool is used:

import happybase
hbase_pool = happybase.ConnectionPool(host=conf['hbase']['host'], port=conf['hbase']['port'], size=100)
with hbase_pool.connection() as conn:
    pass  # do something with conn

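As you can see, we first create an HBase connection pool of size 100 by specifying the host and port. The with statement then takes a connection out of the pool, and through that connection we can perform CRUD operations on HBase. Knowing how to use the pool is far from enough, though: when a tricky problem comes up you may not know where to start. So I read the source of the happybase connection pool to understand how it manages its HBase connections.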

class ConnectionPool(object):
    """
    Thread-safe connection pool.

    .. versionadded:: 0.5

    The `size` argument specifies how many connections this pool
    manages. Additional keyword arguments are passed unmodified to the
    :py:class:`happybase.Connection` constructor, with the exception of
    the `autoconnect` argument, since maintaining connections is the
    task of the pool.

    :param int size: the maximum number of concurrently open connections
    :param kwargs: keyword arguments passed to
                   :py:class:`happybase.Connection`
    """
    def __init__(self, size, **kwargs):
        if not isinstance(size, int):
            raise TypeError("Pool 'size' arg must be an integer")

        if not size > 0:
            raise ValueError("Pool 'size' arg must be greater than zero")

        logger.debug(
            "Initializing connection pool with %d connections", size)

        self._lock = threading.Lock()
        self._queue = queue.LifoQueue(maxsize=size)
        self._thread_connections = threading.local()

        connection_kwargs = kwargs
        connection_kwargs['autoconnect'] = False

        for i in range(size):
            connection = Connection(**connection_kwargs)
            self._queue.put(connection)

        # The first connection is made immediately so that trivial
        # mistakes like unresolvable host names are raised immediately.
        # Subsequent connections are connected lazily.
        with self.connection():
            pass

    def _acquire_connection(self, timeout=None):
        """Acquire a connection from the pool."""
        try:
            return self._queue.get(True, timeout)
        except queue.Empty:
            raise NoConnectionsAvailable(
                "No connection available from pool within specified "
                "timeout")

    def _return_connection(self, connection):
        """Return a connection to the pool."""
        self._queue.put(connection)

    @contextlib.contextmanager
    def connection(self, timeout=None):
        """
        Obtain a connection from the pool.

        This method *must* be used as a context manager, i.e. with
        Python's ``with`` block. Example::

            with pool.connection() as connection:
                pass  # do something with the connection

        If `timeout` is specified, this is the number of seconds to wait
        for a connection to become available before
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
        method waits forever for a connection to become available.

        :param int timeout: number of seconds to wait (optional)
        :return: active connection from the pool
        :rtype: :py:class:`happybase.Connection`
        """

        connection = getattr(self._thread_connections, 'current', None)

        return_after_use = False
        if connection is None:
            # This is the outermost connection requests for this thread.
            # Obtain a new connection from the pool and keep a reference
            # in a thread local so that nested connection requests from
            # the same thread can return the same connection instance.
            #
            # Note: this code acquires a lock before assigning to the
            # thread local; see
            # http://emptysquare.net/blog/another-thing-about-pythons-
            # threadlocals/
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection

        try:
            # Open connection, because connections are opened lazily.
            # This is a no-op for connections that are already open.
            connection.open()

            # Return value from the context manager's __enter__()
            yield connection

        except (TException, socket.error):
            # Refresh the underlying Thrift client if an exception
            # occurred in the Thrift layer, since we don't know whether
            # the connection is still usable.
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()

            # Reraise to caller; see contextlib.contextmanager() docs
            raise

        finally:
            # Remove thread local reference after the outermost 'with'
            # block ends. Afterwards the thread no longer owns the
            # connection.
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)

The core of the connection pool is the code above; let's walk through it.
Inside the __init__ method:

self._lock = threading.Lock()
creates a thread lock.
self._queue = queue.LifoQueue(maxsize=size)
creates a thread-safe last-in, first-out (LIFO) queue whose capacity is the pool size given at initialization; it stores the HBase connections.
self._thread_connections = threading.local()
creates a thread-local object, so that each thread keeps its own value.

for i in range(size):
    connection = Connection(**connection_kwargs)
    self._queue.put(connection)
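creates size Connection objects and puts them into the queue. Because autoconnect is set to False, none of them is opened at this point.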
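To make the effect of the LIFO queue concrete, here is a minimal sketch. FakeConnection is a hypothetical stand-in for happybase.Connection (it never touches the network), so this only illustrates the queue behaviour and is not happybase's own code.

```python
import queue


class FakeConnection:
    """Hypothetical stand-in for happybase.Connection; opens nothing real."""

    def __init__(self, name):
        self.name = name
        self.is_open = False

    def open(self):
        self.is_open = True


# Fill the pool the same way __init__ does: create objects, open none of them.
pool = queue.LifoQueue(maxsize=3)
for i in range(3):
    pool.put(FakeConnection(f"conn-{i}"))

first = pool.get()   # LIFO: the connection that was put last comes out first
first.open()         # only the connection actually in use gets opened
pool.put(first)      # hand it back to the pool

second = pool.get()  # the connection returned most recently is reused
reused = second is first

# The two connections still in the pool were never needed, so never opened.
remaining = [pool.get() for _ in range(2)]
```

With a LIFO queue, a lightly loaded program tends to keep reusing the same few connections, while the rest of the pool can stay unopened.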

So how is a connection taken out of the pool, and how is thread safety ensured? Let's look at the connection method:

    @contextlib.contextmanager
    def connection(self, timeout=None):
        """
        Obtain a connection from the pool.

        This method *must* be used as a context manager, i.e. with
        Python's ``with`` block. Example::

            with pool.connection() as connection:
                pass  # do something with the connection

        If `timeout` is specified, this is the number of seconds to wait
        for a connection to become available before
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
        method waits forever for a connection to become available.

        :param int timeout: number of seconds to wait (optional)
        :return: active connection from the pool
        :rtype: :py:class:`happybase.Connection`
        """

        connection = getattr(self._thread_connections, 'current', None)

        return_after_use = False
        if connection is None:
            # This is the outermost connection requests for this thread.
            # Obtain a new connection from the pool and keep a reference
            # in a thread local so that nested connection requests from
            # the same thread can return the same connection instance.
            #
            # Note: this code acquires a lock before assigning to the
            # thread local; see
            # http://emptysquare.net/blog/another-thing-about-pythons-
            # threadlocals/
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection

        try:
            # Open connection, because connections are opened lazily.
            # This is a no-op for connections that are already open.
            connection.open()

            # Return value from the context manager's __enter__()
            yield connection

        except (TException, socket.error):
            # Refresh the underlying Thrift client if an exception
            # occurred in the Thrift layer, since we don't know whether
            # the connection is still usable.
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()

            # Reraise to caller; see contextlib.contextmanager() docs
            raise

        finally:
            # Remove thread local reference after the outermost 'with'
            # block ends. Afterwards the thread no longer owns the
            # connection.
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)

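The connection method is decorated with @contextlib.contextmanager, which turns the generator function into a context manager, so the pool has to be used with the with statement. Before looking at how the pool obtains a connection, let's look at the code that yields and releases it: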

try:
    # Open connection, because connections are opened lazily.
    # This is a no-op for connections that are already open.
    connection.open()

    # Return value from the context manager's __enter__()
    yield connection

except (TException, socket.error):
    # Refresh the underlying Thrift client if an exception
    # occurred in the Thrift layer, since we don't know whether
    # the connection is still usable.
    logger.info("Replacing tainted pool connection")
    connection._refresh_thrift_client()
    connection.open()

    # Reraise to caller; see contextlib.contextmanager() docs
    raise

finally:
    # Remove thread local reference after the outermost 'with'
    # block ends. Afterwards the thread no longer owns the
    # connection.
    if return_after_use:
        del self._thread_connections.current
        self._return_connection(connection)
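The control flow of this try / except / finally structure can be reproduced with a small sketch. The names managed and events are made up for illustration, and ValueError stands in for the Thrift and socket errors that the real code catches.

```python
import contextlib

events = []


@contextlib.contextmanager
def managed():
    events.append("acquire")
    try:
        yield "resource"          # the body of the with block runs here
    except ValueError:
        # An exception raised inside the with block is thrown back in at
        # the yield, so the context manager can repair its resource.
        events.append("refresh")
        raise                     # re-raise so the caller still sees it
    finally:
        events.append("release")  # runs on success and on failure


# Normal use: acquire, use, release.
with managed() as resource:
    events.append("use " + resource)

# Failing use: acquire, refresh, release, and the error reaches the caller.
try:
    with managed():
        raise ValueError("boom")
except ValueError:
    events.append("caller saw error")
```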

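Once a connection has been obtained it is yielded to the caller, and the finally block returns it to the pool. The except clause in between deserves attention: if a Thrift or socket error occurs while the connection is in use, the exception is caught, the underlying Thrift client is refreshed and the connection is reopened, so that the connection handed back to the pool in finally is usable. The exception is then re-raised to the caller.
The except clause can only catch exceptions raised inside the with block; exceptions raised outside it are never seen. You therefore have to make sure that once the with block ends you are really done with the connection, otherwise several threads may end up using the same connection. Take scan, whose result is a generator: it is best to convert it to a list inside the with block before returning it. If you return the generator itself, iterating over it outside the with block still uses the connection, while the pool considers the connection finished as soon as the with block ends and may hand it to another thread.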
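The generator pitfall can be shown without an HBase server. The sketch below uses a hypothetical ToyPool and FakeConnection (neither is part of happybase); the fake scan records, for every row, whether the pool still considers the connection checked out at the moment the row is produced.

```python
import contextlib
import queue


class FakeConnection:
    """Hypothetical stand-in for a pooled connection."""

    def __init__(self):
        self.checked_out = False

    def scan(self):
        # A generator, like a real scan: rows are produced lazily.
        for i in range(3):
            yield (i, self.checked_out)


class ToyPool:
    """Simplified pool: no thread-local bookkeeping, no error handling."""

    def __init__(self, size):
        self._queue = queue.LifoQueue(maxsize=size)
        for _ in range(size):
            self._queue.put(FakeConnection())

    @contextlib.contextmanager
    def connection(self):
        conn = self._queue.get()
        conn.checked_out = True
        try:
            yield conn
        finally:
            conn.checked_out = False
            self._queue.put(conn)


pool = ToyPool(1)


def scan_leaky():
    with pool.connection() as conn:
        return conn.scan()        # generator escapes the with block


def scan_safe():
    with pool.connection() as conn:
        return list(conn.scan())  # fully consumed inside the with block


leaky_rows = list(scan_leaky())   # rows read after the connection went back
safe_rows = list(scan_safe())     # rows read while the connection was held
```

Every row in leaky_rows carries False: by the time the rows are read, the connection is already back in the pool and could be in use by another thread.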
Now let's look at how a connection is obtained:

connection = getattr(self._thread_connections, 'current', None)

return_after_use = False
if connection is None:
    # This is the outermost connection requests for this thread.
    # Obtain a new connection from the pool and keep a reference
    # in a thread local so that nested connection requests from
    # the same thread can return the same connection instance.
    #
    # Note: this code acquires a lock before assigning to the
    # thread local; see
    # http://emptysquare.net/blog/another-thing-about-pythons-
    # threadlocals/
    return_after_use = True
    connection = self._acquire_connection(timeout)
    with self._lock:
        self._thread_connections.current = connection

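The method first reads the current attribute of the _thread_connections thread-local object; each thread has its own independent current attribute. Note that _thread_connections refers to the same object in every thread, because it is created once when the pool is initialized. However, Python's threading.local customizes attribute access: in the pure-Python implementation (_threading_local), __getattribute__, __setattr__ and __delattr__ call a _patch helper, which makes sure every thread reads and writes its own set of attributes.
The rest is easy to follow: if the thread has no current connection, one is taken from the queue and stored in the thread-local object.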

connection = self._acquire_connection(timeout)
with self._lock:
    self._thread_connections.current = connection
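The interplay between the queue and the thread-local can be tried out with a simplified pool. ToyPool below is a hypothetical sketch, not happybase's class: it stores plain strings instead of connections and leaves out the lock and the error handling.

```python
import contextlib
import queue
import threading


class ToyPool:
    """Simplified pool that only keeps the thread-local bookkeeping."""

    def __init__(self, size):
        self._queue = queue.LifoQueue(maxsize=size)
        self._local = threading.local()
        for i in range(size):
            self._queue.put(f"conn-{i}")

    @contextlib.contextmanager
    def connection(self):
        conn = getattr(self._local, "current", None)
        outermost = conn is None
        if outermost:
            # First request in this thread: take one from the queue
            # and remember it for nested requests.
            conn = self._queue.get()
            self._local.current = conn
        try:
            yield conn
        finally:
            if outermost:
                del self._local.current
                self._queue.put(conn)


pool = ToyPool(2)
seen = {}
barrier = threading.Barrier(2, timeout=5)


def worker(name):
    with pool.connection() as outer:
        barrier.wait()  # both threads hold a connection at the same time
        with pool.connection() as inner:
            seen[name] = (outer, inner)


threads = [threading.Thread(target=worker, args=(n,)) for n in ("t1", "t2")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Within one thread the nested with block gets the same connection as the outer one, while the two threads hold different connections even though they share the same pool object.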

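One question to consider: does this pool still work under a coroutine model?
With gevent monkey patching it does, because gevent replaces threading.local with its own greenlet-local implementation, so each coroutine gets its own connection.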
