hbase-BucketCache剖析

    虽说hbase适合写多读少,但是hbase的读性能也是非常强悍的,hbase有如此好的读性能其中少不了BlockCache。BlockCache是regionserver级别的一种缓存,目前有三种实现方式:LruBlockCache、SlabCache和BucketCache,本文只对BucketCache的实现方式进行剖析。本文从BucketCache的内存模型、读写流程以及使用配置三方面进行说明。

    BucketCache可以指定三种不同的存储介质:onHeap(java堆上内存)、offHeap(java堆外内存)和file(文件),不管使用哪种存储介质,内部的内存模型、读写流程都是一致的。

内存模型

    关于内存模型,BucketCache初始化时默认会申请14个不同大小的Bucket,一种Bucket存储一种指定BlockSize的数据块,每个Bucket的大小默认为2M,不同大小的Bucket之间的内存是可以互相使用的,从而保证的内存的使用率。BucketCache的内存模型如下图所示:

读写流程

    BucketCache中一共包括5个模块:ramCache、backingMap、ioEngine、writerThreads和BucketAllocator。

ramCache:block在写入BucketCache中指定的存储介质之前会先存储在ramCache这map中。

backingMap:记录写入BucketCache的BlockKey和对应Block在BucketCache中的offset。

ioEngine:实际写入存储介质的类,将Block数据写入对应地址的空间中。

writerThreads:多个线程,主要负责异步将Block写入存储介质中,每个线程都有一个支持并发的队列,用来存储Block。

BucketAllocator:为Block分配存储介质上的空间,主要就是获取一个存储介质上的offset,不同Bucket大小有对应的BucketAllocator。

    BucketCache中的读写流程如下图所示:

写入流程:

1.进入到BucketCache类中的Block会首先将BlockKey和对应的Block存入到ramCache这个map,之后将该Block存入到对应writerThread线程对应的队列中。

2.writerThread线程持续地从队列中获取所有的Block。

3.调用对应Bucket大小的BucketAllocator为对应大小的Block分配内存,也就是获取一个存储介质上的offset。

4.调用ioEngine模块将Block写入到分配好的空间上。

cacheBlockWithWait:

    cacheBlockWithWait方法是BucketCache写入Block的入口函数。

public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,

    boolean wait) {

  if (!cacheEnabled) {

    return;

  }

  if (backingMap.containsKey(cacheKey)) {

    return;

  }

//(1)将传入的Block构造成RAMQueueEntry对象,并存入ramCache中。

  RAMQueueEntry re =

      new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);

  if (ramCache.putIfAbsent(cacheKey, re) != null) {

    return;

  }

//(2)将Block构造成的RAMQueueEntry对象,存入写线程的并发队列中。

  int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();

  BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);

  boolean successfulAddition = false;

  if (wait) {

    try {

      successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);

    } catch (InterruptedException e) {

      Thread.currentThread().interrupt();

    }

  } else {

    successfulAddition = bq.offer(re);

  }

// (3)如果存入并发队列失败,则将 ramCache中的RAMQueueEntry对象移出,并记录失败次数。

  if (!successfulAddition) {

    ramCache.remove(cacheKey);

    cacheStats.failInsert();

  } else {

    this.blockNumber.incrementAndGet();

    this.heapSize.addAndGet(cachedItem.heapSize());

    blocksByHFile.put(cacheKey.getHfileName(), cacheKey);

  }

}

WriterThread.run

     异步写入存储介质的后台线程。

public void run() {

  List<RAMQueueEntry> entries = new ArrayList<RAMQueueEntry>();

  try {

    while (cacheEnabled && writerEnabled) {

      try {

        try {

          // Blocks

          //(1) 获取并发队列中存放的所有RAMQueueEntry(Block)

          entries = getRAMQueueEntries(inputQueue, entries);

        } catch (InterruptedException ie) {

          if (!cacheEnabled) break;

        }

        //(2)BucketAllocator分配内存,并调用ioEngine模块将Block写入指定的存储介质中,写入成后存入backingMap中。

        doDrain(entries);

      } catch (Exception ioe) {

        LOG.error("WriterThread encountered error", ioe);

      }

    }

  } catch (Throwable t) {

    LOG.warn("Failed doing drain", t);

  }

  LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);

}

doDrain

    将队列中的Block写入ioEngine模块指定的存储介质中,并将对应的entry写入backingMap中。

void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {

  if (entries.isEmpty()) {

    return;

  }

  final int size = entries.size();

  BucketEntry[] bucketEntries = new BucketEntry[size];

//(1)循环遍历所有的RAMQueueEntry,调用对应大小Bucket的bucketAllocator分配空间,并使用ioEngine将block写入指定的介质。

//如果bucketAllocator分配空间时报错则跳过该entry,并在后面会将ramCache中对应的Block移除。

//如果因为存储介质内存满了,则会调用存储介质的释放空间,如果该存储介质正常释放空间则进行休眠,稍后重试。

    int index = 0;

  while (cacheEnabled && index < size) {

    RAMQueueEntry re = null;

    try {

      re = entries.get(index);

      if (re == null) {

        LOG.warn("Couldn't get entry or changed on us; who else is messing with it?");

        index++;

        continue;

      }

      //(2)通过bucketAllocator获取block在ioEngine中的偏移值,之后使用ioEngine将block写入指定的介质。

      BucketEntry bucketEntry =

        re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);

      bucketEntries[index] = bucketEntry;

      if (ioErrorStartTime > 0) {

        ioErrorStartTime = -1;

      }

      index++;

    }

    //(3)如果bucketAllocator获取到block在ioEngine的offset(可能是由于block的len大于定义的所有Bucket指定的大小),

    //则跳过该block,之后ramCache会将该block移除。

    catch (BucketAllocatorException fle) {

      LOG.warn("Failed allocation for " + (re == null ? "" : re.getKey()) + "; " + fle);

      // Presume can't add. Too big? Move index on. Entry will be cleared from ramCache below.

      bucketEntries[index] = null;

      index++;

    }

//(4)如果ioEngine指定的介质内存满了则调用freeSpace,同LRU算法进行释放内存,如果正在释放内存就调用休眠,之后再重试

catch (CacheFullException cfe) {

      // Cache full when we tried to add. Try freeing space and then retrying (don't up index)

      if (!freeInProgress) {

        freeSpace("Full!");

      } else {

        Thread.sleep(50);

      }

    } catch (IOException ioex) {

      // Hopefully transient. Retry. checkIOErrorIsTolerated disables cache if problem.

      LOG.error("Failed writing to bucket cache", ioex);

      checkIOErrorIsTolerated();

    }

  }

  // Make sure data pages are written are on media before we update maps.

  // (5)如果ioEngine指定的介质是磁盘,则需要先同步到磁盘之后再添加到backingMap

  try {

    ioEngine.sync();

  } catch (IOException ioex) {

    LOG.error("Failed syncing IO engine", ioex);

    checkIOErrorIsTolerated();

    // Since we failed sync, free the blocks in bucket allocator

    for (int i = 0; i < entries.size(); ++i) {

      if (bucketEntries[i] != null) {

        bucketAllocator.freeBlock(bucketEntries[i].offset());

        bucketEntries[i] = null;

      }

    }

  }

  // Now add to backingMap if successfully added to bucket cache.  Remove from ramCache if

  // success or error.

  for (int i = 0; i < size; ++i) {

    BlockCacheKey key = entries.get(i).getKey();

    // Only add if non-null entry.

    if (bucketEntries[i] != null) {

      backingMap.put(key, bucketEntries[i]);

    }

    // Always remove from ramCache even if we failed adding it to the block cache above.

    RAMQueueEntry ramCacheEntry = ramCache.remove(key);

    if (ramCacheEntry != null) {

      heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());

    } else if (bucketEntries[i] != null){

      // Block should have already been evicted. Remove it and free space.

      ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());

      try {

        lock.writeLock().lock();

        if (backingMap.remove(key, bucketEntries[i])) {

          blockEvicted(key, bucketEntries[i], false);

        }

      } finally {

        lock.writeLock().unlock();

      }

    }

  }

  //(6)已使用的内存大于总内存的95%则进行内存释放

  long used = bucketAllocator.getUsedSize();

  if (used > acceptableSize()) {

    freeSpace("Used=" + used + " > acceptable=" + acceptableSize());

  }

  return;

}

读取流程:

1. 首先从RAMCache中查找。对于还没有来得及写入到bucket的缓存block,一定存储在RAMCache中。

2. 如果在RAMCache中没有找到,再在BackingMap中根据blockKey找到对应物理偏移地址offset。

3. 根据物理偏移地址offset可以直接从内存中查找对应的block数据。

getBlock:

    从Bucketcache中获取对应的block的入口方法。

public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,

    boolean updateCacheMetrics) {

  if (!cacheEnabled) {

    return null;

  }

//(1)从ramCache中获取对应的Block,获取到了则将统计缓存击中加1,并返回。

  RAMQueueEntry re = ramCache.get(key);

  if (re != null) {

    if (updateCacheMetrics) {

      cacheStats.hit(caching, key.isPrimary(), key.getBlockType());

    }

    re.access(accessCount.incrementAndGet());

    return re.getData();

  }

//(2)从backingMap中获取对应Block在存储介质上的offset。

  BucketEntry bucketEntry = backingMap.get(key);

  if (bucketEntry != null) {

    long start = System.nanoTime();

    ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());

    try {

      lock.readLock().lock();

//(3)在获取锁期间,有可能backingMap发生了变化,所以这里需要再次校验。

      if (bucketEntry.equals(backingMap.get(key))) {

        int len = bucketEntry.getLength();

        ByteBuffer bb = ByteBuffer.allocate(len);

//(4)根据backingMap中记录的offset以及len,对应的存储介质中读取指定的字节。

        int lenRead = ioEngine.read(bb, bucketEntry.offset());

        if (lenRead != len) {

          throw new RuntimeException("Only " + lenRead + " bytes read, " + len + " expected");

        }

        CacheableDeserializer<Cacheable> deserializer =

          bucketEntry.deserializerReference(this.deserialiserMap);

//(5)将从介质中读取出来的字节序列化为Cacheable 对象。

        Cacheable cachedBlock = deserializer.deserialize(bb, true);

        long timeTaken = System.nanoTime() - start;

//(6)统计缓存命中次数。

        if (updateCacheMetrics) {

          cacheStats.hit(caching, key.isPrimary(), key.getBlockType());

          cacheStats.ioHit(timeTaken);

        }

        bucketEntry.access(accessCount.incrementAndGet());

        if (this.ioErrorStartTime > 0) {

          ioErrorStartTime = -1;

        }

        return cachedBlock;

      }

    } catch (IOException ioex) {

      LOG.error("Failed reading block " + key + " from bucket cache", ioex);

      checkIOErrorIsTolerated();

    } finally {

      lock.readLock().unlock();

    }

  }

//(7)ramCache和backingMap中都没有找到对应的Block,则将未命中统计加1.

  if (!repeat && updateCacheMetrics) {

    cacheStats.miss(caching, key.isPrimary(), key.getBlockType());

  }

  return null;

}

使用配置

    BucketCache分为三种存储介质:onHeap、offHeap、file。下面对这三种配置进行分别说明。

onHeap模式

<hbase.bucketcache.ioengine>heap</hbase.bucketcache.ioengine>

//bucketcache占用整个jvm内存大小的比例

<hbase.bucketcache.size>0.4</hbase.bucketcache.size>

//bucketcache在combinedcache中的占比

<hbase.bucketcache.combinedcache.percentage>0.9</hbase.bucketcache.combinedcache.percentage>

offHeap模式

<hbase.bucketcache.ioengine>offheap</hbase.bucketcache.ioengine>

<hbase.bucketcache.size>0.4</hbase.bucketcache.size>

<hbase.bucketcache.combinedcache.percentage>0.9</hbase.bucketcache.combinedcache.percentage>

file模式

<hbase.bucketcache.ioengine>file:/cache_path</hbase.bucketcache.ioengine>

//bucketcache缓存空间大小,单位为MB

<hbase.bucketcache.size>10 * 1024</hbase.bucketcache.size>

//高速缓存路径

<hbase.bucketcache.persistent.path>file:/cache_path</hbase.bucketcache.persistent.path>

       今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

    

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349