HikariCP源码简洁剖析——ConcurrentBag

基于2.4版本

ConcurrentBag是什么

ConcurrentBag是HikariCP中实现的一个无锁化集合,比JDK中的LinkedBlockingQueueLinkedTransferQueue的性能更好。借鉴了C#中的设计,作者在这篇文章中说提到的几个点是:

  1. A lock-free design
  2. ThreadLocal caching
  3. Queue-stealing
  4. Direct hand-off optimizations

源码剖析

设计目的

ConcurrentBag的类注释如下:

This is a specialized concurrent bag that achieves superior performance to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a connection pool. It uses ThreadLocal storage when possible to avoid locks, but resorts to scanning a common collection if there are no available items in the ThreadLocal list. Not-in-use items in the ThreadLocal lists can be "stolen" when the borrowing thread has none of its own. It is a "lock-less" implementation using a specialized AbstractQueuedLongSynchronizer to manage cross-thread signaling. Note that items that are "borrowed" from the bag are not actually removed from any collection, so garbage collection will not occur even if the reference is abandoned. Thus care must be taken to "requite" borrowed objects otherwise a memory leak will result. Only the "remove" method can completely remove an object from the bag

简单翻译一下:

ConcurrentBag是为追求链接池操作高性能而设计的并发工具。它使用ThreadLocal缓存来避免锁争抢,当ThreadLocal中没有可用的链接时会去公共集合中“借用”链接。ThreadLocal中处于Not-in-use状态的链接也可能会“借走”。

ConcurrentBag使用AbstractQueuedLongSynchronizer来管理跨线程通信(实际新版本已经删掉了AbstractQueuedLongSynchronizer)。

注意被“借走”的链接并没有从任何集合中删除,所以即使链接的引用被弃用也不会进行gc。所以要及时将被“借走”的链接归还回来,否则可能会发生内存泄露。只有remove方法才会真正将链接从ConcurrentBag中删除。

看下HikariCP中是如何实现ConcurrentBag的。

源码实现

类定义

public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable

ConcurrentBag只是实现了AutoCloseable接口,而没有实现ListMap等接口。其中的元素要集成IConcurrentBagEntry。我们看下IConcurrentBagEntry的定义:

public interface IConcurrentBagEntry
   {
      //定义链接的状态
      int STATE_NOT_IN_USE = 0;
      int STATE_IN_USE = 1;
      int STATE_REMOVED = -1;
      int STATE_RESERVED = -2;

      //对链接状态的操作
      boolean compareAndSet(int expectState, int newState);
      void setState(int newState);
      int getState();
   }

再看下类成员变量:


   //存放共享元素,线程安全的List
   private final CopyOnWriteArrayList<T> sharedList;
   //是否使用弱引用
   private final boolean weakThreadLocals;

   //线程本地缓存
   private final ThreadLocal<List<Object>> threadList;
   //添加元素的监听器,在HikariPool中实现
   private final IBagStateListener listener;
   //当前等待获取元素的线程数
   private final AtomicInteger waiters;
   //ConcurrentBag是否处于关于状态
   private volatile boolean closed;

   //接力队列
   private final SynchronousQueue<T> handoffQueue;

链接PoolEntry

在HikariCP中使用PoolEntry对链接实例Connection进行了封装,记录了Connection相关的数据,如Connection实例、链接状态、当前活跃会话、对链接池引用等。

PoolEntry也是ConcurrentBag管理的对象,sharedListthreadList中保存的对象就是PoolEntry的实例。

/**
 * Entry used in the ConcurrentBag to track Connection instances.
 *
 * @author Brett Wooldridge
 */
final class PoolEntry implements IConcurrentBagEntry {
   //用来更新链接的状态state
   private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
   //链接实例
   Connection connection;
   //链接状态,如STATE_IN_USE、STATE_NOT_IN_USE
   private volatile int state;
   //驱逐状态,删除该链接时标记为true
   private volatile boolean evict;
   //当前打开的会话
   private final FastList<Statement> openStatements;
   //链接池引用
   private final HikariPool hikariPool;

   private final boolean isReadOnly;
   private final boolean isAutoCommit;
}

ConcurrentBag中的方法比较少,我们一个个看一下:

1. 增加链接

add方法很简单,只是将新的链接放入sharedList中,如果有等待链接的线程,则将链接给该线程。

可以发现其实所有的链接都保存在sharedList中,ThreadList只是其中一部分。

/**
 * Add a new object to the bag for others to borrow.
 *
 *@parambagEntryan object to add to the bag
 */
public void add(final T bagEntry) {
   if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
      throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
   }

    //将链接放入共享队列
   sharedList.add(bagEntry);

   // spin until a thread takes it or none are waiting
   // 等待直到没有waiter或有线程拿走它
   while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
       //yield什么都不做,只是为了让渡CPU使用,避免长期占用
       yield();
   }
}

2. 获取链接

链接获取顺序:

  1. 从线程本地缓存ThreadList中获取,这里保持的是该线程之前使用过的链接
  2. 从共享集合sharedList中获取,如果获取不到,会通知listener新建链接(但不一定真的会新建链接出来)
  3. handoffQueue中阻塞获取,新建的链接或一些转为可用的链接会放入该队列中
   /**
    * The method will borrow a BagEntry from the bag, blocking for the
    * specified timeout if none are available.
    *
    * @param timeout how long to wait before giving up, in units of unit
    * @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
    * @return a borrowed instance from the bag or null if a timeout occurs
    * @throws InterruptedException if interrupted while waiting
    */
   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException {
      // 先看是否能从ThreadList中拿到可用链接,这里的List通常为FastList
      List<Object> list = threadList.get();
      if (weakThreadLocals && list == null) {
         list = new ArrayList<>(16);
         threadList.set(list);
      }

      //1. 试从ThreadList中获取链接,倒序获取
      for (int i = list.size() - 1; i >= 0; i--) {
         final Object entry = list.remove(i);
         @SuppressWarnings("unchecked")
         //获取链接,链接可能使用了弱引用
         final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
         //如果能够获取链接且链接可用,则将该链接的状态从STATE_NOT_IN_USE置为STATE_IN_USE
         if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
         }
      }

      //2. 如果ThreadList中没有可用的链接,则尝试从共享集合中获取链接
      final int waiting = waiters.incrementAndGet();
      try {
         for (T bagEntry : sharedList) {
            if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               // If we may have stolen another waiter's connection, request another bag add.
               if (waiting > 1) {
                  //通知监听器添加链接
                  listener.addBagItem(waiting - 1);
               }
               return bagEntry;
            }
         }

         listener.addBagItem(waiting);

         //3. 尝试从handoffQueue队列中获取。在等待时可能链接被新建或改为转为可用状态
         //SynchronousQueue是一种无容量的BlockingQueue,在poll时如果没有元素,则阻塞等待timeout时间
         timeout = timeUnit.toNanos(timeout);
         do {
            final long start = CLOCK.currentTime();
            final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
            if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
               return bagEntry;
            }

            timeout -= CLOCK.elapsedNanos(start);
         } while (timeout > 10_000);

         return null;
      }
      finally {
         waiters.decrementAndGet();
      }
   }

3. 归还链接

归还链接的顺序:

  1. 将链接置为可用状态STATE_NOT_IN_USE

  2. 如果有等待链接的线程,则将该链接通过handoffQueue给出去

    由于该链接可能在当前线程的threadList里,所以可以发现A线程的threadList中的链接可能被B线程使用

  3. 将它放入当前线程的theadList中

    这里可以看出来threadList一开始是空的,当线程从sharedList中借用了链接并使用完后,会放入自己的缓存中

/**
    * This method will return a borrowed object to the bag.  Objects
    * that are borrowed from the bag but never "requited" will result
    * in a memory leak.
    *
    * @param bagEntry the value to return to the bag
    * @throws NullPointerException if value is null
    * @throws IllegalStateException if the bagEntry was not borrowed from the bag
    */
   public void requite(final T bagEntry) {
      //1. 将链接状态改为STATE_NOT_IN_USE
      bagEntry.setState(STATE_NOT_IN_USE);

      //2. 如果有等待链接的线程,将该链接交出去
      for (int i = 0; waiters.get() > 0; i++) {
         if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
         } else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
         } else {
            yield();
         }
      }

      //3. 将链接放入线程本地缓存ThreadList中
      final List<Object> threadLocalList = threadList.get();
      if (threadLocalList != null) {
          threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
      }
   }

链接借用流程

我们可以画个图简单看下链接的借用过程

链接借用流程

github项目地址:https://github.com/caychan/CCoding

求star

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容